Posted to commits@lucene.apache.org by si...@apache.org on 2011/05/02 15:51:22 UTC
svn commit: r1098566 [6/22] - in /lucene/dev/branches/docvalues: ./
dev-tools/eclipse/ dev-tools/idea/.idea/ dev-tools/idea/lucene/contrib/ant/
dev-tools/idea/lucene/contrib/db/bdb-je/
dev-tools/idea/lucene/contrib/db/bdb/ dev-tools/idea/lucene/contrib...
Modified: lucene/dev/branches/docvalues/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/docvalues/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java?rev=1098566&r1=1098565&r2=1098566&view=diff
==============================================================================
--- lucene/dev/branches/docvalues/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java (original)
+++ lucene/dev/branches/docvalues/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java Mon May 2 13:50:57 2011
@@ -18,9 +18,17 @@ package org.apache.lucene.index;
*/
import java.io.IOException;
+import java.util.Comparator;
+import java.util.Map;
import org.apache.lucene.analysis.tokenattributes.PayloadAttribute;
import org.apache.lucene.document.Fieldable;
+import org.apache.lucene.index.codecs.FieldsConsumer;
+import org.apache.lucene.index.codecs.PostingsConsumer;
+import org.apache.lucene.index.codecs.TermStats;
+import org.apache.lucene.index.codecs.TermsConsumer;
+import org.apache.lucene.util.BitVector;
+import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.RamUsageEstimator;
// TODO: break into separate freq and prox writers as
@@ -28,17 +36,17 @@ import org.apache.lucene.util.RamUsageEs
// be configured as any number of files 1..N
final class FreqProxTermsWriterPerField extends TermsHashConsumerPerField implements Comparable<FreqProxTermsWriterPerField> {
- final FreqProxTermsWriterPerThread perThread;
+ final FreqProxTermsWriter parent;
final TermsHashPerField termsHashPerField;
final FieldInfo fieldInfo;
- final DocumentsWriter.DocState docState;
+ final DocumentsWriterPerThread.DocState docState;
final FieldInvertState fieldState;
boolean omitTermFreqAndPositions;
PayloadAttribute payloadAttribute;
- public FreqProxTermsWriterPerField(TermsHashPerField termsHashPerField, FreqProxTermsWriterPerThread perThread, FieldInfo fieldInfo) {
+ public FreqProxTermsWriterPerField(TermsHashPerField termsHashPerField, FreqProxTermsWriter parent, FieldInfo fieldInfo) {
this.termsHashPerField = termsHashPerField;
- this.perThread = perThread;
+ this.parent = parent;
this.fieldInfo = fieldInfo;
docState = termsHashPerField.docState;
fieldState = termsHashPerField.fieldState;
@@ -78,8 +86,8 @@ final class FreqProxTermsWriterPerField
if (fields[i].isIndexed())
return true;
return false;
- }
-
+ }
+
@Override
void start(Fieldable f) {
if (fieldState.attributeSource.hasAttribute(PayloadAttribute.class)) {
@@ -96,18 +104,18 @@ final class FreqProxTermsWriterPerField
} else {
payload = payloadAttribute.getPayload();
}
-
+
if (payload != null && payload.length > 0) {
termsHashPerField.writeVInt(1, (proxCode<<1)|1);
termsHashPerField.writeVInt(1, payload.length);
termsHashPerField.writeBytes(1, payload.data, payload.offset, payload.length);
- hasPayloads = true;
+ hasPayloads = true;
} else
termsHashPerField.writeVInt(1, proxCode<<1);
-
+
FreqProxPostingsArray postings = (FreqProxPostingsArray) termsHashPerField.postingsArray;
postings.lastPositions[termID] = fieldState.position;
-
+
}
@Override
@@ -115,7 +123,7 @@ final class FreqProxTermsWriterPerField
// First time we're seeing this term since the last
// flush
assert docState.testPoint("FreqProxTermsWriterPerField.newTerm start");
-
+
FreqProxPostingsArray postings = (FreqProxPostingsArray) termsHashPerField.postingsArray;
postings.lastDocIDs[termID] = docState.docID;
if (omitTermFreqAndPositions) {
@@ -132,9 +140,9 @@ final class FreqProxTermsWriterPerField
void addTerm(final int termID) {
assert docState.testPoint("FreqProxTermsWriterPerField.addTerm start");
-
+
FreqProxPostingsArray postings = (FreqProxPostingsArray) termsHashPerField.postingsArray;
-
+
assert omitTermFreqAndPositions || postings.docFreqs[termID] > 0;
if (omitTermFreqAndPositions) {
@@ -169,7 +177,7 @@ final class FreqProxTermsWriterPerField
}
}
}
-
+
@Override
ParallelPostingsArray createPostingsArray(int size) {
return new FreqProxPostingsArray(size);
@@ -212,7 +220,180 @@ final class FreqProxTermsWriterPerField
return ParallelPostingsArray.BYTES_PER_POSTING + 4 * RamUsageEstimator.NUM_BYTES_INT;
}
}
-
+
public void abort() {}
+
+ BytesRef payload;
+
+ /* Walk through all unique text tokens (Posting
+ * instances) found in this field and serialize them
+ * into a single RAM segment. */
+ void flush(String fieldName, FieldsConsumer consumer, final SegmentWriteState state)
+ throws CorruptIndexException, IOException {
+
+ final TermsConsumer termsConsumer = consumer.addField(fieldInfo);
+ final Comparator<BytesRef> termComp = termsConsumer.getComparator();
+
+ final Term protoTerm = new Term(fieldName);
+
+ final boolean currentFieldOmitTermFreqAndPositions = fieldInfo.omitTermFreqAndPositions;
+
+ final Map<Term,Integer> segDeletes;
+ if (state.segDeletes != null && state.segDeletes.terms.size() > 0) {
+ segDeletes = state.segDeletes.terms;
+ } else {
+ segDeletes = null;
+ }
+
+ final int[] termIDs = termsHashPerField.sortPostings(termComp);
+ final int numTerms = termsHashPerField.bytesHash.size();
+ final BytesRef text = new BytesRef();
+ final FreqProxPostingsArray postings = (FreqProxPostingsArray) termsHashPerField.postingsArray;
+ final ByteSliceReader freq = new ByteSliceReader();
+ final ByteSliceReader prox = new ByteSliceReader();
+
+ long sumTotalTermFreq = 0;
+ for (int i = 0; i < numTerms; i++) {
+ final int termID = termIDs[i];
+ // Get BytesRef
+ final int textStart = postings.textStarts[termID];
+ termsHashPerField.bytePool.setBytesRef(text, textStart);
+
+ termsHashPerField.initReader(freq, termID, 0);
+ if (!fieldInfo.omitTermFreqAndPositions) {
+ termsHashPerField.initReader(prox, termID, 1);
+ }
+
+ // TODO: really TermsHashPerField should take over most
+ // of this loop, including merge sort of terms from
+ // multiple threads and interacting with the
+ // TermsConsumer, only calling out to us (passing us the
+ // DocsConsumer) to handle delivery of docs/positions
+
+ final PostingsConsumer postingsConsumer = termsConsumer.startTerm(text);
+
+ final int delDocLimit;
+ if (segDeletes != null) {
+ final Integer docIDUpto = segDeletes.get(protoTerm.createTerm(text));
+ if (docIDUpto != null) {
+ delDocLimit = docIDUpto;
+ } else {
+ delDocLimit = 0;
+ }
+ } else {
+ delDocLimit = 0;
+ }
+
+ // Now termStates has numToMerge FieldMergeStates
+ // which all share the same term. Now we must
+ // interleave the docID streams.
+ int numDocs = 0;
+ long totTF = 0;
+ int docID = 0;
+ int termFreq = 0;
+
+ while(true) {
+ if (freq.eof()) {
+ if (postings.lastDocCodes[termID] != -1) {
+ // Return last doc
+ docID = postings.lastDocIDs[termID];
+ if (!omitTermFreqAndPositions) {
+ termFreq = postings.docFreqs[termID];
+ }
+ postings.lastDocCodes[termID] = -1;
+ } else {
+ // EOF
+ break;
+ }
+ } else {
+ final int code = freq.readVInt();
+ if (omitTermFreqAndPositions) {
+ docID += code;
+ } else {
+ docID += code >>> 1;
+ if ((code & 1) != 0) {
+ termFreq = 1;
+ } else {
+ termFreq = freq.readVInt();
+ }
+ }
+
+ assert docID != postings.lastDocIDs[termID];
+ }
+
+ numDocs++;
+ assert docID < state.numDocs: "doc=" + docID + " maxDoc=" + state.numDocs;
+ final int termDocFreq = termFreq;
+
+ // NOTE: we could check here if the docID was
+ // deleted, and skip it. However, this is somewhat
+ // dangerous because it can yield non-deterministic
+ // behavior since we may see the docID before we see
+ // the term that caused it to be deleted. This
+ // would mean some (but not all) of its postings may
+ // make it into the index, which'd alter the docFreq
+ // for those terms. We could fix this by doing two
+ // passes, ie first sweep marks all del docs, and
+ // 2nd sweep does the real flush, but I suspect
+ // that'd add too much time to flush.
+ postingsConsumer.startDoc(docID, termDocFreq);
+ if (docID < delDocLimit) {
+ // Mark it deleted. TODO: we could also skip
+ // writing its postings; this would be
+ // deterministic (just for this Term's docs).
+ if (state.deletedDocs == null) {
+ state.deletedDocs = new BitVector(state.numDocs);
+ }
+ state.deletedDocs.set(docID);
+ }
+
+ // Carefully copy over the prox + payload info,
+ // changing the format to match Lucene's segment
+ // format.
+ if (!currentFieldOmitTermFreqAndPositions) {
+ // omitTermFreqAndPositions == false so we do write positions &
+ // payload
+ int position = 0;
+ totTF += termDocFreq;
+ for(int j=0;j<termDocFreq;j++) {
+ final int code = prox.readVInt();
+ position += code >> 1;
+
+ final int payloadLength;
+ final BytesRef thisPayload;
+
+ if ((code & 1) != 0) {
+ // This position has a payload
+ payloadLength = prox.readVInt();
+
+ if (payload == null) {
+ payload = new BytesRef();
+ payload.bytes = new byte[payloadLength];
+ } else if (payload.bytes.length < payloadLength) {
+ payload.grow(payloadLength);
+ }
+
+ prox.readBytes(payload.bytes, 0, payloadLength);
+ payload.length = payloadLength;
+ thisPayload = payload;
+
+ } else {
+ payloadLength = 0;
+ thisPayload = null;
+ }
+
+ postingsConsumer.addPosition(position, thisPayload);
+ }
+
+ postingsConsumer.finishDoc();
+ }
+ }
+ termsConsumer.finishTerm(text, new TermStats(numDocs, totTF));
+ sumTotalTermFreq += totTF;
+ }
+
+ termsConsumer.finish(sumTotalTermFreq);
+ }
+
}
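
The flush loop above decodes the in-memory freq and prox streams, both of which pack a delta and a one-bit flag into a single VInt (a docID delta plus a "termFreq == 1" bit, a position delta plus a "payload present" bit). A minimal standalone sketch of that convention, using hypothetical helper names and plain ints instead of Lucene's byte slices:

    final class LowBitFlagCoding {
      // Pack a delta and a boolean flag into one value, as the freq/prox streams above do.
      static int encode(int delta, boolean flag) {
        return (delta << 1) | (flag ? 1 : 0);
      }
      // The flag is the low bit; the delta is everything above it.
      static boolean flag(int code) { return (code & 1) != 0; }
      static int delta(int code)    { return code >>> 1; }

      public static void main(String[] args) {
        int code = encode(7, true);                           // e.g. docID delta 7, termFreq == 1
        System.out.println(delta(code) + " " + flag(code));   // prints: 7 true
      }
    }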
Modified: lucene/dev/branches/docvalues/lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/docvalues/lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java?rev=1098566&r1=1098565&r2=1098566&view=diff
==============================================================================
--- lucene/dev/branches/docvalues/lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java (original)
+++ lucene/dev/branches/docvalues/lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java Mon May 2 13:50:57 2011
@@ -21,7 +21,13 @@ import java.io.FileNotFoundException;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.PrintStream;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.lucene.index.codecs.CodecProvider;
import org.apache.lucene.store.Directory;
@@ -49,12 +55,12 @@ import org.apache.lucene.util.Collection
* (IndexDeletionPolicy) is consulted on creation (onInit)
* and once per commit (onCommit), to decide when a commit
* should be removed.
- *
+ *
* It is the business of the IndexDeletionPolicy to choose
* when to delete commit points. The actual mechanics of
* file deletion, retrying, etc, derived from the deletion
* of commit points is the business of the IndexFileDeleter.
- *
+ *
* The current default deletion policy is {@link
* KeepOnlyLastCommitDeletionPolicy}, which removes all
* prior commits when a new commit has completed. This
@@ -72,7 +78,7 @@ final class IndexFileDeleter {
* so we will retry them again later: */
private List<String> deletable;
- /* Reference count for all files in the index.
+ /* Reference count for all files in the index.
* Counts how many existing commits reference a file.
**/
private Map<String, RefCount> refCounts = new HashMap<String, RefCount>();
@@ -88,7 +94,7 @@ final class IndexFileDeleter {
* non-commit checkpoint: */
private List<Collection<String>> lastFiles = new ArrayList<Collection<String>>();
- /* Commits that the IndexDeletionPolicy have decided to delete: */
+ /* Commits that the IndexDeletionPolicy have decided to delete: */
private List<CommitPoint> commitsToDelete = new ArrayList<CommitPoint>();
private PrintStream infoStream;
@@ -108,7 +114,7 @@ final class IndexFileDeleter {
message("setInfoStream deletionPolicy=" + policy);
}
}
-
+
private void message(String message) {
infoStream.println("IFD [" + new Date() + "; " + Thread.currentThread().getName() + "]: " + message);
}
@@ -139,12 +145,12 @@ final class IndexFileDeleter {
// counts:
long currentGen = segmentInfos.getGeneration();
indexFilenameFilter = new IndexFileNameFilter(codecs);
-
+
CommitPoint currentCommitPoint = null;
String[] files = null;
try {
files = directory.listAll();
- } catch (NoSuchDirectoryException e) {
+ } catch (NoSuchDirectoryException e) {
// it means the directory is empty, so ignore it.
files = new String[0];
}
@@ -152,7 +158,7 @@ final class IndexFileDeleter {
for (String fileName : files) {
if ((indexFilenameFilter.accept(null, fileName)) && !fileName.endsWith("write.lock") && !fileName.equals(IndexFileNames.SEGMENTS_GEN)) {
-
+
// Add this file to refCounts with initial count 0:
getRefCount(fileName);
@@ -233,7 +239,7 @@ final class IndexFileDeleter {
// Now delete anything with ref count at 0. These are
// presumably abandoned files eg due to crash of
// IndexWriter.
- for(Map.Entry<String, RefCount> entry : refCounts.entrySet() ) {
+ for(Map.Entry<String, RefCount> entry : refCounts.entrySet() ) {
RefCount rc = entry.getValue();
final String fileName = entry.getKey();
if (0 == rc.count) {
@@ -253,7 +259,7 @@ final class IndexFileDeleter {
// Always protect the incoming segmentInfos since
// sometime it may not be the most recent commit
checkpoint(segmentInfos, false);
-
+
startingCommitDeleted = currentCommitPoint == null ? false : currentCommitPoint.isDeleted();
deleteCommits();
@@ -327,7 +333,7 @@ final class IndexFileDeleter {
segmentPrefix1 = null;
segmentPrefix2 = null;
}
-
+
for(int i=0;i<files.length;i++) {
String fileName = files[i];
if ((segmentName == null || fileName.startsWith(segmentPrefix1) || fileName.startsWith(segmentPrefix2)) &&
@@ -379,7 +385,7 @@ final class IndexFileDeleter {
deleteCommits();
}
}
-
+
public void deletePendingFiles() throws IOException {
if (deletable != null) {
List<String> oldDeletable = deletable;
@@ -397,7 +403,7 @@ final class IndexFileDeleter {
/**
* For definition of "check point" see IndexWriter comments:
* "Clarification: Check Points (and commits)".
- *
+ *
* Writer calls this when it has made a "consistent
* change" to the index, meaning new files are written to
* the index and the in-memory SegmentInfos have been
@@ -417,7 +423,7 @@ final class IndexFileDeleter {
public void checkpoint(SegmentInfos segmentInfos, boolean isCommit) throws IOException {
if (infoStream != null) {
- message("now checkpoint \"" + segmentInfos.getCurrentSegmentFileName() + "\" [" + segmentInfos.size() + " segments " + "; isCommit = " + isCommit + "]");
+ message("now checkpoint \"" + segmentInfos + "\" [" + segmentInfos.size() + " segments " + "; isCommit = " + isCommit + "]");
}
// Try again now to delete any previously un-deletable
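
The hunks above mostly touch whitespace and imports, but the surrounding comments describe IndexFileDeleter's central idea: every index file carries a reference count of the commits (and checkpoints) that still mention it, and a file becomes deletable once that count drops back to zero. A minimal sketch of that bookkeeping, with a hypothetical class name and none of the retry or deletion-policy machinery of the real deleter:

    import java.util.HashMap;
    import java.util.Map;

    final class FileRefCounts {
      private final Map<String, Integer> counts = new HashMap<String, Integer>();

      void incRef(String fileName) {
        Integer c = counts.get(fileName);
        counts.put(fileName, c == null ? 1 : c + 1);
      }

      /** Returns true when no commit references the file any longer, i.e. it may be deleted. */
      boolean decRef(String fileName) {
        int c = counts.get(fileName) - 1;
        if (c == 0) {
          counts.remove(fileName);
          return true;
        }
        counts.put(fileName, c);
        return false;
      }
    }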
Modified: lucene/dev/branches/docvalues/lucene/src/java/org/apache/lucene/index/IndexReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/docvalues/lucene/src/java/org/apache/lucene/index/IndexReader.java?rev=1098566&r1=1098565&r2=1098566&view=diff
==============================================================================
--- lucene/dev/branches/docvalues/lucene/src/java/org/apache/lucene/index/IndexReader.java (original)
+++ lucene/dev/branches/docvalues/lucene/src/java/org/apache/lucene/index/IndexReader.java Mon May 2 13:50:57 2011
@@ -23,6 +23,7 @@ import org.apache.lucene.search.FieldCac
import org.apache.lucene.search.Similarity;
import org.apache.lucene.index.codecs.Codec;
import org.apache.lucene.index.codecs.CodecProvider;
+import org.apache.lucene.index.codecs.PerDocValues;
import org.apache.lucene.index.values.DocValues;
import org.apache.lucene.store.*;
import org.apache.lucene.util.ArrayUtil;
@@ -923,6 +924,22 @@ public abstract class IndexReader implem
}
}
+ /**
+ * Returns <code>true</code> if an index exists at the specified directory.
+ * @param directory the directory to check for an index
+ * @param codecProvider provides a CodecProvider in case the index uses non-core codecs
+ * @return <code>true</code> if an index exists; <code>false</code> otherwise
+ * @throws IOException if there is a problem with accessing the index
+ */
+ public static boolean indexExists(Directory directory, CodecProvider codecProvider) throws IOException {
+ try {
+ new SegmentInfos().read(directory, codecProvider);
+ return true;
+ } catch (IOException ioe) {
+ return false;
+ }
+ }
+
/** Returns the number of documents in this index. */
public abstract int numDocs();
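
A brief usage sketch of the two-argument indexExists added above; the directory path comes from the command line for illustration, and CodecProvider.getDefault() is assumed to be the appropriate provider on this branch:

    import java.io.File;
    import org.apache.lucene.index.IndexReader;
    import org.apache.lucene.index.codecs.CodecProvider;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.FSDirectory;

    public class CheckIndexExists {
      public static void main(String[] args) throws Exception {
        Directory dir = FSDirectory.open(new File(args[0]));
        // The CodecProvider lets SegmentInfos resolve non-core codecs while reading segments_N.
        System.out.println("index exists: " + IndexReader.indexExists(dir, CodecProvider.getDefault()));
        dir.close();
      }
    }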
@@ -1051,6 +1068,9 @@ public abstract class IndexReader implem
* using {@link ReaderUtil#gatherSubReaders} and iterate
* through them yourself. */
public abstract Fields fields() throws IOException;
+
+ // nocommit javadoc
+ public abstract PerDocValues perDocValues() throws IOException;
public int docFreq(Term term) throws IOException {
return docFreq(term.field(), term.bytes());
@@ -1554,11 +1574,11 @@ public abstract class IndexReader implem
}
public DocValues docValues(String field) throws IOException {
- final Fields fields = fields();
- if (fields == null) {
+ final PerDocValues perDoc = perDocValues();
+ if (perDoc == null) {
return null;
}
- return fields.docValues(field);
+ return perDoc.docValues(field);
}
private volatile Fields fields;
@@ -1572,6 +1592,19 @@ public abstract class IndexReader implem
Fields retrieveFields() {
return fields;
}
+
+ private volatile PerDocValues perDocValues;
+
+ /** @lucene.internal */
+ void storePerDoc(PerDocValues perDocValues) {
+ this.perDocValues = perDocValues;
+ }
+
+ /** @lucene.internal */
+ PerDocValues retrievePerDoc() {
+ return perDocValues;
+ }
+
/**
* A struct like class that represents a hierarchical relationship between
Modified: lucene/dev/branches/docvalues/lucene/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/docvalues/lucene/src/java/org/apache/lucene/index/IndexWriter.java?rev=1098566&r1=1098565&r2=1098566&view=diff
==============================================================================
--- lucene/dev/branches/docvalues/lucene/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/dev/branches/docvalues/lucene/src/java/org/apache/lucene/index/IndexWriter.java Mon May 2 13:50:57 2011
@@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentHa
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
+import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
import org.apache.lucene.index.FieldInfos.FieldNumberBiMap;
import org.apache.lucene.index.IndexWriterConfig.OpenMode;
import org.apache.lucene.index.PayloadProcessorProvider.DirPayloadProcessor;
@@ -46,6 +47,7 @@ import org.apache.lucene.store.BufferedI
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.LockObtainFailedException;
+import org.apache.lucene.util.BitVector;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.ThreadInterruptedException;
@@ -54,17 +56,16 @@ import org.apache.lucene.util.MapBackedS
/**
An <code>IndexWriter</code> creates and maintains an index.
- <p>The <code>create</code> argument to the {@link
- #IndexWriter(Directory, IndexWriterConfig) constructor} determines
+ <p>The {@link OpenMode} option on
+ {@link IndexWriterConfig#setOpenMode(OpenMode)} determines
whether a new index is created, or whether an existing index is
- opened. Note that you can open an index with <code>create=true</code>
- even while readers are using the index. The old readers will
+ opened. Note that you can open an index with {@link OpenMode#CREATE}
+ even while readers are using the index. The old readers will
continue to search the "point in time" snapshot they had opened,
- and won't see the newly created index until they re-open. There are
- also {@link #IndexWriter(Directory, IndexWriterConfig) constructors}
- with no <code>create</code> argument which will create a new index
- if there is not already an index at the provided path and otherwise
- open the existing index.</p>
+ and won't see the newly created index until they re-open. If
+ {@link OpenMode#CREATE_OR_APPEND} is used IndexWriter will create a
+ new index if there is not already an index at the provided path
+ and otherwise open the existing index.</p>
<p>In either case, documents are added with {@link #addDocument(Document)
addDocument} and removed with {@link #deleteDocuments(Term)} or {@link
@@ -76,15 +77,19 @@ import org.apache.lucene.util.MapBackedS
<a name="flush"></a>
<p>These changes are buffered in memory and periodically
flushed to the {@link Directory} (during the above method
- calls). A flush is triggered when there are enough
- buffered deletes (see {@link IndexWriterConfig#setMaxBufferedDeleteTerms})
- or enough added documents since the last flush, whichever
- is sooner. For the added documents, flushing is triggered
- either by RAM usage of the documents (see {@link
- IndexWriterConfig#setRAMBufferSizeMB}) or the number of added documents.
- The default is to flush when RAM usage hits 16 MB. For
+ calls). A flush is triggered when there are enough added documents
+ since the last flush. Flushing is triggered either by RAM usage of the
+ documents (see {@link IndexWriterConfig#setRAMBufferSizeMB}) or the
+ number of added documents (see {@link IndexWriterConfig#setMaxBufferedDocs(int)}).
+ The default is to flush when RAM usage hits
+ {@link IndexWriterConfig#DEFAULT_RAM_BUFFER_SIZE_MB} MB. For
best indexing speed you should flush by RAM usage with a
- large RAM buffer. Note that flushing just moves the
+ large RAM buffer. Additionally, if IndexWriter reaches the configured number of
+ buffered deletes (see {@link IndexWriterConfig#setMaxBufferedDeleteTerms})
+ the deleted terms and queries are flushed and applied to existing segments.
+ In contrast to the other flush options {@link IndexWriterConfig#setRAMBufferSizeMB} and
+ {@link IndexWriterConfig#setMaxBufferedDocs(int)}, deleted terms
+ won't trigger a segment flush. Note that flushing just moves the
internal buffered state in IndexWriter into the index, but
these changes are not visible to IndexReader until either
{@link #commit()} or {@link #close} is called. A flush may
@@ -165,21 +170,21 @@ import org.apache.lucene.util.MapBackedS
/*
* Clarification: Check Points (and commits)
* IndexWriter writes new index files to the directory without writing a new segments_N
- * file which references these new files. It also means that the state of
+ * file which references these new files. It also means that the state of
* the in memory SegmentInfos object is different than the most recent
* segments_N file written to the directory.
- *
- * Each time the SegmentInfos is changed, and matches the (possibly
- * modified) directory files, we have a new "check point".
- * If the modified/new SegmentInfos is written to disk - as a new
- * (generation of) segments_N file - this check point is also an
+ *
+ * Each time the SegmentInfos is changed, and matches the (possibly
+ * modified) directory files, we have a new "check point".
+ * If the modified/new SegmentInfos is written to disk - as a new
+ * (generation of) segments_N file - this check point is also an
* IndexCommit.
- *
- * A new checkpoint always replaces the previous checkpoint and
- * becomes the new "front" of the index. This allows the IndexFileDeleter
+ *
+ * A new checkpoint always replaces the previous checkpoint and
+ * becomes the new "front" of the index. This allows the IndexFileDeleter
* to delete files that are referenced only by stale checkpoints.
* (files that were created since the last commit, but are no longer
- * referenced by the "front" of the index). For this, IndexFileDeleter
+ * referenced by the "front" of the index). For this, IndexFileDeleter
* keeps track of the last non commit checkpoint.
*/
public class IndexWriter implements Closeable {
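
The reworked class javadoc above routes index creation through IndexWriterConfig's OpenMode and spells out the flush triggers (RAM usage, buffered document count, buffered delete terms). A hedged configuration sketch of those options; the analyzer, directory, buffer sizes and the Version.LUCENE_40 constant are illustrative assumptions, not taken from this commit:

    IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_40, analyzer)
        .setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND) // create only if no index exists yet
        .setRAMBufferSizeMB(64.0)                                 // flush new segments by RAM usage
        .setMaxBufferedDocs(IndexWriterConfig.DISABLE_AUTO_FLUSH) // ... not by document count
        .setMaxBufferedDeleteTerms(1000);                         // apply deletes after 1000 buffered terms
    IndexWriter writer = new IndexWriter(directory, conf);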
@@ -195,7 +200,7 @@ public class IndexWriter implements Clos
* printed to infoStream, if set (see {@link
* #setInfoStream}).
*/
- public final static int MAX_TERM_LENGTH = DocumentsWriter.MAX_TERM_LENGTH_UTF8;
+ public final static int MAX_TERM_LENGTH = DocumentsWriterPerThread.MAX_TERM_LENGTH_UTF8;
// The normal read buffer size defaults to 1024, but
// increasing this during merging seems to yield
@@ -225,7 +230,7 @@ public class IndexWriter implements Clos
final FieldNumberBiMap globalFieldNumberMap;
private DocumentsWriter docWriter;
- private IndexFileDeleter deleter;
+ final IndexFileDeleter deleter;
private Set<SegmentInfo> segmentsToOptimize = new HashSet<SegmentInfo>(); // used by optimize to note those needing optimization
private int optimizeMaxNumSegments;
@@ -247,12 +252,12 @@ public class IndexWriter implements Clos
private long mergeGen;
private boolean stopMerges;
- private final AtomicInteger flushCount = new AtomicInteger();
- private final AtomicInteger flushDeletesCount = new AtomicInteger();
+ final AtomicInteger flushCount = new AtomicInteger();
+ final AtomicInteger flushDeletesCount = new AtomicInteger();
final ReaderPool readerPool = new ReaderPool();
final BufferedDeletesStream bufferedDeletesStream;
-
+
// This is a "write once" variable (like the organic dye
// on a DVD-R that may or may not be heated by a laser and
// then cooled to permanently record the event): it's
@@ -339,31 +344,58 @@ public class IndexWriter implements Clos
*/
IndexReader getReader(boolean applyAllDeletes) throws IOException {
ensureOpen();
-
+
final long tStart = System.currentTimeMillis();
if (infoStream != null) {
message("flush at getReader");
}
-
// Do this up front before flushing so that the readers
// obtained during this flush are pooled, the first time
// this method is called:
poolReaders = true;
-
- // Prevent segmentInfos from changing while opening the
- // reader; in theory we could do similar retry logic,
- // just like we do when loading segments_N
- IndexReader r;
- synchronized(this) {
- flush(false, applyAllDeletes);
- r = new DirectoryReader(this, segmentInfos, config.getReaderTermsIndexDivisor(), codecs, applyAllDeletes);
- if (infoStream != null) {
- message("return reader version=" + r.getVersion() + " reader=" + r);
+ final IndexReader r;
+ doBeforeFlush();
+ final boolean anySegmentFlushed;
+ /*
+ * for releasing a NRT reader we must ensure that
+ * DW doesn't add any segments or deletes until we are
+ * done with creating the NRT DirectoryReader.
+ * We release the two stage full flush after we are done opening the
+ * directory reader!
+ */
+ synchronized (fullFlushLock) {
+ boolean success = false;
+ try {
+ anySegmentFlushed = docWriter.flushAllThreads();
+ if (!anySegmentFlushed) {
+ // prevent double increment since docWriter#doFlush increments the flushcount
+ // if we flushed anything.
+ flushCount.incrementAndGet();
+ }
+ success = true;
+ // Prevent segmentInfos from changing while opening the
+ // reader; in theory we could do similar retry logic,
+ // just like we do when loading segments_N
+ synchronized(this) {
+ maybeApplyDeletes(applyAllDeletes);
+ r = new DirectoryReader(this, segmentInfos, config.getReaderTermsIndexDivisor(), codecs, applyAllDeletes);
+ if (infoStream != null) {
+ message("return reader version=" + r.getVersion() + " reader=" + r);
+ }
+ }
+ } finally {
+ if (!success && infoStream != null) {
+ message("hit exception during NRT reader");
+ }
+ // Done: finish the full flush!
+ docWriter.finishFullFlush(success);
+ doAfterFlush();
}
}
- maybeMerge();
-
+ if (anySegmentFlushed) {
+ maybeMerge();
+ }
if (infoStream != null) {
message("getReader took " + (System.currentTimeMillis() - tStart) + " msec");
}
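
A hedged near-real-time usage sketch of the getReader path above; IndexReader.open(IndexWriter, boolean) is assumed to be the public entry point that reaches it on this branch, and writer and doc are assumed to exist:

    writer.addDocument(doc);
    IndexReader nrtReader = IndexReader.open(writer, true); // full flush + apply deletes, then open
    try {
      // search the point-in-time snapshot; later changes require opening a new reader
    } finally {
      nrtReader.close();
    }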
@@ -400,10 +432,10 @@ public class IndexWriter implements Clos
if (r != null) {
r.hasChanges = false;
}
- }
+ }
}
}
-
+
// used only by asserts
public synchronized boolean infoIsLive(SegmentInfo info) {
int idx = segmentInfos.indexOf(info);
@@ -419,7 +451,7 @@ public class IndexWriter implements Clos
}
return info;
}
-
+
/**
* Release the segment reader (i.e. decRef it and close if there
* are no more references.
@@ -432,7 +464,7 @@ public class IndexWriter implements Clos
public synchronized boolean release(SegmentReader sr) throws IOException {
return release(sr, false);
}
-
+
/**
* Release the segment reader (i.e. decRef it and close if there
* are no more references.
@@ -493,7 +525,7 @@ public class IndexWriter implements Clos
sr.close();
}
}
-
+
/** Remove all our references to readers, and commits
* any pending changes. */
synchronized void close() throws IOException {
@@ -503,7 +535,7 @@ public class IndexWriter implements Clos
Iterator<Map.Entry<SegmentInfo,SegmentReader>> iter = readerMap.entrySet().iterator();
while (iter.hasNext()) {
-
+
Map.Entry<SegmentInfo,SegmentReader> ent = iter.next();
SegmentReader sr = ent.getValue();
@@ -526,7 +558,7 @@ public class IndexWriter implements Clos
sr.decRef();
}
}
-
+
/**
* Commit all segment reader in the pool.
* @throws IOException
@@ -550,7 +582,7 @@ public class IndexWriter implements Clos
}
}
}
-
+
/**
* Returns a ref to a clone. NOTE: this clone is not
* enrolled in the pool, so you should simply close()
@@ -564,7 +596,7 @@ public class IndexWriter implements Clos
sr.decRef();
}
}
-
+
/**
* Obtain a SegmentReader from the readerPool. The reader
* must be returned by calling {@link #release(SegmentReader)}
@@ -580,7 +612,7 @@ public class IndexWriter implements Clos
/**
* Obtain a SegmentReader from the readerPool. The reader
* must be returned by calling {@link #release(SegmentReader)}
- *
+ *
* @see #release(SegmentReader)
* @param info
* @param doOpenStores
@@ -638,7 +670,7 @@ public class IndexWriter implements Clos
return sr;
}
}
-
+
/**
* Obtain the number of deleted docs for a pooled reader.
* If the reader isn't being pooled, the segmentInfo's
@@ -658,7 +690,7 @@ public class IndexWriter implements Clos
}
}
}
-
+
/**
* Used internally to throw an {@link
* AlreadyClosedException} if this IndexWriter has been
@@ -721,7 +753,7 @@ public class IndexWriter implements Clos
mergePolicy.setIndexWriter(this);
mergeScheduler = conf.getMergeScheduler();
codecs = conf.getCodecProvider();
-
+
bufferedDeletesStream = new BufferedDeletesStream(messageID);
bufferedDeletesStream.setInfoStream(infoStream);
poolReaders = conf.getReaderPooling();
@@ -790,8 +822,7 @@ public class IndexWriter implements Clos
// start with previous field numbers, but new FieldInfos
globalFieldNumberMap = segmentInfos.getOrLoadGlobalFieldNumberMap(directory);
- docWriter = new DocumentsWriter(config, directory, this, conf.getIndexingChain(),
- globalFieldNumberMap.newFieldInfos(SegmentCodecsBuilder.create(codecs)), bufferedDeletesStream);
+ docWriter = new DocumentsWriter(config, directory, this, globalFieldNumberMap, bufferedDeletesStream);
docWriter.setInfoStream(infoStream);
// Default deleter (for backwards compatibility) is
@@ -849,7 +880,7 @@ public class IndexWriter implements Clos
public IndexWriterConfig getConfig() {
return config;
}
-
+
/** If non-null, this will be the default infoStream used
* by a newly instantiated IndexWriter.
* @see #setInfoStream
@@ -871,7 +902,7 @@ public class IndexWriter implements Clos
* message when maxFieldLength is reached will be printed
* to this.
*/
- public void setInfoStream(PrintStream infoStream) {
+ public void setInfoStream(PrintStream infoStream) throws IOException {
ensureOpen();
this.infoStream = infoStream;
docWriter.setInfoStream(infoStream);
@@ -881,7 +912,7 @@ public class IndexWriter implements Clos
messageState();
}
- private void messageState() {
+ private void messageState() throws IOException {
message("\ndir=" + directory + "\n" +
"index=" + segString() + "\n" +
"version=" + Constants.LUCENE_VERSION + "\n" +
@@ -901,7 +932,7 @@ public class IndexWriter implements Clos
public boolean verbose() {
return infoStream != null;
}
-
+
/**
* Commits all changes to an index and closes all
* associated files. Note that this may be a costly
@@ -916,7 +947,7 @@ public class IndexWriter implements Clos
* even though part of it (flushing buffered documents)
* may have succeeded, so the write lock will still be
* held.</p>
- *
+ *
* <p> If you can correct the underlying cause (eg free up
* some disk space) then you can call close() again.
* Failing that, if you want to force the write lock to be
@@ -1036,7 +1067,7 @@ public class IndexWriter implements Clos
if (infoStream != null)
message("now call final commit()");
-
+
if (!hitOOM) {
commitInternal(null);
}
@@ -1049,7 +1080,7 @@ public class IndexWriter implements Clos
docWriter = null;
deleter.close();
}
-
+
if (writeLock != null) {
writeLock.release(); // release write lock
writeLock = null;
@@ -1072,7 +1103,7 @@ public class IndexWriter implements Clos
}
/** Returns the Directory used by this index. */
- public Directory getDirectory() {
+ public Directory getDirectory() {
// Pass false because the flush during closing calls getDirectory
ensureOpen(false);
return directory;
@@ -1196,22 +1227,7 @@ public class IndexWriter implements Clos
* @throws IOException if there is a low-level IO error
*/
public void addDocument(Document doc, Analyzer analyzer) throws CorruptIndexException, IOException {
- ensureOpen();
- boolean doFlush = false;
- boolean success = false;
- try {
- try {
- doFlush = docWriter.updateDocument(doc, analyzer, null);
- success = true;
- } finally {
- if (!success && infoStream != null)
- message("hit exception adding document");
- }
- if (doFlush)
- flush(true, false);
- } catch (OutOfMemoryError oom) {
- handleOOM(oom, "addDocument");
- }
+ updateDocument(null, doc, analyzer);
}
/**
@@ -1228,9 +1244,7 @@ public class IndexWriter implements Clos
public void deleteDocuments(Term term) throws CorruptIndexException, IOException {
ensureOpen();
try {
- if (docWriter.deleteTerm(term, false)) {
- flush(true, false);
- }
+ docWriter.deleteTerms(term);
} catch (OutOfMemoryError oom) {
handleOOM(oom, "deleteDocuments(Term)");
}
@@ -1238,7 +1252,8 @@ public class IndexWriter implements Clos
/**
* Deletes the document(s) containing any of the
- * terms. All deletes are flushed at the same time.
+ * terms. All given deletes are applied and flushed atomically
+ * at the same time.
*
* <p><b>NOTE</b>: if this method hits an OutOfMemoryError
* you should immediately close the writer. See <a
@@ -1252,9 +1267,7 @@ public class IndexWriter implements Clos
public void deleteDocuments(Term... terms) throws CorruptIndexException, IOException {
ensureOpen();
try {
- if (docWriter.deleteTerms(terms)) {
- flush(true, false);
- }
+ docWriter.deleteTerms(terms);
} catch (OutOfMemoryError oom) {
handleOOM(oom, "deleteDocuments(Term..)");
}
@@ -1274,9 +1287,7 @@ public class IndexWriter implements Clos
public void deleteDocuments(Query query) throws CorruptIndexException, IOException {
ensureOpen();
try {
- if (docWriter.deleteQuery(query)) {
- flush(true, false);
- }
+ docWriter.deleteQueries(query);
} catch (OutOfMemoryError oom) {
handleOOM(oom, "deleteDocuments(Query)");
}
@@ -1284,7 +1295,7 @@ public class IndexWriter implements Clos
/**
* Deletes the document(s) matching any of the provided queries.
- * All deletes are flushed at the same time.
+ * All given deletes are applied and flushed atomically at the same time.
*
* <p><b>NOTE</b>: if this method hits an OutOfMemoryError
* you should immediately close the writer. See <a
@@ -1298,9 +1309,7 @@ public class IndexWriter implements Clos
public void deleteDocuments(Query... queries) throws CorruptIndexException, IOException {
ensureOpen();
try {
- if (docWriter.deleteQueries(queries)) {
- flush(true, false);
- }
+ docWriter.deleteQueries(queries);
} catch (OutOfMemoryError oom) {
handleOOM(oom, "deleteDocuments(Query..)");
}
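
A short usage sketch of the buffered-delete semantics described in the javadoc changes above; the field names and values are illustrative. These calls only buffer the deletes and updates, which become visible to readers after a flush and commit:

    writer.deleteDocuments(new Term("id", "12"), new Term("id", "13"));  // buffered, applied atomically
    writer.deleteDocuments(new TermQuery(new Term("category", "spam"))); // query-based delete, also buffered
    writer.updateDocument(new Term("id", "14"), newDoc);                 // atomic delete-then-add for one doc
    writer.commit();                                                      // apply and make visible to readers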
@@ -1350,17 +1359,18 @@ public class IndexWriter implements Clos
throws CorruptIndexException, IOException {
ensureOpen();
try {
- boolean doFlush = false;
boolean success = false;
+ boolean anySegmentFlushed = false;
try {
- doFlush = docWriter.updateDocument(doc, analyzer, term);
+ anySegmentFlushed = docWriter.updateDocument(doc, analyzer, term);
success = true;
} finally {
if (!success && infoStream != null)
message("hit exception updating document");
}
- if (doFlush) {
- flush(true, false);
+
+ if (anySegmentFlushed) {
+ maybeMerge();
}
} catch (OutOfMemoryError oom) {
handleOOM(oom, "updateDocument");
@@ -1546,7 +1556,7 @@ public class IndexWriter implements Clos
resetMergeExceptions();
segmentsToOptimize = new HashSet<SegmentInfo>(segmentInfos);
optimizeMaxNumSegments = maxNumSegments;
-
+
// Now mark all pending & running merges as optimize
// merge:
for(final MergePolicy.OneMerge merge : pendingMerges) {
@@ -1612,12 +1622,12 @@ public class IndexWriter implements Clos
if (merge.optimize)
return true;
}
-
+
for (final MergePolicy.OneMerge merge : runningMerges) {
if (merge.optimize)
return true;
}
-
+
return false;
}
@@ -1640,6 +1650,8 @@ public class IndexWriter implements Clos
throws CorruptIndexException, IOException {
ensureOpen();
+ flush(true, true);
+
if (infoStream != null)
message("expungeDeletes: index now " + segString());
@@ -1712,6 +1724,10 @@ public class IndexWriter implements Clos
* documents, so you must do so yourself if necessary.
* See also {@link #expungeDeletes(boolean)}
*
+ * <p><b>NOTE</b>: this method first flushes a new
+ * segment (if there are indexed documents), and applies
+ * all buffered deletes.
+ *
* <p><b>NOTE</b>: if this method hits an OutOfMemoryError
* you should immediately close the writer. See <a
* href="#OOME">above</a> for details.</p>
@@ -1908,7 +1924,7 @@ public class IndexWriter implements Clos
/**
* Delete all documents in the index.
*
- * <p>This method will drop all buffered documents and will
+ * <p>This method will drop all buffered documents and will
* remove all segments from the index. This change will not be
* visible until a {@link #commit()} has been called. This method
* can be rolled back using {@link #rollback()}.</p>
@@ -1938,7 +1954,7 @@ public class IndexWriter implements Clos
deleter.refresh();
// Don't bother saving any changes in our segmentInfos
- readerPool.clear(null);
+ readerPool.clear(null);
// Mark that the index has changed
++changeCount;
@@ -1965,7 +1981,7 @@ public class IndexWriter implements Clos
mergeFinish(merge);
}
pendingMerges.clear();
-
+
for (final MergePolicy.OneMerge merge : runningMerges) {
if (infoStream != null)
message("now abort running merge " + merge.segString(directory));
@@ -1992,7 +2008,7 @@ public class IndexWriter implements Clos
message("all running merges have aborted");
} else {
- // waitForMerges() will ensure any running addIndexes finishes.
+ // waitForMerges() will ensure any running addIndexes finishes.
// It's fine if a new one attempts to start because from our
// caller above the call will see that we are in the
// process of closing, and will throw an
@@ -2004,7 +2020,7 @@ public class IndexWriter implements Clos
/**
* Wait for any currently outstanding merges to finish.
*
- * <p>It is guaranteed that any merges started prior to calling this method
+ * <p>It is guaranteed that any merges started prior to calling this method
* will have completed once this method completes.</p>
*/
public synchronized void waitForMerges() {
@@ -2034,6 +2050,125 @@ public class IndexWriter implements Clos
deleter.checkpoint(segmentInfos, false);
}
+ /**
+ * Prepares the {@link SegmentInfo} for the new flushed segment and persists
+ * the deleted documents {@link BitVector}. Use
+ * {@link #publishFlushedSegment(SegmentInfo, FrozenBufferedDeletes)} to
+ * publish the returned {@link SegmentInfo} together with its segment private
+ * delete packet.
+ *
+ * @see #publishFlushedSegment(SegmentInfo, FrozenBufferedDeletes)
+ */
+ SegmentInfo prepareFlushedSegment(FlushedSegment flushedSegment) throws IOException {
+ assert flushedSegment != null;
+
+ SegmentInfo newSegment = flushedSegment.segmentInfo;
+
+ setDiagnostics(newSegment, "flush");
+
+ boolean success = false;
+ try {
+ if (useCompoundFile(newSegment)) {
+ String compoundFileName = IndexFileNames.segmentFileName(newSegment.name, "", IndexFileNames.COMPOUND_FILE_EXTENSION);
+ message("creating compound file " + compoundFileName);
+ // Now build compound file
+ CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, compoundFileName);
+ for(String fileName : newSegment.files()) {
+ cfsWriter.addFile(fileName);
+ }
+
+ // Perform the merge
+ cfsWriter.close();
+ synchronized(this) {
+ deleter.deleteNewFiles(newSegment.files());
+ }
+
+ newSegment.setUseCompoundFile(true);
+ }
+
+ // Must write deleted docs after the CFS so we don't
+ // slurp the del file into CFS:
+ if (flushedSegment.deletedDocuments != null) {
+ final int delCount = flushedSegment.deletedDocuments.count();
+ assert delCount > 0;
+ newSegment.setDelCount(delCount);
+ newSegment.advanceDelGen();
+ final String delFileName = newSegment.getDelFileName();
+ if (infoStream != null) {
+ message("flush: write " + delCount + " deletes to " + delFileName);
+ }
+ boolean success2 = false;
+ try {
+ // TODO: in the NRT case it'd be better to hand
+ // this del vector over to the
+ // shortly-to-be-opened SegmentReader and let it
+ // carry the changes; there's no reason to use
+ // filesystem as intermediary here.
+ flushedSegment.deletedDocuments.write(directory, delFileName);
+ success2 = true;
+ } finally {
+ if (!success2) {
+ try {
+ directory.deleteFile(delFileName);
+ } catch (Throwable t) {
+ // suppress this so we keep throwing the
+ // original exception
+ }
+ }
+ }
+ }
+
+ success = true;
+ } finally {
+ if (!success) {
+ if (infoStream != null) {
+ message("hit exception " +
+ "creating compound file for newly flushed segment " + newSegment.name);
+ }
+
+ synchronized(this) {
+ deleter.refresh(newSegment.name);
+ }
+ }
+ }
+ return newSegment;
+ }
+
+ /**
+ * Atomically adds the segment private delete packet and publishes the flushed
+ * segments SegmentInfo to the index writer. NOTE: use
+ * segment's SegmentInfo to the index writer. NOTE: use
+ * {@link SegmentInfo} for the flushed segment.
+ *
+ * @see #prepareFlushedSegment(FlushedSegment)
+ */
+ synchronized void publishFlushedSegment(SegmentInfo newSegment,
+ FrozenBufferedDeletes packet, FrozenBufferedDeletes globalPacket) throws IOException {
+ // Lock order IW -> BDS
+ synchronized (bufferedDeletesStream) {
+ if (globalPacket != null && globalPacket.any()) {
+ bufferedDeletesStream.push(globalPacket);
+ }
+ // Publishing the segment must be synched on IW -> BDS to make sure
+ // that no merge prunes away the seg. private delete packet
+ final long nextGen;
+ if (packet != null && packet.any()) {
+ nextGen = bufferedDeletesStream.push(packet);
+ } else {
+ // Since we don't have a delete packet to apply we can get a new
+ // generation right away
+ nextGen = bufferedDeletesStream.getNextGen();
+ }
+ newSegment.setBufferedDeletesGen(nextGen);
+ segmentInfos.add(newSegment);
+ checkpoint();
+ }
+ }
+
+ synchronized boolean useCompoundFile(SegmentInfo segmentInfo) throws IOException {
+ return mergePolicy.useCompoundFile(segmentInfos, segmentInfo);
+ }
+
private synchronized void resetMergeExceptions() {
mergeExceptions = new ArrayList<MergePolicy.OneMerge>();
mergeGen++;
@@ -2082,11 +2217,11 @@ public class IndexWriter implements Clos
* <p>
* <b>NOTE:</b> this method only copies the segments of the incoming indexes
* and does not merge them. Therefore deleted documents are not removed and
- * the new segments are not merged with the existing ones. Also, the segments
- * are copied as-is, meaning they are not converted to CFS if they aren't,
- * and vice-versa. If you wish to do that, you can call {@link #maybeMerge}
+ * the new segments are not merged with the existing ones. Also, the segments
+ * are copied as-is, meaning they are not converted to CFS if they aren't,
+ * and vice-versa. If you wish to do that, you can call {@link #maybeMerge}
* or {@link #optimize} afterwards.
- *
+ *
* <p>This requires this index not be among those to be added.
*
* <p>
@@ -2123,7 +2258,7 @@ public class IndexWriter implements Clos
docCount += info.docCount;
String newSegName = newSegmentName();
String dsName = info.getDocStoreSegment();
-
+
if (infoStream != null) {
message("addIndexes: process segment origName=" + info.name + " newName=" + newSegName + " dsName=" + dsName + " info=" + info);
}
@@ -2170,7 +2305,7 @@ public class IndexWriter implements Clos
infos.add(info);
}
- }
+ }
synchronized (this) {
ensureOpen();
@@ -2211,15 +2346,20 @@ public class IndexWriter implements Clos
ensureOpen();
try {
+ if (infoStream != null)
+ message("flush at addIndexes(IndexReader...)");
+ flush(false, true);
+
String mergedName = newSegmentName();
SegmentMerger merger = new SegmentMerger(directory, config.getTermIndexInterval(),
mergedName, null, codecs, payloadProcessorProvider,
globalFieldNumberMap.newFieldInfos(SegmentCodecsBuilder.create(codecs)));
-
+
for (IndexReader reader : readers) // add new indexes
merger.add(reader);
-
+
int docCount = merger.merge(); // merge 'em
+
final FieldInfos fieldInfos = merger.fieldInfos();
SegmentInfo info = new SegmentInfo(mergedName, docCount, directory,
false, fieldInfos.hasProx(), merger.getSegmentCodecs(),
@@ -2231,11 +2371,11 @@ public class IndexWriter implements Clos
synchronized(this) { // Guard segmentInfos
useCompoundFile = mergePolicy.useCompoundFile(segmentInfos, info);
}
-
+
// Now create the compound file if needed
if (useCompoundFile) {
merger.createCompoundFile(mergedName + ".cfs", info);
-
+
// delete new non cfs files directly: they were never
// registered with IFD
deleter.deleteNewFiles(info.files());
@@ -2287,7 +2427,7 @@ public class IndexWriter implements Clos
* #commit()} to finish the commit, or {@link
* #rollback()} to revert the commit and undo all changes
* done since the writer was opened.</p>
- *
+ *
* You can also just call {@link #commit(Map)} directly
* without prepareCommit first in which case that method
* will internally call prepareCommit.
@@ -2431,6 +2571,10 @@ public class IndexWriter implements Clos
}
}
+ // Ensures only one flush() is actually flushing segments
+ // at a time:
+ private final Object fullFlushLock = new Object();
+
/**
* Flush all in-memory buffered updates (adds and deletes)
* to the Directory.
@@ -2454,116 +2598,104 @@ public class IndexWriter implements Clos
}
}
- // TODO: this method should not have to be entirely
- // synchronized, ie, merges should be allowed to commit
- // even while a flush is happening
- private synchronized boolean doFlush(boolean applyAllDeletes) throws CorruptIndexException, IOException {
-
+ private boolean doFlush(boolean applyAllDeletes) throws CorruptIndexException, IOException {
if (hitOOM) {
throw new IllegalStateException("this writer hit an OutOfMemoryError; cannot flush");
}
doBeforeFlush();
-
assert testPoint("startDoFlush");
-
- // We may be flushing because it was triggered by doc
- // count, del count, ram usage (in which case flush
- // pending is already set), or we may be flushing
- // due to external event eg getReader or commit is
- // called (in which case we now set it, and this will
- // pause all threads):
- flushControl.setFlushPendingNoWait("explicit flush");
-
boolean success = false;
-
try {
if (infoStream != null) {
message(" start flush: applyAllDeletes=" + applyAllDeletes);
message(" index before flush " + segString());
}
-
- final SegmentInfo newSegment = docWriter.flush(this, deleter, mergePolicy, segmentInfos);
- if (newSegment != null) {
- setDiagnostics(newSegment, "flush");
- segmentInfos.add(newSegment);
- checkpoint();
- }
-
- if (!applyAllDeletes) {
- // If deletes alone are consuming > 1/2 our RAM
- // buffer, force them all to apply now. This is to
- // prevent too-frequent flushing of a long tail of
- // tiny segments:
- if (flushControl.getFlushDeletes() ||
- (config.getRAMBufferSizeMB() != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
- bufferedDeletesStream.bytesUsed() > (1024*1024*config.getRAMBufferSizeMB()/2))) {
- applyAllDeletes = true;
- if (infoStream != null) {
- message("force apply deletes bytesUsed=" + bufferedDeletesStream.bytesUsed() + " vs ramBuffer=" + (1024*1024*config.getRAMBufferSizeMB()));
- }
+ final boolean anySegmentFlushed;
+
+ synchronized (fullFlushLock) {
+ try {
+ anySegmentFlushed = docWriter.flushAllThreads();
+ success = true;
+ } finally {
+ docWriter.finishFullFlush(success);
}
}
-
- if (applyAllDeletes) {
- if (infoStream != null) {
- message("apply all deletes during flush");
- }
- flushDeletesCount.incrementAndGet();
- final BufferedDeletesStream.ApplyDeletesResult result = bufferedDeletesStream.applyDeletes(readerPool, segmentInfos);
- if (result.anyDeletes) {
- checkpoint();
- }
- if (!keepFullyDeletedSegments && result.allDeleted != null) {
- if (infoStream != null) {
- message("drop 100% deleted segments: " + result.allDeleted);
- }
- for(SegmentInfo info : result.allDeleted) {
- // If a merge has already registered for this
- // segment, we leave it in the readerPool; the
- // merge will skip merging it and will then drop
- // it once it's done:
- if (!mergingSegments.contains(info)) {
- segmentInfos.remove(info);
- if (readerPool != null) {
- readerPool.drop(info);
- }
- }
- }
- checkpoint();
+ success = false;
+ synchronized(this) {
+ maybeApplyDeletes(applyAllDeletes);
+ doAfterFlush();
+ if (!anySegmentFlushed) {
+ // flushCount is incremented in flushAllThreads
+ flushCount.incrementAndGet();
}
- bufferedDeletesStream.prune(segmentInfos);
- assert !bufferedDeletesStream.any();
- flushControl.clearDeletes();
- } else if (infoStream != null) {
- message("don't apply deletes now delTermCount=" + bufferedDeletesStream.numTerms() + " bytesUsed=" + bufferedDeletesStream.bytesUsed());
+ success = true;
+ return anySegmentFlushed;
}
-
- doAfterFlush();
- flushCount.incrementAndGet();
-
- success = true;
-
- return newSegment != null;
-
} catch (OutOfMemoryError oom) {
handleOOM(oom, "doFlush");
// never hit
return false;
} finally {
- flushControl.clearFlushPending();
if (!success && infoStream != null)
message("hit exception during flush");
}
}
+
+ final synchronized void maybeApplyDeletes(boolean applyAllDeletes) throws IOException {
+ if (applyAllDeletes) {
+ if (infoStream != null) {
+ message("apply all deletes during flush");
+ }
+ applyAllDeletes();
+ } else if (infoStream != null) {
+ message("don't apply deletes now delTermCount=" + bufferedDeletesStream.numTerms() + " bytesUsed=" + bufferedDeletesStream.bytesUsed());
+ }
+
+ }
+
+ final synchronized void applyAllDeletes() throws IOException {
+ flushDeletesCount.incrementAndGet();
+ final BufferedDeletesStream.ApplyDeletesResult result = bufferedDeletesStream
+ .applyDeletes(readerPool, segmentInfos);
+ if (result.anyDeletes) {
+ checkpoint();
+ }
+ if (!keepFullyDeletedSegments && result.allDeleted != null) {
+ if (infoStream != null) {
+ message("drop 100% deleted segments: " + result.allDeleted);
+ }
+ for (SegmentInfo info : result.allDeleted) {
+ // If a merge has already registered for this
+ // segment, we leave it in the readerPool; the
+ // merge will skip merging it and will then drop
+ // it once it's done:
+ if (!mergingSegments.contains(info)) {
+ segmentInfos.remove(info);
+ if (readerPool != null) {
+ readerPool.drop(info);
+ }
+ }
+ }
+ checkpoint();
+ }
+ bufferedDeletesStream.prune(segmentInfos);
+ }
/** Expert: Return the total size of all index files currently cached in memory.
* Useful for size management with flushRamDocs()
*/
public final long ramSizeInBytes() {
ensureOpen();
- return docWriter.bytesUsed() + bufferedDeletesStream.bytesUsed();
+ return docWriter.flushControl.netBytes() + bufferedDeletesStream.bytesUsed();
+ }
+
+ // for testing only
+ DocumentsWriter getDocsWriter() {
+ boolean test = false;
+ assert test = true;
+ return test?docWriter: null;
}
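
getDocsWriter above relies on the side effect of an assert statement so that it hands out the DocumentsWriter only when assertions are enabled; a minimal standalone illustration of that idiom (not Lucene code):

    boolean assertionsEnabled = false;
    // The assignment runs only when the JVM was started with -ea; the assert itself never fails.
    assert assertionsEnabled = true;
    System.out.println(assertionsEnabled ? "assertions are on" : "assertions are off");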
/** Expert: Return the number of documents currently
@@ -2573,7 +2705,7 @@ public class IndexWriter implements Clos
return docWriter.getNumDocs();
}
- private void ensureValidMerge(MergePolicy.OneMerge merge) {
+ private void ensureValidMerge(MergePolicy.OneMerge merge) throws IOException {
for(SegmentInfo info : merge.segments) {
if (segmentInfos.indexOf(info) == -1) {
throw new MergePolicy.MergeException("MergePolicy selected a segment (" + info.name + ") that is not in the current index " + segString(), directory);
@@ -2699,7 +2831,7 @@ public class IndexWriter implements Clos
}
commitMergedDeletes(merge, mergedReader);
-
+
// If the doc store we are using has been closed and
// is in now compound format (but wasn't when we
// started), then we will switch to the compound
@@ -2713,7 +2845,7 @@ public class IndexWriter implements Clos
message("merged segment " + merge.info + " is 100% deleted" + (keepFullyDeletedSegments ? "" : "; skipping insert"));
}
- final Set mergedAway = new HashSet<SegmentInfo>(merge.segments);
+ final Set<SegmentInfo> mergedAway = new HashSet<SegmentInfo>(merge.segments);
int segIdx = 0;
int newSegIdx = 0;
boolean inserted = false;
@@ -2760,15 +2892,15 @@ public class IndexWriter implements Clos
// them so that they don't bother writing them to
// disk, updating SegmentInfo, etc.:
readerPool.clear(merge.segments);
-
+
if (merge.optimize) {
// cascade the optimize:
segmentsToOptimize.add(merge.info);
}
-
+
return true;
}
-
+
final private void handleMergeException(Throwable t, MergePolicy.OneMerge merge) throws IOException {
if (infoStream != null) {
@@ -2857,14 +2989,14 @@ public class IndexWriter implements Clos
/** Hook that's called when the specified merge is complete. */
void mergeSuccess(MergePolicy.OneMerge merge) {
}
-
+
/** Checks whether this merge involves any segments
* already participating in a merge. If not, this merge
* is "registered", meaning we record that its segments
* are now participating in a merge, and true is
* returned. Else (the merge conflicts) false is
* returned. */
- final synchronized boolean registerMerge(MergePolicy.OneMerge merge) throws MergePolicy.MergeAbortedException {
+ final synchronized boolean registerMerge(MergePolicy.OneMerge merge) throws MergePolicy.MergeAbortedException, IOException {
if (merge.registerDone)
return true;
@@ -2874,10 +3006,8 @@ public class IndexWriter implements Clos
throw new MergePolicy.MergeAbortedException("merge is aborted: " + merge.segString(directory));
}
- final int count = merge.segments.size();
boolean isExternal = false;
- for(int i=0;i<count;i++) {
- final SegmentInfo info = merge.segments.info(i);
+ for(SegmentInfo info : merge.segments) {
if (mergingSegments.contains(info)) {
return false;
}
@@ -2907,12 +3037,15 @@ public class IndexWriter implements Clos
// is running (while synchronized) to avoid race
// condition where two conflicting merges from different
// threads, start
- for(int i=0;i<count;i++) {
- mergingSegments.add(merge.segments.info(i));
+ message("registerMerge merging=" + mergingSegments);
+ for(SegmentInfo info : merge.segments) {
+ message("registerMerge info=" + info);
+ mergingSegments.add(info);
}
// Merge is now registered
merge.registerDone = true;
+
return true;
}
@@ -2987,7 +3120,6 @@ public class IndexWriter implements Clos
// Lock order: IW -> BD
bufferedDeletesStream.prune(segmentInfos);
-
Map<String,String> details = new HashMap<String,String>();
details.put("optimize", Boolean.toString(merge.optimize));
details.put("mergeFactor", Integer.toString(merge.segments.size()));
@@ -2997,6 +3129,10 @@ public class IndexWriter implements Clos
message("merge seg=" + merge.info.name);
}
+ // TODO: I think this should no longer be needed (we
+ // now build CFS before adding segment to the infos);
+ // however, on removing it, tests fail for some reason!
+
// Also enroll the merged segment into mergingSegments;
// this prevents it from getting selected for a merge
// after our merge is done but while we are building the
@@ -3004,11 +3140,11 @@ public class IndexWriter implements Clos
mergingSegments.add(merge.info);
}
- private void setDiagnostics(SegmentInfo info, String source) {
+ static void setDiagnostics(SegmentInfo info, String source) {
setDiagnostics(info, source, null);
}
- private void setDiagnostics(SegmentInfo info, String source, Map<String,String> details) {
+ private static void setDiagnostics(SegmentInfo info, String source, Map<String,String> details) {
Map<String,String> diagnostics = new HashMap<String,String>();
diagnostics.put("source", source);
diagnostics.put("lucene.version", Constants.LUCENE_VERSION);
@@ -3026,7 +3162,7 @@ public class IndexWriter implements Clos
/** Does fininishing for a merge, which is fast but holds
* the synchronized lock on IndexWriter instance. */
final synchronized void mergeFinish(MergePolicy.OneMerge merge) throws IOException {
-
+
// Optimize, addIndexes or finishMerges may be waiting
// on merges to finish.
notifyAll();
@@ -3035,10 +3171,11 @@ public class IndexWriter implements Clos
// exception inside mergeInit
if (merge.registerDone) {
final SegmentInfos sourceSegments = merge.segments;
- final int end = sourceSegments.size();
- for(int i=0;i<end;i++) {
- mergingSegments.remove(sourceSegments.info(i));
+ for(SegmentInfo info : sourceSegments) {
+ mergingSegments.remove(info);
}
+ // TODO: if we remove the add in _mergeInit, we should
+ // also remove this:
mergingSegments.remove(merge.info);
merge.registerDone = false;
}
@@ -3097,11 +3234,11 @@ public class IndexWriter implements Clos
* instance */
private int mergeMiddle(MergePolicy.OneMerge merge)
throws CorruptIndexException, IOException {
-
+
merge.checkAborted(directory);
final String mergedName = merge.info.name;
-
+
int mergedDocCount = 0;
SegmentInfos sourceSegments = merge.segments;
@@ -3117,6 +3254,8 @@ public class IndexWriter implements Clos
merge.readers = new ArrayList<SegmentReader>();
merge.readerClones = new ArrayList<SegmentReader>();
+ merge.estimatedMergeBytes = 0;
+
// This is try/finally to make sure merger's readers are
// closed:
boolean success = false;
@@ -3134,6 +3273,13 @@ public class IndexWriter implements Clos
-config.getReaderTermsIndexDivisor());
merge.readers.add(reader);
+ final int readerMaxDoc = reader.maxDoc();
+ if (readerMaxDoc > 0) {
+ final int delCount = reader.numDeletedDocs();
+ final double delRatio = ((double) delCount)/readerMaxDoc;
+ merge.estimatedMergeBytes += info.sizeInBytes(true) * (1.0 - delRatio);
+ }
+
// We clone the segment readers because other
// deletes may come in while we're merging so we
// need readers that will not change
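
The new estimatedMergeBytes accounting scales each segment's on-disk size by the fraction of documents that are not deleted, since deleted docs are dropped during the merge. A minimal sketch of that arithmetic, with hypothetical sizeInBytes/maxDoc/delCount parameters standing in for the SegmentInfo and SegmentReader calls:

// Hypothetical stand-alone version of the live-doc-ratio size estimate.
final class MergeSizeEstimate {

  static long estimate(long sizeInBytes, int maxDoc, int delCount) {
    if (maxDoc <= 0) {
      return 0; // an empty segment contributes nothing to the estimate
    }
    final double delRatio = ((double) delCount) / maxDoc;
    // Deleted docs are dropped while merging, so only the surviving
    // fraction of the segment's bytes is expected to be copied.
    return (long) (sizeInBytes * (1.0 - delRatio));
  }

  public static void main(String[] args) {
    // A 100 MB segment with 25% of its docs deleted: roughly 75 MB merged.
    System.out.println(estimate(100L * 1024 * 1024, 1000000, 250000));
  }
}
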
@@ -3166,7 +3312,7 @@ public class IndexWriter implements Clos
message("merge store matchedCount=" + merger.getMatchedSubReaderCount() + " vs " + merge.readers.size());
}
anyNonBulkMerges |= merger.getAnyNonBulkMerges();
-
+
assert mergedDocCount == totDocCount: "mergedDocCount=" + mergedDocCount + " vs " + totDocCount;
// Very important to do this before opening the reader
@@ -3235,8 +3381,11 @@ public class IndexWriter implements Clos
merge.info.setUseCompoundFile(true);
}
- final IndexReaderWarmer mergedSegmentWarmer = config.getMergedSegmentWarmer();
+ if (infoStream != null) {
+ message(String.format("merged segment size=%.3f MB vs estimate=%.3f MB", merge.info.sizeInBytes(true)/1024./1024., merge.estimatedMergeBytes/1024/1024.));
+ }
+ final IndexReaderWarmer mergedSegmentWarmer = config.getMergedSegmentWarmer();
final int termsIndexDivisor;
final boolean loadDocStores;
@@ -3297,12 +3446,12 @@ public class IndexWriter implements Clos
// For test purposes.
final int getBufferedDeleteTermsSize() {
- return docWriter.getPendingDeletes().terms.size();
+ return docWriter.getBufferedDeleteTermsSize();
}
// For test purposes.
final int getNumBufferedDeleteTerms() {
- return docWriter.getPendingDeletes().numTermDeletes.get();
+ return docWriter.getNumBufferedDeleteTerms();
}
// utility routines for tests
@@ -3310,21 +3459,41 @@ public class IndexWriter implements Clos
return segmentInfos.size() > 0 ? segmentInfos.info(segmentInfos.size()-1) : null;
}
- public synchronized String segString() {
+ /** @lucene.internal */
+ public synchronized String segString() throws IOException {
return segString(segmentInfos);
}
- private synchronized String segString(SegmentInfos infos) {
+ /** @lucene.internal */
+ public synchronized String segString(SegmentInfos infos) throws IOException {
StringBuilder buffer = new StringBuilder();
final int count = infos.size();
for(int i = 0; i < count; i++) {
if (i > 0) {
buffer.append(' ');
}
- final SegmentInfo info = infos.info(i);
- buffer.append(info.toString(directory, 0));
- if (info.dir != directory)
- buffer.append("**");
+ buffer.append(segString(infos.info(i)));
+ }
+
+ return buffer.toString();
+ }
+
+ public synchronized String segString(SegmentInfo info) throws IOException {
+ StringBuilder buffer = new StringBuilder();
+ SegmentReader reader = readerPool.getIfExists(info);
+ try {
+ if (reader != null) {
+ buffer.append(reader.toString());
+ } else {
+ buffer.append(info.toString(directory, 0));
+ if (info.dir != directory) {
+ buffer.append("**");
+ }
+ }
+ } finally {
+ if (reader != null) {
+ readerPool.release(reader);
+ }
}
return buffer.toString();
}
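
The new segString(SegmentInfo) prefers a pooled SegmentReader when one is open and always releases it in a finally block. A sketch of that acquire/describe/release pattern, with a hypothetical Pool interface in place of IndexWriter.ReaderPool:

// Hypothetical pool and reader types; only the release discipline matters here.
final class PooledDescriber {

  interface Pool {
    Object getIfExists(String segmentName);
    void release(Object reader);
  }

  static String describe(Pool pool, String segmentName, String fallback) {
    Object reader = pool.getIfExists(segmentName);
    try {
      // Prefer the pooled reader's live view (it knows about pending
      // deletes); otherwise fall back to the static description.
      return reader != null ? reader.toString() : fallback;
    } finally {
      if (reader != null) {
        pool.release(reader); // always release, even if toString() throws
      }
    }
  }
}
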
@@ -3397,17 +3566,17 @@ public class IndexWriter implements Clos
assert lastCommitChangeCount <= changeCount;
myChangeCount = changeCount;
-
+
if (changeCount == lastCommitChangeCount) {
if (infoStream != null)
message(" skip startCommit(): no changes pending");
return;
}
-
+
// First, we clone & incref the segmentInfos we intend
// to sync, then, without locking, we sync() all files
// referenced by toSync, in the background.
-
+
if (infoStream != null)
message("startCommit index=" + segString(segmentInfos) + " changeCount=" + changeCount);
@@ -3415,10 +3584,10 @@ public class IndexWriter implements Clos
toSync = (SegmentInfos) segmentInfos.clone();
assert filesExist(toSync);
-
+
if (commitUserData != null)
toSync.setUserData(commitUserData);
-
+
// This protects the segmentInfos we are now going
// to commit. This is important in case, eg, while
// we are trying to sync all referenced files, a
@@ -3550,7 +3719,7 @@ public class IndexWriter implements Clos
/** Expert: remove any index files that are no longer
* used.
- *
+ *
* <p> IndexWriter normally deletes unused files itself,
* during indexing. However, on Windows, which disallows
* deletion of open files, if there is a reader open on
@@ -3599,7 +3768,7 @@ public class IndexWriter implements Clos
public void setPayloadProcessorProvider(PayloadProcessorProvider pcp) {
payloadProcessorProvider = pcp;
}
-
+
/**
* Returns the {@link PayloadProcessorProvider} that is used during segment
* merges to process payloads.
@@ -3607,124 +3776,4 @@ public class IndexWriter implements Clos
public PayloadProcessorProvider getPayloadProcessorProvider() {
return payloadProcessorProvider;
}
-
- // decides when flushes happen
- final class FlushControl {
-
- private boolean flushPending;
- private boolean flushDeletes;
- private int delCount;
- private int docCount;
- private boolean flushing;
-
- private synchronized boolean setFlushPending(String reason, boolean doWait) {
- if (flushPending || flushing) {
- if (doWait) {
- while(flushPending || flushing) {
- try {
- wait();
- } catch (InterruptedException ie) {
- throw new ThreadInterruptedException(ie);
- }
- }
- }
- return false;
- } else {
- if (infoStream != null) {
- message("now trigger flush reason=" + reason);
- }
- flushPending = true;
- return flushPending;
- }
- }
-
- public synchronized void setFlushPendingNoWait(String reason) {
- setFlushPending(reason, false);
- }
-
- public synchronized boolean getFlushPending() {
- return flushPending;
- }
-
- public synchronized boolean getFlushDeletes() {
- return flushDeletes;
- }
-
- public synchronized void clearFlushPending() {
- if (infoStream != null) {
- message("clearFlushPending");
- }
- flushPending = false;
- flushDeletes = false;
- docCount = 0;
- notifyAll();
- }
-
- public synchronized void clearDeletes() {
- delCount = 0;
- }
-
- public synchronized boolean waitUpdate(int docInc, int delInc) {
- return waitUpdate(docInc, delInc, false);
- }
-
- public synchronized boolean waitUpdate(int docInc, int delInc, boolean skipWait) {
- while(flushPending) {
- try {
- wait();
- } catch (InterruptedException ie) {
- throw new ThreadInterruptedException(ie);
- }
- }
-
- // skipWait is only used when a thread is BOTH adding
- // a doc and buffering a del term, and, the adding of
- // the doc already triggered a flush
- if (skipWait) {
- docCount += docInc;
- delCount += delInc;
- return false;
- }
-
- final int maxBufferedDocs = config.getMaxBufferedDocs();
- if (maxBufferedDocs != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
- (docCount+docInc) >= maxBufferedDocs) {
- return setFlushPending("maxBufferedDocs", true);
- }
- docCount += docInc;
-
- final int maxBufferedDeleteTerms = config.getMaxBufferedDeleteTerms();
- if (maxBufferedDeleteTerms != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
- (delCount+delInc) >= maxBufferedDeleteTerms) {
- flushDeletes = true;
- return setFlushPending("maxBufferedDeleteTerms", true);
- }
- delCount += delInc;
-
- return flushByRAMUsage("add delete/doc");
- }
-
- public synchronized boolean flushByRAMUsage(String reason) {
- final double ramBufferSizeMB = config.getRAMBufferSizeMB();
- if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH) {
- final long limit = (long) (ramBufferSizeMB*1024*1024);
- long used = bufferedDeletesStream.bytesUsed() + docWriter.bytesUsed();
- if (used >= limit) {
-
- // DocumentsWriter may be able to free up some
- // RAM:
- // Lock order: FC -> DW
- docWriter.balanceRAM();
-
- used = bufferedDeletesStream.bytesUsed() + docWriter.bytesUsed();
- if (used >= limit) {
- return setFlushPending("ram full: " + reason, false);
- }
- }
- }
- return false;
- }
- }
-
- final FlushControl flushControl = new FlushControl();
}
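
The FlushControl class removed above decided when to flush based on three configured limits: buffered documents, buffered delete terms, and total RAM use, with a disabled limit skipping its check. A minimal sketch of that trigger policy, with hypothetical counters and thresholds in place of IndexWriterConfig and the DocumentsWriter/BufferedDeletesStream byte counts:

// Hypothetical inputs; -1 disables a limit, mirroring DISABLE_AUTO_FLUSH.
final class FlushPolicySketch {
  static final int DISABLE = -1;

  // Returns the reason a flush should be triggered, or null if none applies.
  static String whyFlush(int bufferedDocs, int maxBufferedDocs,
                         int bufferedDeleteTerms, int maxBufferedDeleteTerms,
                         long bytesUsed, double ramBufferSizeMB) {
    if (maxBufferedDocs != DISABLE && bufferedDocs >= maxBufferedDocs) {
      return "maxBufferedDocs";
    }
    if (maxBufferedDeleteTerms != DISABLE
        && bufferedDeleteTerms >= maxBufferedDeleteTerms) {
      return "maxBufferedDeleteTerms";
    }
    if (ramBufferSizeMB != DISABLE
        && bytesUsed >= (long) (ramBufferSizeMB * 1024 * 1024)) {
      return "ram full";
    }
    return null;
  }
}
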