Posted to commits@lucene.apache.org by us...@apache.org on 2011/05/02 00:38:36 UTC
svn commit: r1098427 [3/5] - in /lucene/dev/trunk: ./ lucene/
lucene/src/java/org/apache/lucene/index/
lucene/src/test-framework/org/apache/lucene/search/
lucene/src/test-framework/org/apache/lucene/store/
lucene/src/test-framework/org/apache/lucene/ut...
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriter.java?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriter.java Sun May 1 22:38:33 2011
@@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentHa
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
+import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
import org.apache.lucene.index.FieldInfos.FieldNumberBiMap;
import org.apache.lucene.index.IndexWriterConfig.OpenMode;
import org.apache.lucene.index.PayloadProcessorProvider.DirPayloadProcessor;
@@ -46,6 +47,7 @@ import org.apache.lucene.store.BufferedI
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.LockObtainFailedException;
+import org.apache.lucene.util.BitVector;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.ThreadInterruptedException;
@@ -54,17 +56,16 @@ import org.apache.lucene.util.MapBackedS
/**
An <code>IndexWriter</code> creates and maintains an index.
- <p>The <code>create</code> argument to the {@link
- #IndexWriter(Directory, IndexWriterConfig) constructor} determines
+ <p>The {@link OpenMode} option on
+ {@link IndexWriterConfig#setOpenMode(OpenMode)} determines
whether a new index is created, or whether an existing index is
- opened. Note that you can open an index with <code>create=true</code>
- even while readers are using the index. The old readers will
+ opened. Note that you can open an index with {@link OpenMode#CREATE}
+ even while readers are using the index. The old readers will
continue to search the "point in time" snapshot they had opened,
- and won't see the newly created index until they re-open. There are
- also {@link #IndexWriter(Directory, IndexWriterConfig) constructors}
- with no <code>create</code> argument which will create a new index
- if there is not already an index at the provided path and otherwise
- open the existing index.</p>
+ and won't see the newly created index until they re-open. If
+ {@link OpenMode#CREATE_OR_APPEND} is used IndexWriter will create a
+ new index if there is not already an index at the provided path
+ and otherwise open the existing index.</p>
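For illustration, a minimal sketch of the configuration style described above (assuming an existing Directory dir and Analyzer analyzer; the single-argument IndexWriterConfig constructor follows the example shown in IndexWriterConfig's own class javadoc):

    // Open an existing index, or create one if none exists yet
    IndexWriterConfig conf = new IndexWriterConfig(analyzer);
    conf.setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND);
    IndexWriter writer = new IndexWriter(dir, conf);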
<p>In either case, documents are added with {@link #addDocument(Document)
addDocument} and removed with {@link #deleteDocuments(Term)} or {@link
@@ -76,15 +77,19 @@ import org.apache.lucene.util.MapBackedS
<a name="flush"></a>
<p>These changes are buffered in memory and periodically
flushed to the {@link Directory} (during the above method
- calls). A flush is triggered when there are enough
- buffered deletes (see {@link IndexWriterConfig#setMaxBufferedDeleteTerms})
- or enough added documents since the last flush, whichever
- is sooner. For the added documents, flushing is triggered
- either by RAM usage of the documents (see {@link
- IndexWriterConfig#setRAMBufferSizeMB}) or the number of added documents.
- The default is to flush when RAM usage hits 16 MB. For
+ calls). A flush is triggered when there are enough added documents
+ since the last flush. Flushing is triggered either by RAM usage of the
+ documents (see {@link IndexWriterConfig#setRAMBufferSizeMB}) or the
+ number of added documents (see {@link IndexWriterConfig#setMaxBufferedDocs(int)}).
+ The default is to flush when RAM usage hits
+ {@value IndexWriterConfig#DEFAULT_RAM_BUFFER_SIZE_MB} MB. For
best indexing speed you should flush by RAM usage with a
- large RAM buffer. Note that flushing just moves the
+ large RAM buffer. Additionally, if IndexWriter reaches the configured number of
+ buffered deletes (see {@link IndexWriterConfig#setMaxBufferedDeleteTerms})
+ the deleted terms and queries are flushed and applied to existing segments.
+ In contrast to the other flush options {@link IndexWriterConfig#setRAMBufferSizeMB} and
+ {@link IndexWriterConfig#setMaxBufferedDocs(int)}, deleted terms
+ won't trigger a segment flush. Note that flushing just moves the
internal buffered state in IndexWriter into the index, but
these changes are not visible to IndexReader until either
{@link #commit()} or {@link #close} is called. A flush may
@@ -165,21 +170,21 @@ import org.apache.lucene.util.MapBackedS
/*
* Clarification: Check Points (and commits)
* IndexWriter writes new index files to the directory without writing a new segments_N
- * file which references these new files. It also means that the state of
+ * file which references these new files. It also means that the state of
* the in memory SegmentInfos object is different than the most recent
* segments_N file written to the directory.
- *
- * Each time the SegmentInfos is changed, and matches the (possibly
- * modified) directory files, we have a new "check point".
- * If the modified/new SegmentInfos is written to disk - as a new
- * (generation of) segments_N file - this check point is also an
+ *
+ * Each time the SegmentInfos is changed, and matches the (possibly
+ * modified) directory files, we have a new "check point".
+ * If the modified/new SegmentInfos is written to disk - as a new
+ * (generation of) segments_N file - this check point is also an
* IndexCommit.
- *
- * A new checkpoint always replaces the previous checkpoint and
- * becomes the new "front" of the index. This allows the IndexFileDeleter
+ *
+ * A new checkpoint always replaces the previous checkpoint and
+ * becomes the new "front" of the index. This allows the IndexFileDeleter
* to delete files that are referenced only by stale checkpoints.
* (files that were created since the last commit, but are no longer
- * referenced by the "front" of the index). For this, IndexFileDeleter
+ * referenced by the "front" of the index). For this, IndexFileDeleter
* keeps track of the last non commit checkpoint.
*/
public class IndexWriter implements Closeable {
@@ -195,7 +200,7 @@ public class IndexWriter implements Clos
* printed to infoStream, if set (see {@link
* #setInfoStream}).
*/
- public final static int MAX_TERM_LENGTH = DocumentsWriter.MAX_TERM_LENGTH_UTF8;
+ public final static int MAX_TERM_LENGTH = DocumentsWriterPerThread.MAX_TERM_LENGTH_UTF8;
// The normal read buffer size defaults to 1024, but
// increasing this during merging seems to yield
@@ -225,7 +230,7 @@ public class IndexWriter implements Clos
final FieldNumberBiMap globalFieldNumberMap;
private DocumentsWriter docWriter;
- private IndexFileDeleter deleter;
+ final IndexFileDeleter deleter;
private Set<SegmentInfo> segmentsToOptimize = new HashSet<SegmentInfo>(); // used by optimize to note those needing optimization
private int optimizeMaxNumSegments;
@@ -247,12 +252,12 @@ public class IndexWriter implements Clos
private long mergeGen;
private boolean stopMerges;
- private final AtomicInteger flushCount = new AtomicInteger();
- private final AtomicInteger flushDeletesCount = new AtomicInteger();
+ final AtomicInteger flushCount = new AtomicInteger();
+ final AtomicInteger flushDeletesCount = new AtomicInteger();
final ReaderPool readerPool = new ReaderPool();
final BufferedDeletesStream bufferedDeletesStream;
-
+
// This is a "write once" variable (like the organic dye
// on a DVD-R that may or may not be heated by a laser and
// then cooled to permanently record the event): it's
@@ -339,31 +344,58 @@ public class IndexWriter implements Clos
*/
IndexReader getReader(boolean applyAllDeletes) throws IOException {
ensureOpen();
-
+
final long tStart = System.currentTimeMillis();
if (infoStream != null) {
message("flush at getReader");
}
-
// Do this up front before flushing so that the readers
// obtained during this flush are pooled, the first time
// this method is called:
poolReaders = true;
-
- // Prevent segmentInfos from changing while opening the
- // reader; in theory we could do similar retry logic,
- // just like we do when loading segments_N
- IndexReader r;
- synchronized(this) {
- flush(false, applyAllDeletes);
- r = new DirectoryReader(this, segmentInfos, config.getReaderTermsIndexDivisor(), codecs, applyAllDeletes);
- if (infoStream != null) {
- message("return reader version=" + r.getVersion() + " reader=" + r);
+ final IndexReader r;
+ doBeforeFlush();
+ final boolean anySegmentFlushed;
+ /*
+ * for releasing a NRT reader we must ensure that
+ * DW doesn't add any segments or deletes until we are
+ * done with creating the NRT DirectoryReader.
+ * We release the two stage full flush after we are done opening the
+ * directory reader!
+ */
+ synchronized (fullFlushLock) {
+ boolean success = false;
+ try {
+ anySegmentFlushed = docWriter.flushAllThreads();
+ if (!anySegmentFlushed) {
+ // prevent double increment since docWriter#doFlush increments the flushcount
+ // if we flushed anything.
+ flushCount.incrementAndGet();
+ }
+ success = true;
+ // Prevent segmentInfos from changing while opening the
+ // reader; in theory we could do similar retry logic,
+ // just like we do when loading segments_N
+ synchronized(this) {
+ maybeApplyDeletes(applyAllDeletes);
+ r = new DirectoryReader(this, segmentInfos, config.getReaderTermsIndexDivisor(), codecs, applyAllDeletes);
+ if (infoStream != null) {
+ message("return reader version=" + r.getVersion() + " reader=" + r);
+ }
+ }
+ } finally {
+ if (!success && infoStream != null) {
+ message("hit exception during NRT reader");
+ }
+ // Done: finish the full flush!
+ docWriter.finishFullFlush(success);
+ doAfterFlush();
}
}
- maybeMerge();
-
+ if (anySegmentFlushed) {
+ maybeMerge();
+ }
if (infoStream != null) {
message("getReader took " + (System.currentTimeMillis() - tStart) + " msec");
}
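For context, a hedged sketch of how an application reaches this NRT code path; getReader is package-private, so the IndexReader.open(writer, applyAllDeletes) entry point used here is an assumption about the public API, not confirmed by this diff:

    // Near-real-time search: the reader sees flushed segments without a commit
    IndexReader reader = IndexReader.open(writer, true); // applyAllDeletes=true
    try {
      IndexSearcher searcher = new IndexSearcher(reader);
      // ... run searches against this point-in-time snapshot ...
    } finally {
      reader.close(); // writer stays open; open a new reader to see later flushes
    }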
@@ -400,10 +432,10 @@ public class IndexWriter implements Clos
if (r != null) {
r.hasChanges = false;
}
- }
+ }
}
}
-
+
// used only by asserts
public synchronized boolean infoIsLive(SegmentInfo info) {
int idx = segmentInfos.indexOf(info);
@@ -419,7 +451,7 @@ public class IndexWriter implements Clos
}
return info;
}
-
+
/**
* Release the segment reader (i.e. decRef it and close if there
* are no more references.
@@ -432,7 +464,7 @@ public class IndexWriter implements Clos
public synchronized boolean release(SegmentReader sr) throws IOException {
return release(sr, false);
}
-
+
/**
* Release the segment reader (i.e. decRef it and close if there
* are no more references.
@@ -493,7 +525,7 @@ public class IndexWriter implements Clos
sr.close();
}
}
-
+
/** Remove all our references to readers, and commits
* any pending changes. */
synchronized void close() throws IOException {
@@ -503,7 +535,7 @@ public class IndexWriter implements Clos
Iterator<Map.Entry<SegmentInfo,SegmentReader>> iter = readerMap.entrySet().iterator();
while (iter.hasNext()) {
-
+
Map.Entry<SegmentInfo,SegmentReader> ent = iter.next();
SegmentReader sr = ent.getValue();
@@ -526,7 +558,7 @@ public class IndexWriter implements Clos
sr.decRef();
}
}
-
+
/**
* Commit all segment reader in the pool.
* @throws IOException
@@ -550,7 +582,7 @@ public class IndexWriter implements Clos
}
}
}
-
+
/**
* Returns a ref to a clone. NOTE: this clone is not
* enrolled in the pool, so you should simply close()
@@ -564,7 +596,7 @@ public class IndexWriter implements Clos
sr.decRef();
}
}
-
+
/**
* Obtain a SegmentReader from the readerPool. The reader
* must be returned by calling {@link #release(SegmentReader)}
@@ -580,7 +612,7 @@ public class IndexWriter implements Clos
/**
* Obtain a SegmentReader from the readerPool. The reader
* must be returned by calling {@link #release(SegmentReader)}
- *
+ *
* @see #release(SegmentReader)
* @param info
* @param doOpenStores
@@ -638,7 +670,7 @@ public class IndexWriter implements Clos
return sr;
}
}
-
+
/**
* Obtain the number of deleted docs for a pooled reader.
* If the reader isn't being pooled, the segmentInfo's
@@ -658,7 +690,7 @@ public class IndexWriter implements Clos
}
}
}
-
+
/**
* Used internally to throw an {@link
* AlreadyClosedException} if this IndexWriter has been
@@ -721,7 +753,7 @@ public class IndexWriter implements Clos
mergePolicy.setIndexWriter(this);
mergeScheduler = conf.getMergeScheduler();
codecs = conf.getCodecProvider();
-
+
bufferedDeletesStream = new BufferedDeletesStream(messageID);
bufferedDeletesStream.setInfoStream(infoStream);
poolReaders = conf.getReaderPooling();
@@ -790,8 +822,7 @@ public class IndexWriter implements Clos
// start with previous field numbers, but new FieldInfos
globalFieldNumberMap = segmentInfos.getOrLoadGlobalFieldNumberMap(directory);
- docWriter = new DocumentsWriter(config, directory, this, conf.getIndexingChain(),
- globalFieldNumberMap.newFieldInfos(SegmentCodecsBuilder.create(codecs)), bufferedDeletesStream);
+ docWriter = new DocumentsWriter(config, directory, this, globalFieldNumberMap, bufferedDeletesStream);
docWriter.setInfoStream(infoStream);
// Default deleter (for backwards compatibility) is
@@ -849,7 +880,7 @@ public class IndexWriter implements Clos
public IndexWriterConfig getConfig() {
return config;
}
-
+
/** If non-null, this will be the default infoStream used
* by a newly instantiated IndexWriter.
* @see #setInfoStream
@@ -901,7 +932,7 @@ public class IndexWriter implements Clos
public boolean verbose() {
return infoStream != null;
}
-
+
/**
* Commits all changes to an index and closes all
* associated files. Note that this may be a costly
@@ -916,7 +947,7 @@ public class IndexWriter implements Clos
* even though part of it (flushing buffered documents)
* may have succeeded, so the write lock will still be
* held.</p>
- *
+ *
* <p> If you can correct the underlying cause (eg free up
* some disk space) then you can call close() again.
* Failing that, if you want to force the write lock to be
@@ -1036,7 +1067,7 @@ public class IndexWriter implements Clos
if (infoStream != null)
message("now call final commit()");
-
+
if (!hitOOM) {
commitInternal(null);
}
@@ -1049,7 +1080,7 @@ public class IndexWriter implements Clos
docWriter = null;
deleter.close();
}
-
+
if (writeLock != null) {
writeLock.release(); // release write lock
writeLock = null;
@@ -1072,7 +1103,7 @@ public class IndexWriter implements Clos
}
/** Returns the Directory used by this index. */
- public Directory getDirectory() {
+ public Directory getDirectory() {
// Pass false because the flush during closing calls getDirectory
ensureOpen(false);
return directory;
@@ -1196,22 +1227,7 @@ public class IndexWriter implements Clos
* @throws IOException if there is a low-level IO error
*/
public void addDocument(Document doc, Analyzer analyzer) throws CorruptIndexException, IOException {
- ensureOpen();
- boolean doFlush = false;
- boolean success = false;
- try {
- try {
- doFlush = docWriter.updateDocument(doc, analyzer, null);
- success = true;
- } finally {
- if (!success && infoStream != null)
- message("hit exception adding document");
- }
- if (doFlush)
- flush(true, false);
- } catch (OutOfMemoryError oom) {
- handleOOM(oom, "addDocument");
- }
+ updateDocument(null, doc, analyzer);
}
/**
@@ -1228,9 +1244,7 @@ public class IndexWriter implements Clos
public void deleteDocuments(Term term) throws CorruptIndexException, IOException {
ensureOpen();
try {
- if (docWriter.deleteTerm(term, false)) {
- flush(true, false);
- }
+ docWriter.deleteTerms(term);
} catch (OutOfMemoryError oom) {
handleOOM(oom, "deleteDocuments(Term)");
}
@@ -1238,7 +1252,8 @@ public class IndexWriter implements Clos
/**
* Deletes the document(s) containing any of the
- * terms. All deletes are flushed at the same time.
+ * terms. All given deletes are applied and flushed atomically
+ * at the same time.
*
* <p><b>NOTE</b>: if this method hits an OutOfMemoryError
* you should immediately close the writer. See <a
@@ -1252,9 +1267,7 @@ public class IndexWriter implements Clos
public void deleteDocuments(Term... terms) throws CorruptIndexException, IOException {
ensureOpen();
try {
- if (docWriter.deleteTerms(terms)) {
- flush(true, false);
- }
+ docWriter.deleteTerms(terms);
} catch (OutOfMemoryError oom) {
handleOOM(oom, "deleteDocuments(Term..)");
}
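A short usage sketch of the atomic multi-term delete described above (Term values are illustrative):

    // Both deletes are buffered together and later applied atomically
    writer.deleteDocuments(new Term("id", "1"), new Term("id", "2"));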
@@ -1274,9 +1287,7 @@ public class IndexWriter implements Clos
public void deleteDocuments(Query query) throws CorruptIndexException, IOException {
ensureOpen();
try {
- if (docWriter.deleteQuery(query)) {
- flush(true, false);
- }
+ docWriter.deleteQueries(query);
} catch (OutOfMemoryError oom) {
handleOOM(oom, "deleteDocuments(Query)");
}
@@ -1284,7 +1295,7 @@ public class IndexWriter implements Clos
/**
* Deletes the document(s) matching any of the provided queries.
- * All deletes are flushed at the same time.
+ * All given deletes are applied and flushed atomically at the same time.
*
* <p><b>NOTE</b>: if this method hits an OutOfMemoryError
* you should immediately close the writer. See <a
@@ -1298,9 +1309,7 @@ public class IndexWriter implements Clos
public void deleteDocuments(Query... queries) throws CorruptIndexException, IOException {
ensureOpen();
try {
- if (docWriter.deleteQueries(queries)) {
- flush(true, false);
- }
+ docWriter.deleteQueries(queries);
} catch (OutOfMemoryError oom) {
handleOOM(oom, "deleteDocuments(Query..)");
}
@@ -1350,17 +1359,18 @@ public class IndexWriter implements Clos
throws CorruptIndexException, IOException {
ensureOpen();
try {
- boolean doFlush = false;
boolean success = false;
+ boolean anySegmentFlushed = false;
try {
- doFlush = docWriter.updateDocument(doc, analyzer, term);
+ anySegmentFlushed = docWriter.updateDocument(doc, analyzer, term);
success = true;
} finally {
if (!success && infoStream != null)
message("hit exception updating document");
}
- if (doFlush) {
- flush(true, false);
+
+ if (anySegmentFlushed) {
+ maybeMerge();
}
} catch (OutOfMemoryError oom) {
handleOOM(oom, "updateDocument");
@@ -1546,7 +1556,7 @@ public class IndexWriter implements Clos
resetMergeExceptions();
segmentsToOptimize = new HashSet<SegmentInfo>(segmentInfos);
optimizeMaxNumSegments = maxNumSegments;
-
+
// Now mark all pending & running merges as optimize
// merge:
for(final MergePolicy.OneMerge merge : pendingMerges) {
@@ -1612,12 +1622,12 @@ public class IndexWriter implements Clos
if (merge.optimize)
return true;
}
-
+
for (final MergePolicy.OneMerge merge : runningMerges) {
if (merge.optimize)
return true;
}
-
+
return false;
}
@@ -1914,7 +1924,7 @@ public class IndexWriter implements Clos
/**
* Delete all documents in the index.
*
- * <p>This method will drop all buffered documents and will
+ * <p>This method will drop all buffered documents and will
* remove all segments from the index. This change will not be
* visible until a {@link #commit()} has been called. This method
* can be rolled back using {@link #rollback()}.</p>
@@ -1944,7 +1954,7 @@ public class IndexWriter implements Clos
deleter.refresh();
// Don't bother saving any changes in our segmentInfos
- readerPool.clear(null);
+ readerPool.clear(null);
// Mark that the index has changed
++changeCount;
@@ -1971,7 +1981,7 @@ public class IndexWriter implements Clos
mergeFinish(merge);
}
pendingMerges.clear();
-
+
for (final MergePolicy.OneMerge merge : runningMerges) {
if (infoStream != null)
message("now abort running merge " + merge.segString(directory));
@@ -1998,7 +2008,7 @@ public class IndexWriter implements Clos
message("all running merges have aborted");
} else {
- // waitForMerges() will ensure any running addIndexes finishes.
+ // waitForMerges() will ensure any running addIndexes finishes.
// It's fine if a new one attempts to start because from our
// caller above the call will see that we are in the
// process of closing, and will throw an
@@ -2010,7 +2020,7 @@ public class IndexWriter implements Clos
/**
* Wait for any currently outstanding merges to finish.
*
- * <p>It is guaranteed that any merges started prior to calling this method
+ * <p>It is guaranteed that any merges started prior to calling this method
* will have completed once this method completes.</p>
*/
public synchronized void waitForMerges() {
@@ -2040,6 +2050,125 @@ public class IndexWriter implements Clos
deleter.checkpoint(segmentInfos, false);
}
+ /**
+ * Prepares the {@link SegmentInfo} for the new flushed segment and persists
+ * the deleted documents {@link BitVector}. Use
+ * {@link #publishFlushedSegment(SegmentInfo, FrozenBufferedDeletes, FrozenBufferedDeletes)} to
+ * publish the returned {@link SegmentInfo} together with its segment private
+ * delete packet.
+ *
+ * @see #publishFlushedSegment(SegmentInfo, FrozenBufferedDeletes, FrozenBufferedDeletes)
+ */
+ SegmentInfo prepareFlushedSegment(FlushedSegment flushedSegment) throws IOException {
+ assert flushedSegment != null;
+
+ SegmentInfo newSegment = flushedSegment.segmentInfo;
+
+ setDiagnostics(newSegment, "flush");
+
+ boolean success = false;
+ try {
+ if (useCompoundFile(newSegment)) {
+ String compoundFileName = IndexFileNames.segmentFileName(newSegment.name, "", IndexFileNames.COMPOUND_FILE_EXTENSION);
+ message("creating compound file " + compoundFileName);
+ // Now build compound file
+ CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, compoundFileName);
+ for(String fileName : newSegment.files()) {
+ cfsWriter.addFile(fileName);
+ }
+
+ // Perform the merge
+ cfsWriter.close();
+ synchronized(this) {
+ deleter.deleteNewFiles(newSegment.files());
+ }
+
+ newSegment.setUseCompoundFile(true);
+ }
+
+ // Must write deleted docs after the CFS so we don't
+ // slurp the del file into CFS:
+ if (flushedSegment.deletedDocuments != null) {
+ final int delCount = flushedSegment.deletedDocuments.count();
+ assert delCount > 0;
+ newSegment.setDelCount(delCount);
+ newSegment.advanceDelGen();
+ final String delFileName = newSegment.getDelFileName();
+ if (infoStream != null) {
+ message("flush: write " + delCount + " deletes to " + delFileName);
+ }
+ boolean success2 = false;
+ try {
+ // TODO: in the NRT case it'd be better to hand
+ // this del vector over to the
+ // shortly-to-be-opened SegmentReader and let it
+ // carry the changes; there's no reason to use
+ // filesystem as intermediary here.
+ flushedSegment.deletedDocuments.write(directory, delFileName);
+ success2 = true;
+ } finally {
+ if (!success2) {
+ try {
+ directory.deleteFile(delFileName);
+ } catch (Throwable t) {
+ // suppress this so we keep throwing the
+ // original exception
+ }
+ }
+ }
+ }
+
+ success = true;
+ } finally {
+ if (!success) {
+ if (infoStream != null) {
+ message("hit exception " +
+ "creating compound file for newly flushed segment " + newSegment.name);
+ }
+
+ synchronized(this) {
+ deleter.refresh(newSegment.name);
+ }
+ }
+ }
+ return newSegment;
+ }
+
+ /**
+ * Atomically adds the segment private delete packet and publishes the flushed
+ * segments SegmentInfo to the index writer. NOTE: use
+ * {@link #prepareFlushedSegment(FlushedSegment)} to obtain the
+ * {@link SegmentInfo} for the flushed segment.
+ *
+ * @see #prepareFlushedSegment(FlushedSegment)
+ */
+ synchronized void publishFlushedSegment(SegmentInfo newSegment,
+ FrozenBufferedDeletes packet, FrozenBufferedDeletes globalPacket) throws IOException {
+ // Lock order IW -> BDS
+ synchronized (bufferedDeletesStream) {
+ if (globalPacket != null && globalPacket.any()) {
+ bufferedDeletesStream.push(globalPacket);
+ }
+ // Publishing the segment must be synched on IW -> BDS to make the sure
+ // that no merge prunes away the seg. private delete packet
+ final long nextGen;
+ if (packet != null && packet.any()) {
+ nextGen = bufferedDeletesStream.push(packet);
+ } else {
+ // Since we don't have a delete packet to apply we can get a new
+ // generation right away
+ nextGen = bufferedDeletesStream.getNextGen();
+ }
+ newSegment.setBufferedDeletesGen(nextGen);
+ segmentInfos.add(newSegment);
+ checkpoint();
+ }
+ }
+
+ synchronized boolean useCompoundFile(SegmentInfo segmentInfo) throws IOException {
+ return mergePolicy.useCompoundFile(segmentInfos, segmentInfo);
+ }
+
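+ To summarize the two-phase protocol introduced here, an illustrative call-order sketch (package-private internals, not application API; the flushedSegment, segmentPacket, and globalPacket values come from the flushing DocumentsWriterPerThread and are assumptions here):
+
+     // Phase 1 (no IW monitor held): write optional CFS and the deletes BitVector
+     SegmentInfo newSegment = writer.prepareFlushedSegment(flushedSegment);
+     // Phase 2 (synchronized on IW, then BufferedDeletesStream): atomically
+     // publish the segment together with its segment-private delete packet
+     writer.publishFlushedSegment(newSegment, segmentPacket, globalPacket);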
private synchronized void resetMergeExceptions() {
mergeExceptions = new ArrayList<MergePolicy.OneMerge>();
mergeGen++;
@@ -2088,11 +2217,11 @@ public class IndexWriter implements Clos
* <p>
* <b>NOTE:</b> this method only copies the segments of the incoming indexes
* and does not merge them. Therefore deleted documents are not removed and
- * the new segments are not merged with the existing ones. Also, the segments
- * are copied as-is, meaning they are not converted to CFS if they aren't,
- * and vice-versa. If you wish to do that, you can call {@link #maybeMerge}
+ * the new segments are not merged with the existing ones. Also, the segments
+ * are copied as-is, meaning they are not converted to CFS if they aren't,
+ * and vice-versa. If you wish to do that, you can call {@link #maybeMerge}
* or {@link #optimize} afterwards.
- *
+ *
* <p>This requires this index not be among those to be added.
*
* <p>
@@ -2129,7 +2258,7 @@ public class IndexWriter implements Clos
docCount += info.docCount;
String newSegName = newSegmentName();
String dsName = info.getDocStoreSegment();
-
+
if (infoStream != null) {
message("addIndexes: process segment origName=" + info.name + " newName=" + newSegName + " dsName=" + dsName + " info=" + info);
}
@@ -2176,7 +2305,7 @@ public class IndexWriter implements Clos
infos.add(info);
}
- }
+ }
synchronized (this) {
ensureOpen();
@@ -2225,11 +2354,12 @@ public class IndexWriter implements Clos
SegmentMerger merger = new SegmentMerger(directory, config.getTermIndexInterval(),
mergedName, null, codecs, payloadProcessorProvider,
globalFieldNumberMap.newFieldInfos(SegmentCodecsBuilder.create(codecs)));
-
+
for (IndexReader reader : readers) // add new indexes
merger.add(reader);
-
+
int docCount = merger.merge(); // merge 'em
+
final FieldInfos fieldInfos = merger.fieldInfos();
SegmentInfo info = new SegmentInfo(mergedName, docCount, directory,
false, fieldInfos.hasProx(), merger.getSegmentCodecs(),
@@ -2241,11 +2371,11 @@ public class IndexWriter implements Clos
synchronized(this) { // Guard segmentInfos
useCompoundFile = mergePolicy.useCompoundFile(segmentInfos, info);
}
-
+
// Now create the compound file if needed
if (useCompoundFile) {
merger.createCompoundFile(mergedName + ".cfs", info);
-
+
// delete new non cfs files directly: they were never
// registered with IFD
deleter.deleteNewFiles(info.files());
@@ -2297,7 +2427,7 @@ public class IndexWriter implements Clos
* #commit()} to finish the commit, or {@link
* #rollback()} to revert the commit and undo all changes
* done since the writer was opened.</p>
- *
+ *
* You can also just call {@link #commit(Map)} directly
* without prepareCommit first in which case that method
* will internally call prepareCommit.
@@ -2441,6 +2571,10 @@ public class IndexWriter implements Clos
}
}
+ // Ensures only one flush() is actually flushing segments
+ // at a time:
+ private final Object fullFlushLock = new Object();
+
/**
* Flush all in-memory buffered updates (adds and deletes)
* to the Directory.
@@ -2464,116 +2598,104 @@ public class IndexWriter implements Clos
}
}
- // TODO: this method should not have to be entirely
- // synchronized, ie, merges should be allowed to commit
- // even while a flush is happening
- private synchronized boolean doFlush(boolean applyAllDeletes) throws CorruptIndexException, IOException {
-
+ private boolean doFlush(boolean applyAllDeletes) throws CorruptIndexException, IOException {
if (hitOOM) {
throw new IllegalStateException("this writer hit an OutOfMemoryError; cannot flush");
}
doBeforeFlush();
-
assert testPoint("startDoFlush");
-
- // We may be flushing because it was triggered by doc
- // count, del count, ram usage (in which case flush
- // pending is already set), or we may be flushing
- // due to external event eg getReader or commit is
- // called (in which case we now set it, and this will
- // pause all threads):
- flushControl.setFlushPendingNoWait("explicit flush");
-
boolean success = false;
-
try {
if (infoStream != null) {
message(" start flush: applyAllDeletes=" + applyAllDeletes);
message(" index before flush " + segString());
}
-
- final SegmentInfo newSegment = docWriter.flush(this, deleter, mergePolicy, segmentInfos);
- if (newSegment != null) {
- setDiagnostics(newSegment, "flush");
- segmentInfos.add(newSegment);
- checkpoint();
- }
-
- if (!applyAllDeletes) {
- // If deletes alone are consuming > 1/2 our RAM
- // buffer, force them all to apply now. This is to
- // prevent too-frequent flushing of a long tail of
- // tiny segments:
- if (flushControl.getFlushDeletes() ||
- (config.getRAMBufferSizeMB() != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
- bufferedDeletesStream.bytesUsed() > (1024*1024*config.getRAMBufferSizeMB()/2))) {
- applyAllDeletes = true;
- if (infoStream != null) {
- message("force apply deletes bytesUsed=" + bufferedDeletesStream.bytesUsed() + " vs ramBuffer=" + (1024*1024*config.getRAMBufferSizeMB()));
- }
+ final boolean anySegmentFlushed;
+
+ synchronized (fullFlushLock) {
+ try {
+ anySegmentFlushed = docWriter.flushAllThreads();
+ success = true;
+ } finally {
+ docWriter.finishFullFlush(success);
}
}
-
- if (applyAllDeletes) {
- if (infoStream != null) {
- message("apply all deletes during flush");
- }
- flushDeletesCount.incrementAndGet();
- final BufferedDeletesStream.ApplyDeletesResult result = bufferedDeletesStream.applyDeletes(readerPool, segmentInfos);
- if (result.anyDeletes) {
- checkpoint();
- }
- if (!keepFullyDeletedSegments && result.allDeleted != null) {
- if (infoStream != null) {
- message("drop 100% deleted segments: " + result.allDeleted);
- }
- for(SegmentInfo info : result.allDeleted) {
- // If a merge has already registered for this
- // segment, we leave it in the readerPool; the
- // merge will skip merging it and will then drop
- // it once it's done:
- if (!mergingSegments.contains(info)) {
- segmentInfos.remove(info);
- if (readerPool != null) {
- readerPool.drop(info);
- }
- }
- }
- checkpoint();
+ success = false;
+ synchronized(this) {
+ maybeApplyDeletes(applyAllDeletes);
+ doAfterFlush();
+ if (!anySegmentFlushed) {
+ // flushCount is only incremented in flushAllThreads if a segment was flushed
+ flushCount.incrementAndGet();
}
- bufferedDeletesStream.prune(segmentInfos);
- assert !bufferedDeletesStream.any();
- flushControl.clearDeletes();
- } else if (infoStream != null) {
- message("don't apply deletes now delTermCount=" + bufferedDeletesStream.numTerms() + " bytesUsed=" + bufferedDeletesStream.bytesUsed());
+ success = true;
+ return anySegmentFlushed;
}
-
- doAfterFlush();
- flushCount.incrementAndGet();
-
- success = true;
-
- return newSegment != null;
-
} catch (OutOfMemoryError oom) {
handleOOM(oom, "doFlush");
// never hit
return false;
} finally {
- flushControl.clearFlushPending();
if (!success && infoStream != null)
message("hit exception during flush");
}
}
+
+ final synchronized void maybeApplyDeletes(boolean applyAllDeletes) throws IOException {
+ if (applyAllDeletes) {
+ if (infoStream != null) {
+ message("apply all deletes during flush");
+ }
+ applyAllDeletes();
+ } else if (infoStream != null) {
+ message("don't apply deletes now delTermCount=" + bufferedDeletesStream.numTerms() + " bytesUsed=" + bufferedDeletesStream.bytesUsed());
+ }
+
+ }
+
+ final synchronized void applyAllDeletes() throws IOException {
+ flushDeletesCount.incrementAndGet();
+ final BufferedDeletesStream.ApplyDeletesResult result = bufferedDeletesStream
+ .applyDeletes(readerPool, segmentInfos);
+ if (result.anyDeletes) {
+ checkpoint();
+ }
+ if (!keepFullyDeletedSegments && result.allDeleted != null) {
+ if (infoStream != null) {
+ message("drop 100% deleted segments: " + result.allDeleted);
+ }
+ for (SegmentInfo info : result.allDeleted) {
+ // If a merge has already registered for this
+ // segment, we leave it in the readerPool; the
+ // merge will skip merging it and will then drop
+ // it once it's done:
+ if (!mergingSegments.contains(info)) {
+ segmentInfos.remove(info);
+ if (readerPool != null) {
+ readerPool.drop(info);
+ }
+ }
+ }
+ checkpoint();
+ }
+ bufferedDeletesStream.prune(segmentInfos);
+ }
/** Expert: Return the total size of all index files currently cached in memory.
* Useful for size management with flushRamDocs()
*/
public final long ramSizeInBytes() {
ensureOpen();
- return docWriter.bytesUsed() + bufferedDeletesStream.bytesUsed();
+ return docWriter.flushControl.netBytes() + bufferedDeletesStream.bytesUsed();
+ }
+
+ // for testing only
+ DocumentsWriter getDocsWriter() {
+ boolean test = false;
+ assert test = true;
+ return test ? docWriter : null;
}
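A note on the idiom above: the assignment inside the assert statement executes only when the JVM runs with assertions enabled (-ea), so getDocsWriter returns the DocumentsWriter only under test runs. A self-contained sketch of the same trick:

    // Prints "assertions enabled" only when started with: java -ea AssertProbe
    public class AssertProbe {
      public static void main(String[] args) {
        boolean enabled = false;
        assert enabled = true; // side effect occurs only with -ea
        System.out.println(enabled ? "assertions enabled" : "assertions disabled");
      }
    }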
/** Expert: Return the number of documents currently
@@ -2709,7 +2831,7 @@ public class IndexWriter implements Clos
}
commitMergedDeletes(merge, mergedReader);
-
+
// If the doc store we are using has been closed and
// is in now compound format (but wasn't when we
// started), then we will switch to the compound
@@ -2723,7 +2845,7 @@ public class IndexWriter implements Clos
message("merged segment " + merge.info + " is 100% deleted" + (keepFullyDeletedSegments ? "" : "; skipping insert"));
}
- final Set mergedAway = new HashSet<SegmentInfo>(merge.segments);
+ final Set<SegmentInfo> mergedAway = new HashSet<SegmentInfo>(merge.segments);
int segIdx = 0;
int newSegIdx = 0;
boolean inserted = false;
@@ -2770,15 +2892,15 @@ public class IndexWriter implements Clos
// them so that they don't bother writing them to
// disk, updating SegmentInfo, etc.:
readerPool.clear(merge.segments);
-
+
if (merge.optimize) {
// cascade the optimize:
segmentsToOptimize.add(merge.info);
}
-
+
return true;
}
-
+
final private void handleMergeException(Throwable t, MergePolicy.OneMerge merge) throws IOException {
if (infoStream != null) {
@@ -2867,7 +2989,7 @@ public class IndexWriter implements Clos
/** Hook that's called when the specified merge is complete. */
void mergeSuccess(MergePolicy.OneMerge merge) {
}
-
+
/** Checks whether this merge involves any segments
* already participating in a merge. If not, this merge
* is "registered", meaning we record that its segments
@@ -2998,7 +3120,6 @@ public class IndexWriter implements Clos
// Lock order: IW -> BD
bufferedDeletesStream.prune(segmentInfos);
-
Map<String,String> details = new HashMap<String,String>();
details.put("optimize", Boolean.toString(merge.optimize));
details.put("mergeFactor", Integer.toString(merge.segments.size()));
@@ -3019,11 +3140,11 @@ public class IndexWriter implements Clos
mergingSegments.add(merge.info);
}
- private void setDiagnostics(SegmentInfo info, String source) {
+ static void setDiagnostics(SegmentInfo info, String source) {
setDiagnostics(info, source, null);
}
- private void setDiagnostics(SegmentInfo info, String source, Map<String,String> details) {
+ private static void setDiagnostics(SegmentInfo info, String source, Map<String,String> details) {
Map<String,String> diagnostics = new HashMap<String,String>();
diagnostics.put("source", source);
diagnostics.put("lucene.version", Constants.LUCENE_VERSION);
@@ -3041,7 +3162,7 @@ public class IndexWriter implements Clos
/** Does fininishing for a merge, which is fast but holds
* the synchronized lock on IndexWriter instance. */
final synchronized void mergeFinish(MergePolicy.OneMerge merge) throws IOException {
-
+
// Optimize, addIndexes or finishMerges may be waiting
// on merges to finish.
notifyAll();
@@ -3113,11 +3234,11 @@ public class IndexWriter implements Clos
* instance */
private int mergeMiddle(MergePolicy.OneMerge merge)
throws CorruptIndexException, IOException {
-
+
merge.checkAborted(directory);
final String mergedName = merge.info.name;
-
+
int mergedDocCount = 0;
SegmentInfos sourceSegments = merge.segments;
@@ -3191,7 +3312,7 @@ public class IndexWriter implements Clos
message("merge store matchedCount=" + merger.getMatchedSubReaderCount() + " vs " + merge.readers.size());
}
anyNonBulkMerges |= merger.getAnyNonBulkMerges();
-
+
assert mergedDocCount == totDocCount: "mergedDocCount=" + mergedDocCount + " vs " + totDocCount;
// Very important to do this before opening the reader
@@ -3325,12 +3446,12 @@ public class IndexWriter implements Clos
// For test purposes.
final int getBufferedDeleteTermsSize() {
- return docWriter.getPendingDeletes().terms.size();
+ return docWriter.getBufferedDeleteTermsSize();
}
// For test purposes.
final int getNumBufferedDeleteTerms() {
- return docWriter.getPendingDeletes().numTermDeletes.get();
+ return docWriter.getNumBufferedDeleteTerms();
}
// utility routines for tests
@@ -3445,17 +3566,17 @@ public class IndexWriter implements Clos
assert lastCommitChangeCount <= changeCount;
myChangeCount = changeCount;
-
+
if (changeCount == lastCommitChangeCount) {
if (infoStream != null)
message(" skip startCommit(): no changes pending");
return;
}
-
+
// First, we clone & incref the segmentInfos we intend
// to sync, then, without locking, we sync() all files
// referenced by toSync, in the background.
-
+
if (infoStream != null)
message("startCommit index=" + segString(segmentInfos) + " changeCount=" + changeCount);
@@ -3463,10 +3584,10 @@ public class IndexWriter implements Clos
toSync = (SegmentInfos) segmentInfos.clone();
assert filesExist(toSync);
-
+
if (commitUserData != null)
toSync.setUserData(commitUserData);
-
+
// This protects the segmentInfos we are now going
// to commit. This is important in case, eg, while
// we are trying to sync all referenced files, a
@@ -3598,7 +3719,7 @@ public class IndexWriter implements Clos
/** Expert: remove any index files that are no longer
* used.
- *
+ *
* <p> IndexWriter normally deletes unused files itself,
* during indexing. However, on Windows, which disallows
* deletion of open files, if there is a reader open on
@@ -3647,7 +3768,7 @@ public class IndexWriter implements Clos
public void setPayloadProcessorProvider(PayloadProcessorProvider pcp) {
payloadProcessorProvider = pcp;
}
-
+
/**
* Returns the {@link PayloadProcessorProvider} that is used during segment
* merges to process payloads.
@@ -3655,124 +3776,4 @@ public class IndexWriter implements Clos
public PayloadProcessorProvider getPayloadProcessorProvider() {
return payloadProcessorProvider;
}
-
- // decides when flushes happen
- final class FlushControl {
-
- private boolean flushPending;
- private boolean flushDeletes;
- private int delCount;
- private int docCount;
- private boolean flushing;
-
- private synchronized boolean setFlushPending(String reason, boolean doWait) {
- if (flushPending || flushing) {
- if (doWait) {
- while(flushPending || flushing) {
- try {
- wait();
- } catch (InterruptedException ie) {
- throw new ThreadInterruptedException(ie);
- }
- }
- }
- return false;
- } else {
- if (infoStream != null) {
- message("now trigger flush reason=" + reason);
- }
- flushPending = true;
- return flushPending;
- }
- }
-
- public synchronized void setFlushPendingNoWait(String reason) {
- setFlushPending(reason, false);
- }
-
- public synchronized boolean getFlushPending() {
- return flushPending;
- }
-
- public synchronized boolean getFlushDeletes() {
- return flushDeletes;
- }
-
- public synchronized void clearFlushPending() {
- if (infoStream != null) {
- message("clearFlushPending");
- }
- flushPending = false;
- flushDeletes = false;
- docCount = 0;
- notifyAll();
- }
-
- public synchronized void clearDeletes() {
- delCount = 0;
- }
-
- public synchronized boolean waitUpdate(int docInc, int delInc) {
- return waitUpdate(docInc, delInc, false);
- }
-
- public synchronized boolean waitUpdate(int docInc, int delInc, boolean skipWait) {
- while(flushPending) {
- try {
- wait();
- } catch (InterruptedException ie) {
- throw new ThreadInterruptedException(ie);
- }
- }
-
- // skipWait is only used when a thread is BOTH adding
- // a doc and buffering a del term, and, the adding of
- // the doc already triggered a flush
- if (skipWait) {
- docCount += docInc;
- delCount += delInc;
- return false;
- }
-
- final int maxBufferedDocs = config.getMaxBufferedDocs();
- if (maxBufferedDocs != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
- (docCount+docInc) >= maxBufferedDocs) {
- return setFlushPending("maxBufferedDocs", true);
- }
- docCount += docInc;
-
- final int maxBufferedDeleteTerms = config.getMaxBufferedDeleteTerms();
- if (maxBufferedDeleteTerms != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
- (delCount+delInc) >= maxBufferedDeleteTerms) {
- flushDeletes = true;
- return setFlushPending("maxBufferedDeleteTerms", true);
- }
- delCount += delInc;
-
- return flushByRAMUsage("add delete/doc");
- }
-
- public synchronized boolean flushByRAMUsage(String reason) {
- final double ramBufferSizeMB = config.getRAMBufferSizeMB();
- if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH) {
- final long limit = (long) (ramBufferSizeMB*1024*1024);
- long used = bufferedDeletesStream.bytesUsed() + docWriter.bytesUsed();
- if (used >= limit) {
-
- // DocumentsWriter may be able to free up some
- // RAM:
- // Lock order: FC -> DW
- docWriter.balanceRAM();
-
- used = bufferedDeletesStream.bytesUsed() + docWriter.bytesUsed();
- if (used >= limit) {
- return setFlushPending("ram full: " + reason, false);
- }
- }
- }
- return false;
- }
- }
-
- final FlushControl flushControl = new FlushControl();
}
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java Sun May 1 22:38:33 2011
@@ -18,7 +18,7 @@ package org.apache.lucene.index;
*/
import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.index.DocumentsWriter.IndexingChain;
+import org.apache.lucene.index.DocumentsWriterPerThread.IndexingChain;
import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
import org.apache.lucene.index.codecs.CodecProvider;
import org.apache.lucene.search.IndexSearcher;
@@ -41,7 +41,7 @@ import org.apache.lucene.util.Version;
* IndexWriterConfig conf = new IndexWriterConfig(analyzer);
* conf.setter1().setter2();
* </pre>
- *
+ *
* @since 3.1
*/
public final class IndexWriterConfig implements Cloneable {
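A concrete instance of the chained-setter style shown in the class javadoc above (all values are illustrative):

    IndexWriterConfig conf = new IndexWriterConfig(analyzer)
        .setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND)
        .setRAMBufferSizeMB(48.0)
        .setMaxBufferedDeleteTerms(1000);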
@@ -56,7 +56,7 @@ public final class IndexWriterConfig imp
* </ul>
*/
public static enum OpenMode { CREATE, APPEND, CREATE_OR_APPEND }
-
+
/** Default value is 32. Change using {@link #setTermIndexInterval(int)}. */
public static final int DEFAULT_TERM_INDEX_INTERVAL = 32; // TODO: this should be private to the codec, not settable here
@@ -77,23 +77,19 @@ public final class IndexWriterConfig imp
/**
* Default value for the write lock timeout (1,000 ms).
- *
+ *
* @see #setDefaultWriteLockTimeout(long)
*/
public static long WRITE_LOCK_TIMEOUT = 1000;
- /** The maximum number of simultaneous threads that may be
- * indexing documents at once in IndexWriter; if more
- * than this many threads arrive they will wait for
- * others to finish. */
- public final static int DEFAULT_MAX_THREAD_STATES = 8;
-
/** Default setting for {@link #setReaderPooling}. */
public final static boolean DEFAULT_READER_POOLING = false;
/** Default value is 1. Change using {@link #setReaderTermsIndexDivisor(int)}. */
public static final int DEFAULT_READER_TERMS_INDEX_DIVISOR = IndexReader.DEFAULT_TERMS_INDEX_DIVISOR;
+ /** Default value is 1945. Change using {@link #setRAMPerThreadHardLimitMB(int)} */
+ public static final int DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB = 1945;
/**
* Sets the default (for any instance) maximum time to wait for a write lock
* (in milliseconds).
@@ -105,7 +101,7 @@ public final class IndexWriterConfig imp
/**
* Returns the default write lock timeout for newly instantiated
* IndexWriterConfigs.
- *
+ *
* @see #setDefaultWriteLockTimeout(long)
*/
public static long getDefaultWriteLockTimeout() {
@@ -127,10 +123,12 @@ public final class IndexWriterConfig imp
private volatile IndexReaderWarmer mergedSegmentWarmer;
private volatile CodecProvider codecProvider;
private volatile MergePolicy mergePolicy;
- private volatile int maxThreadStates;
+ private volatile DocumentsWriterPerThreadPool indexerThreadPool;
private volatile boolean readerPooling;
private volatile int readerTermsIndexDivisor;
-
+ private volatile FlushPolicy flushPolicy;
+ private volatile int perThreadHardLimitMB;
+
private Version matchVersion;
/**
@@ -153,15 +151,16 @@ public final class IndexWriterConfig imp
maxBufferedDeleteTerms = DEFAULT_MAX_BUFFERED_DELETE_TERMS;
ramBufferSizeMB = DEFAULT_RAM_BUFFER_SIZE_MB;
maxBufferedDocs = DEFAULT_MAX_BUFFERED_DOCS;
- indexingChain = DocumentsWriter.defaultIndexingChain;
+ indexingChain = DocumentsWriterPerThread.defaultIndexingChain;
mergedSegmentWarmer = null;
codecProvider = CodecProvider.getDefault();
mergePolicy = new TieredMergePolicy();
- maxThreadStates = DEFAULT_MAX_THREAD_STATES;
readerPooling = DEFAULT_READER_POOLING;
+ indexerThreadPool = new ThreadAffinityDocumentsWriterThreadPool();
readerTermsIndexDivisor = DEFAULT_READER_TERMS_INDEX_DIVISOR;
+ perThreadHardLimitMB = DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB;
}
-
+
@Override
public Object clone() {
// Shallow clone is the only thing that's possible, since parameters like
@@ -186,7 +185,7 @@ public final class IndexWriterConfig imp
this.openMode = openMode;
return this;
}
-
+
/** Returns the {@link OpenMode} set by {@link #setOpenMode(OpenMode)}. */
public OpenMode getOpenMode() {
return openMode;
@@ -261,7 +260,7 @@ public final class IndexWriterConfig imp
public SimilarityProvider getSimilarityProvider() {
return similarityProvider;
}
-
+
/**
* Expert: set the interval between indexed terms. Large values cause less
* memory to be used by IndexReader, but slow random-access to terms. Small
@@ -281,7 +280,7 @@ public final class IndexWriterConfig imp
* In particular, <code>numUniqueTerms/interval</code> terms are read into
* memory by an IndexReader, and, on average, <code>interval/2</code> terms
* must be scanned for each random term access.
- *
+ *
* @see #DEFAULT_TERM_INDEX_INTERVAL
*
* <p>Takes effect immediately, but only applies to newly
@@ -293,7 +292,7 @@ public final class IndexWriterConfig imp
/**
* Returns the interval between indexed terms.
- *
+ *
* @see #setTermIndexInterval(int)
*/
public int getTermIndexInterval() { // TODO: this should be private to the codec, not settable here
@@ -331,10 +330,10 @@ public final class IndexWriterConfig imp
this.writeLockTimeout = writeLockTimeout;
return this;
}
-
+
/**
* Returns allowed timeout when acquiring the write lock.
- *
+ *
* @see #setWriteLockTimeout(long)
*/
public long getWriteLockTimeout() {
@@ -343,15 +342,16 @@ public final class IndexWriterConfig imp
/**
* Determines the minimal number of delete terms required before the buffered
- * in-memory delete terms are applied and flushed. If there are documents
- * buffered in memory at the time, they are merged and a new segment is
- * created.
-
- * <p>Disabled by default (writer flushes by RAM usage).
+ * in-memory delete terms and queries are applied and flushed.
+ * <p>Disabled by default (writer flushes by RAM usage).</p>
+ * <p>
+ * NOTE: This setting won't trigger a segment flush.
+ * </p>
*
* @throws IllegalArgumentException if maxBufferedDeleteTerms
* is enabled but smaller than 1
* @see #setRAMBufferSizeMB
+ * @see #setFlushPolicy(FlushPolicy)
*
* <p>Takes effect immediately, but only the next time a
* document is added, updated or deleted.
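A hedged example of the behavior described above: once the threshold is crossed, buffered delete terms and queries are applied to existing segments, but no new segment is flushed:

    // Apply buffered deletes after 1000 buffered delete terms (no segment flush)
    conf.setMaxBufferedDeleteTerms(1000);
    writer.deleteDocuments(new Term("id", "doc-42")); // buffered until applied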
@@ -366,9 +366,9 @@ public final class IndexWriterConfig imp
}
/**
- * Returns the number of buffered deleted terms that will trigger a flush if
- * enabled.
- *
+ * Returns the number of buffered deleted terms that will trigger a flush of all
+ * buffered deletes if enabled.
+ *
* @see #setMaxBufferedDeleteTerms(int)
*/
public int getMaxBufferedDeleteTerms() {
@@ -380,45 +380,50 @@ public final class IndexWriterConfig imp
* and deletions before they are flushed to the Directory. Generally for
* faster indexing performance it's best to flush by RAM usage instead of
* document count and use as large a RAM buffer as you can.
- *
* <p>
* When this is set, the writer will flush whenever buffered documents and
* deletions use this much RAM. Pass in {@link #DISABLE_AUTO_FLUSH} to prevent
* triggering a flush due to RAM usage. Note that if flushing by document
* count is also enabled, then the flush will be triggered by whichever comes
* first.
- *
+ * <p>
+ * The maximum RAM limit is inherently determined by the JVM's available memory.
+ * Yet, an {@link IndexWriter} session can consume a significantly larger amount
+ * of memory than the given RAM limit since this limit is just an indicator when
+ * to flush memory resident documents to the Directory. Flushes are likely to happen
+ * concurrently while other threads are adding documents to the writer. For application
+ * stability the available memory in the JVM should be significantly larger than
+ * the RAM buffer used for indexing.
* <p>
* <b>NOTE</b>: the account of RAM usage for pending deletions is only
* approximate. Specifically, if you delete by Query, Lucene currently has no
* way to measure the RAM usage of individual Queries so the accounting will
* under-estimate and you should compensate by either calling commit()
* periodically yourself, or by using {@link #setMaxBufferedDeleteTerms(int)}
- * to flush by count instead of RAM usage (each buffered delete Query counts
- * as one).
- *
+ * to flush and apply buffered deletes by count instead of RAM usage
+ * (for each buffered delete Query a constant number of bytes is used to estimate
+ * RAM usage). Note that enabling {@link #setMaxBufferedDeleteTerms(int)} will
+ * not trigger any segment flushes.
+ * <p>
+ * <b>NOTE</b>: It's not guaranteed that all memory resident documents are flushed
+ * once this limit is exceeded. Depending on the configured {@link FlushPolicy} only a
+ * subset of the buffered documents are flushed and therefore only part of the RAM
+ * buffer is released.
* <p>
- * <b>NOTE</b>: because IndexWriter uses <code>int</code>s when managing its
- * internal storage, the absolute maximum value for this setting is somewhat
- * less than 2048 MB. The precise limit depends on various factors, such as
- * how large your documents are, how many fields have norms, etc., so it's
- * best to set this value comfortably under 2048.
*
- * <p>
* The default value is {@link #DEFAULT_RAM_BUFFER_SIZE_MB}.
- *
+ * @see #setFlushPolicy(FlushPolicy)
+ * @see #setRAMPerThreadHardLimitMB(int)
+ *
* <p>Takes effect immediately, but only the next time a
* document is added, updated or deleted.
*
* @throws IllegalArgumentException
* if ramBufferSize is enabled but non-positive, or it disables
* ramBufferSize when maxBufferedDocs is already disabled
+ *
*/
public IndexWriterConfig setRAMBufferSizeMB(double ramBufferSizeMB) {
- if (ramBufferSizeMB > 2048.0) {
- throw new IllegalArgumentException("ramBufferSize " + ramBufferSizeMB
- + " is too large; should be comfortably less than 2048");
- }
if (ramBufferSizeMB != DISABLE_AUTO_FLUSH && ramBufferSizeMB <= 0.0)
throw new IllegalArgumentException(
"ramBufferSize should be > 0.0 MB when enabled");
@@ -438,22 +443,22 @@ public final class IndexWriterConfig imp
* Determines the minimal number of documents required before the buffered
* in-memory documents are flushed as a new Segment. Large values generally
* give faster indexing.
- *
+ *
* <p>
* When this is set, the writer will flush every maxBufferedDocs added
* documents. Pass in {@link #DISABLE_AUTO_FLUSH} to prevent triggering a
* flush due to number of buffered documents. Note that if flushing by RAM
* usage is also enabled, then the flush will be triggered by whichever comes
* first.
- *
+ *
* <p>
* Disabled by default (writer flushes by RAM usage).
- *
+ *
* <p>Takes effect immediately, but only the next time a
* document is added, updated or deleted.
*
* @see #setRAMBufferSizeMB(double)
- *
+ * @see #setFlushPolicy(FlushPolicy)
* @throws IllegalArgumentException
* if maxBufferedDocs is enabled but smaller than 2, or it disables
* maxBufferedDocs when ramBufferSize is already disabled
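A sketch of flushing purely by RAM usage, per the advice above; note that this commit removes the old 2048 MB cap on setRAMBufferSizeMB, with the new per-thread hard limit guarding the 32-bit addressing constraint instead:

    // Flush by RAM only: use a large buffer and disable doc-count flushing
    conf.setRAMBufferSizeMB(256.0);
    conf.setMaxBufferedDocs(IndexWriterConfig.DISABLE_AUTO_FLUSH);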
@@ -473,7 +478,7 @@ public final class IndexWriterConfig imp
/**
* Returns the number of buffered added documents that will trigger a flush if
* enabled.
- *
+ *
* @see #setMaxBufferedDocs(int)
*/
public int getMaxBufferedDocs() {
@@ -519,32 +524,43 @@ public final class IndexWriterConfig imp
return codecProvider;
}
-
+
/**
* Returns the current MergePolicy in use by this writer.
- *
+ *
* @see #setMergePolicy(MergePolicy)
*/
public MergePolicy getMergePolicy() {
return mergePolicy;
}
- /**
- * Sets the max number of simultaneous threads that may be indexing documents
- * at once in IndexWriter. Values < 1 are invalid and if passed
- * <code>maxThreadStates</code> will be set to
- * {@link #DEFAULT_MAX_THREAD_STATES}.
- *
- * <p>Only takes effect when IndexWriter is first created. */
- public IndexWriterConfig setMaxThreadStates(int maxThreadStates) {
- this.maxThreadStates = maxThreadStates < 1 ? DEFAULT_MAX_THREAD_STATES : maxThreadStates;
+ /** Expert: Sets the {@link DocumentsWriterPerThreadPool} instance used by the
+ * IndexWriter to assign thread-states to incoming indexing threads. If no
+ * {@link DocumentsWriterPerThreadPool} is set {@link IndexWriter} will use
+ * {@link ThreadAffinityDocumentsWriterThreadPool} with max number of
+ * thread-states set to {@value DocumentsWriterPerThreadPool#DEFAULT_MAX_THREAD_STATES} (see
+ * {@link DocumentsWriterPerThreadPool#DEFAULT_MAX_THREAD_STATES}).
+ * </p>
+ * <p>
+ * NOTE: The given {@link DocumentsWriterPerThreadPool} instance must not be used with
+ * other {@link IndexWriter} instances once it has been initialized / associated with an
+ * {@link IndexWriter}.
+ * </p>
+ * <p>
+ * NOTE: This only takes effect when IndexWriter is first created.</p>*/
+ public IndexWriterConfig setIndexerThreadPool(DocumentsWriterPerThreadPool threadPool) {
+ if (threadPool == null) {
+ throw new IllegalArgumentException("DocumentsWriterPerThreadPool must not be null");
+ }
+ this.indexerThreadPool = threadPool;
return this;
}
- /** Returns the max number of simultaneous threads that
- * may be indexing documents at once in IndexWriter. */
- public int getMaxThreadStates() {
- return maxThreadStates;
+ /** Returns the configured {@link DocumentsWriterPerThreadPool} instance.
+ * @see #setIndexerThreadPool(DocumentsWriterPerThreadPool)
+ * @return the configured {@link DocumentsWriterPerThreadPool} instance.*/
+ public DocumentsWriterPerThreadPool getIndexerThreadPool() {
+ return this.indexerThreadPool;
}
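A usage sketch for this expert setting; the int-argument constructor of ThreadAffinityDocumentsWriterThreadPool (max thread-states) is an assumption, not confirmed by this diff:

    // Expert: replace the default indexer thread pool (constructor arg assumed)
    conf.setIndexerThreadPool(new ThreadAffinityDocumentsWriterThreadPool(12));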
/** By default, IndexWriter does not pool the
@@ -572,10 +588,10 @@ public final class IndexWriterConfig imp
*
* <p>Only takes effect when IndexWriter is first created. */
IndexWriterConfig setIndexingChain(IndexingChain indexingChain) {
- this.indexingChain = indexingChain == null ? DocumentsWriter.defaultIndexingChain : indexingChain;
+ this.indexingChain = indexingChain == null ? DocumentsWriterPerThread.defaultIndexingChain : indexingChain;
return this;
}
-
+
/** Returns the indexing chain set on {@link #setIndexingChain(IndexingChain)}. */
IndexingChain getIndexingChain() {
return indexingChain;
@@ -604,6 +620,53 @@ public final class IndexWriterConfig imp
return readerTermsIndexDivisor;
}
+ /**
+ * Expert: Controls when segments are flushed to disk during indexing.
+ * The {@link FlushPolicy} is initialized during {@link IndexWriter} instantiation; once initialized,
+ * the given instance is bound to this {@link IndexWriter} and should not be used with another writer.
+ * @see #setMaxBufferedDeleteTerms(int)
+ * @see #setMaxBufferedDocs(int)
+ * @see #setRAMBufferSizeMB(double)
+ */
+ public IndexWriterConfig setFlushPolicy(FlushPolicy flushPolicy) {
+ this.flushPolicy = flushPolicy;
+ return this;
+ }
+
+ /**
+ * Expert: Sets the maximum memory consumption per thread triggering a forced
+ * flush if exceeded. A {@link DocumentsWriterPerThread} is forcefully flushed
+ * once it exceeds this limit even if the {@link #getRAMBufferSizeMB()} has
+ * not been exceeded. This is a safety limit to prevent a
+ * {@link DocumentsWriterPerThread} from exhausting its address space due to its
+ * internal 32-bit signed integer memory addressing.
+ * The given value must be less than 2 GB (2048 MB).
+ *
+ * @see #DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB
+ */
+ public IndexWriterConfig setRAMPerThreadHardLimitMB(int perThreadHardLimitMB) {
+ if (perThreadHardLimitMB <= 0 || perThreadHardLimitMB >= 2048) {
+ throw new IllegalArgumentException("PerThreadHardLimit must be greater than 0 and less than 2048MB");
+ }
+ this.perThreadHardLimitMB = perThreadHardLimitMB;
+ return this;
+ }
+
+ /**
+ * Returns the max amount of memory each {@link DocumentsWriterPerThread} can
+ * consume until forcefully flushed.
+ * @see #setRAMPerThreadHardLimitMB(int)
+ */
+ public int getRAMPerThreadHardLimitMB() {
+ return perThreadHardLimitMB;
+ }
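
To make the bounds concrete, a short hedged sketch (the analyzer and version
choices are placeholders): values outside (0, 2048) MB are rejected, and the
hard limit acts per DocumentsWriterPerThread, independently of the soft,
global RAM buffer.

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.util.Version;

public class PerThreadHardLimitSketch {
  public static void main(String[] args) {
    IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_40,
        new StandardAnalyzer(Version.LUCENE_40));
    conf.setRAMBufferSizeMB(64.0);        // soft, global flush trigger
    conf.setRAMPerThreadHardLimitMB(512); // hard, per-DWPT safety valve
    try {
      conf.setRAMPerThreadHardLimitMB(2048); // >= 2048 MB is out of range
    } catch (IllegalArgumentException expected) {
      System.out.println("rejected: " + expected.getMessage());
    }
  }
}
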
+ /**
+ * @see #setFlushPolicy(FlushPolicy)
+ */
+ public FlushPolicy getFlushPolicy() {
+ return flushPolicy;
+ }
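
As a sketch of what a custom policy might look like: the hook names
(onInsert/onDelete), the ThreadState.dwpt field, getNumDocsInRAM() and
setFlushPending() below are read off this changeset rather than a published
API, and the class sits in org.apache.lucene.index since these expert types
may be package-private.

package org.apache.lucene.index;

import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;

// Hedged sketch: mark any thread-state flush-pending once it buffers 1000
// documents, regardless of the global RAM accounting. All referenced members
// are assumptions based on this changeset.
class FlushEveryThousandDocsPolicy extends FlushPolicy {
  @Override
  public void onInsert(DocumentsWriterFlushControl control, ThreadState state) {
    if (state.dwpt.getNumDocsInRAM() >= 1000) {
      control.setFlushPending(state); // schedule this DWPT for flushing
    }
  }

  @Override
  public void onDelete(DocumentsWriterFlushControl control, ThreadState state) {
    // this sketch never triggers a flush on deletes
  }
}

Such a policy would be installed via
conf.setFlushPolicy(new FlushEveryThousandDocsPolicy()); the default policy
honors setMaxBufferedDocs, setMaxBufferedDeleteTerms and setRAMBufferSizeMB,
as the @see tags above indicate.
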
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
@@ -623,9 +686,13 @@ public final class IndexWriterConfig imp
sb.append("mergedSegmentWarmer=").append(mergedSegmentWarmer).append("\n");
sb.append("codecProvider=").append(codecProvider).append("\n");
sb.append("mergePolicy=").append(mergePolicy).append("\n");
- sb.append("maxThreadStates=").append(maxThreadStates).append("\n");
+ sb.append("indexerThreadPool=").append(indexerThreadPool).append("\n");
sb.append("readerPooling=").append(readerPooling).append("\n");
sb.append("readerTermsIndexDivisor=").append(readerTermsIndexDivisor).append("\n");
+ sb.append("flushPolicy=").append(flushPolicy).append("\n");
+ sb.append("perThreadHardLimitMB=").append(perThreadHardLimitMB).append("\n");
+
return sb.toString();
}
+
}
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IntBlockPool.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IntBlockPool.java?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IntBlockPool.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IntBlockPool.java Sun May 1 22:38:33 2011
@@ -1,5 +1,7 @@
package org.apache.lucene.index;
+import java.util.Arrays;
+
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -22,24 +24,24 @@ final class IntBlockPool {
public int[][] buffers = new int[10][];
int bufferUpto = -1; // Which buffer we are upto
- public int intUpto = DocumentsWriter.INT_BLOCK_SIZE; // Where we are in head buffer
+ public int intUpto = DocumentsWriterPerThread.INT_BLOCK_SIZE; // Where we are in head buffer
public int[] buffer; // Current head buffer
- public int intOffset = -DocumentsWriter.INT_BLOCK_SIZE; // Current head offset
+ public int intOffset = -DocumentsWriterPerThread.INT_BLOCK_SIZE; // Current head offset
- final private DocumentsWriter docWriter;
+ final private DocumentsWriterPerThread docWriter;
- public IntBlockPool(DocumentsWriter docWriter) {
+ public IntBlockPool(DocumentsWriterPerThread docWriter) {
this.docWriter = docWriter;
}
public void reset() {
if (bufferUpto != -1) {
- if (bufferUpto > 0)
- // Recycle all but the first buffer
- docWriter.recycleIntBlocks(buffers, 1, 1+bufferUpto);
-
// Reuse first buffer
+ if (bufferUpto > 0) {
+ docWriter.recycleIntBlocks(buffers, 1, bufferUpto-1);
+ Arrays.fill(buffers, 1, bufferUpto, null);
+ }
bufferUpto = 0;
intUpto = 0;
intOffset = 0;
@@ -57,7 +59,7 @@ final class IntBlockPool {
bufferUpto++;
intUpto = 0;
- intOffset += DocumentsWriter.INT_BLOCK_SIZE;
+ intOffset += DocumentsWriterPerThread.INT_BLOCK_SIZE;
}
}
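
The reset() change above keeps the first buffer hot, hands the rest back to
the allocator, and nulls the recycled slots so the arrays can be garbage
collected. A self-contained sketch of the same pattern follows; it is not the
Lucene class, and the block size and free-list allocator are illustrative.

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

class SimpleIntBlockPool {
  static final int INT_BLOCK_SIZE = 8192;   // stands in for DWPT's block size
  private final Deque<int[]> freeBlocks = new ArrayDeque<int[]>();
  int[][] buffers = new int[10][];
  int bufferUpto = -1;                      // index of current head buffer

  int[] allocBlock() {
    return freeBlocks.isEmpty() ? new int[INT_BLOCK_SIZE] : freeBlocks.pop();
  }

  void nextBuffer() {
    if (1 + bufferUpto == buffers.length) {
      buffers = Arrays.copyOf(buffers, buffers.length * 2);
    }
    buffers[++bufferUpto] = allocBlock();
  }

  /** Keep buffers[0] for reuse, recycle the rest, clear references for GC. */
  void reset() {
    if (bufferUpto > 0) {
      for (int i = 1; i <= bufferUpto; i++) {
        freeBlocks.push(buffers[i]);
      }
      Arrays.fill(buffers, 1, bufferUpto + 1, null);
    }
    if (bufferUpto >= 0) {
      bufferUpto = 0;
    }
  }
}
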
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/InvertedDocConsumer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/InvertedDocConsumer.java?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/InvertedDocConsumer.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/InvertedDocConsumer.java Sun May 1 22:38:33 2011
@@ -17,20 +17,22 @@ package org.apache.lucene.index;
* limitations under the License.
*/
-import java.util.Collection;
-import java.util.Map;
import java.io.IOException;
+import java.util.Map;
abstract class InvertedDocConsumer {
- /** Add a new thread */
- abstract InvertedDocConsumerPerThread addThread(DocInverterPerThread docInverterPerThread);
-
/** Abort (called after hitting AbortException) */
abstract void abort();
/** Flush a new segment */
- abstract void flush(Map<InvertedDocConsumerPerThread,Collection<InvertedDocConsumerPerField>> threadsAndFields, SegmentWriteState state) throws IOException;
+ abstract void flush(Map<FieldInfo, InvertedDocConsumerPerField> fieldsToFlush, SegmentWriteState state) throws IOException;
+
+ abstract InvertedDocConsumerPerField addField(DocInverterPerField docInverterPerField, FieldInfo fieldInfo);
+
+ abstract void startDocument() throws IOException;
+
+ abstract void finishDocument() throws IOException;
/** Attempt to free RAM, returning true if any RAM was
* freed */
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/InvertedDocEndConsumer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/InvertedDocEndConsumer.java?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/InvertedDocEndConsumer.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/InvertedDocEndConsumer.java Sun May 1 22:38:33 2011
@@ -17,12 +17,13 @@ package org.apache.lucene.index;
* limitations under the License.
*/
-import java.util.Collection;
-import java.util.Map;
import java.io.IOException;
+import java.util.Map;
abstract class InvertedDocEndConsumer {
- abstract InvertedDocEndConsumerPerThread addThread(DocInverterPerThread docInverterPerThread);
- abstract void flush(Map<InvertedDocEndConsumerPerThread,Collection<InvertedDocEndConsumerPerField>> threadsAndFields, SegmentWriteState state) throws IOException;
+ abstract void flush(Map<FieldInfo, InvertedDocEndConsumerPerField> fieldsToFlush, SegmentWriteState state) throws IOException;
abstract void abort();
+ abstract InvertedDocEndConsumerPerField addField(DocInverterPerField docInverterPerField, FieldInfo fieldInfo);
+ abstract void startDocument() throws IOException;
+ abstract void finishDocument() throws IOException;
}
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/NormsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/NormsWriter.java?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/NormsWriter.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/NormsWriter.java Sun May 1 22:38:33 2011
@@ -19,11 +19,7 @@ package org.apache.lucene.index;
import java.io.IOException;
import java.util.Collection;
-import java.util.Iterator;
-import java.util.HashMap;
import java.util.Map;
-import java.util.List;
-import java.util.ArrayList;
import org.apache.lucene.store.IndexOutput;
@@ -36,10 +32,6 @@ import org.apache.lucene.store.IndexOutp
final class NormsWriter extends InvertedDocEndConsumer {
- @Override
- public InvertedDocEndConsumerPerThread addThread(DocInverterPerThread docInverterPerThread) {
- return new NormsWriterPerThread(docInverterPerThread, this);
- }
@Override
public void abort() {}
@@ -50,40 +42,11 @@ final class NormsWriter extends Inverted
/** Produce _X.nrm if any document had a field with norms
* not disabled */
@Override
- public void flush(Map<InvertedDocEndConsumerPerThread,Collection<InvertedDocEndConsumerPerField>> threadsAndFields, SegmentWriteState state) throws IOException {
-
- final Map<FieldInfo,List<NormsWriterPerField>> byField = new HashMap<FieldInfo,List<NormsWriterPerField>>();
-
+ public void flush(Map<FieldInfo,InvertedDocEndConsumerPerField> fieldsToFlush, SegmentWriteState state) throws IOException {
if (!state.fieldInfos.hasNorms()) {
return;
}
- // Typically, each thread will have encountered the same
- // field. So first we collate by field, ie, all
- // per-thread field instances that correspond to the
- // same FieldInfo
- for (final Map.Entry<InvertedDocEndConsumerPerThread,Collection<InvertedDocEndConsumerPerField>> entry : threadsAndFields.entrySet()) {
- final Collection<InvertedDocEndConsumerPerField> fields = entry.getValue();
- final Iterator<InvertedDocEndConsumerPerField> fieldsIt = fields.iterator();
-
- while (fieldsIt.hasNext()) {
- final NormsWriterPerField perField = (NormsWriterPerField) fieldsIt.next();
-
- if (perField.upto > 0) {
- // It has some norms
- List<NormsWriterPerField> l = byField.get(perField.fieldInfo);
- if (l == null) {
- l = new ArrayList<NormsWriterPerField>();
- byField.put(perField.fieldInfo, l);
- }
- l.add(perField);
- } else
- // Remove this field since we haven't seen it
- // since the previous flush
- fieldsIt.remove();
- }
- }
-
final String normsFileName = IndexFileNames.segmentFileName(state.segmentName, "", IndexFileNames.NORMS_EXTENSION);
IndexOutput normsOut = state.directory.createOutput(normsFileName);
@@ -93,60 +56,25 @@ final class NormsWriter extends Inverted
int normCount = 0;
for (FieldInfo fi : state.fieldInfos) {
- final List<NormsWriterPerField> toMerge = byField.get(fi);
+ final NormsWriterPerField toWrite = (NormsWriterPerField) fieldsToFlush.get(fi);
int upto = 0;
- if (toMerge != null) {
-
- final int numFields = toMerge.size();
-
+ if (toWrite != null && toWrite.upto > 0) {
normCount++;
- final NormsWriterPerField[] fields = new NormsWriterPerField[numFields];
- int[] uptos = new int[numFields];
-
- for(int j=0;j<numFields;j++)
- fields[j] = toMerge.get(j);
-
- int numLeft = numFields;
-
- while(numLeft > 0) {
-
- assert uptos[0] < fields[0].docIDs.length : " uptos[0]=" + uptos[0] + " len=" + (fields[0].docIDs.length);
-
- int minLoc = 0;
- int minDocID = fields[0].docIDs[uptos[0]];
-
- for(int j=1;j<numLeft;j++) {
- final int docID = fields[j].docIDs[uptos[j]];
- if (docID < minDocID) {
- minDocID = docID;
- minLoc = j;
- }
- }
-
- assert minDocID < state.numDocs;
-
- // Fill hole
- for(;upto<minDocID;upto++)
+ int docID = 0;
+ for (; docID < state.numDocs; docID++) {
+ if (upto < toWrite.upto && toWrite.docIDs[upto] == docID) {
+ normsOut.writeByte(toWrite.norms[upto]);
+ upto++;
+ } else {
normsOut.writeByte((byte) 0);
-
- normsOut.writeByte(fields[minLoc].norms[uptos[minLoc]]);
- (uptos[minLoc])++;
- upto++;
-
- if (uptos[minLoc] == fields[minLoc].upto) {
- fields[minLoc].reset();
- if (minLoc != numLeft-1) {
- fields[minLoc] = fields[numLeft-1];
- uptos[minLoc] = uptos[numLeft-1];
- }
- numLeft--;
}
}
-
- // Fill final hole with defaultNorm
- for(;upto<state.numDocs;upto++)
- normsOut.writeByte((byte) 0);
+
+ // we should have consumed every norm
+ assert upto == toWrite.upto;
+
+ toWrite.reset();
} else if (fi.isIndexed && !fi.omitNorms) {
normCount++;
// Fill entire field with default norm:
@@ -161,4 +89,16 @@ final class NormsWriter extends Inverted
normsOut.close();
}
}
+
+ @Override
+ void finishDocument() throws IOException {}
+
+ @Override
+ void startDocument() throws IOException {}
+
+ @Override
+ InvertedDocEndConsumerPerField addField(DocInverterPerField docInverterPerField,
+ FieldInfo fieldInfo) {
+ return new NormsWriterPerField(docInverterPerField, fieldInfo);
+ }
}
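
With one DocumentsWriterPerThread per in-flight segment there is exactly one
NormsWriterPerField per field, so the old collate-then-merge-by-docID loop
collapses into a single pass over [0, numDocs) that fills gaps with the
default norm. A standalone sketch of that loop (plain java.io stands in for
IndexOutput), illustrating the shape of the new per-field flush that
InvertedDocConsumer and InvertedDocEndConsumer now share:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

class NormsFlushSketch {
  static void writeNorms(OutputStream normsOut, int numDocs,
                         int[] docIDs, byte[] norms, int upto) throws IOException {
    int i = 0; // position into the docID/norm pairs buffered for this field
    for (int docID = 0; docID < numDocs; docID++) {
      if (i < upto && docIDs[i] == docID) {
        normsOut.write(norms[i]);   // this doc had the field: write its norm
        i++;
      } else {
        normsOut.write(0);          // gap: default norm
      }
    }
    assert i == upto : "every buffered norm must be consumed";
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    // Docs 1 and 3 (of 5) had the field with norms enabled.
    writeNorms(out, 5, new int[]{1, 3}, new byte[]{115, 124}, 2);
    System.out.println(out.size() + " norm bytes written"); // prints 5
  }
}
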
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/NormsWriterPerField.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/NormsWriterPerField.java?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/NormsWriterPerField.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/NormsWriterPerField.java Sun May 1 22:38:33 2011
@@ -27,9 +27,8 @@ import org.apache.lucene.util.ArrayUtil;
final class NormsWriterPerField extends InvertedDocEndConsumerPerField implements Comparable<NormsWriterPerField> {
- final NormsWriterPerThread perThread;
final FieldInfo fieldInfo;
- final DocumentsWriter.DocState docState;
+ final DocumentsWriterPerThread.DocState docState;
final Similarity similarity;
// Holds all docID/norm pairs we've seen
@@ -46,10 +45,9 @@ final class NormsWriterPerField extends
upto = 0;
}
- public NormsWriterPerField(final DocInverterPerField docInverterPerField, final NormsWriterPerThread perThread, final FieldInfo fieldInfo) {
- this.perThread = perThread;
+ public NormsWriterPerField(final DocInverterPerField docInverterPerField, final FieldInfo fieldInfo) {
this.fieldInfo = fieldInfo;
- docState = perThread.docState;
+ docState = docInverterPerField.docState;
fieldState = docInverterPerField.fieldState;
similarity = docState.similarityProvider.get(fieldInfo.name);
}
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/SegmentInfo.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/SegmentInfo.java?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/SegmentInfo.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/SegmentInfo.java Sun May 1 22:38:33 2011
@@ -37,14 +37,14 @@ import org.apache.lucene.util.Constants;
/**
* Information about a segment such as its name, directory, and files related
* to the segment.
- *
+ *
* @lucene.experimental
*/
public final class SegmentInfo {
static final int NO = -1; // e.g. no norms; no deletes;
static final int YES = 1; // e.g. have norms; have deletes;
- static final int WITHOUT_GEN = 0; // a file name that has no GEN in it.
+ static final int WITHOUT_GEN = 0; // a file name that has no GEN in it.
public String name; // unique name in dir
public int docCount; // number of docs in seg
@@ -56,7 +56,7 @@ public final class SegmentInfo {
* - YES or higher if there are deletes at generation N
*/
private long delGen;
-
+
/*
* Current generation of each field's norm file. If this array is null,
* means no separate norms. If this array is not null, its values mean:
@@ -65,7 +65,7 @@ public final class SegmentInfo {
*/
private Map<Integer,Long> normGen;
- private boolean isCompoundFile;
+ private boolean isCompoundFile;
private volatile List<String> files; // cached list of files that this segment uses
// in the Directory
@@ -73,10 +73,13 @@ public final class SegmentInfo {
private volatile long sizeInBytesNoStore = -1; // total byte size of all but the store files (computed on demand)
private volatile long sizeInBytesWithStore = -1; // total byte size of all of our files (computed on demand)
+ //TODO: LUCENE-2555: remove once we don't need to support shared doc stores (pre 4.0)
private int docStoreOffset; // if this segment shares stored fields & vectors, this
// offset is where in that file this segment's docs begin
+ //TODO: LUCENE-2555: remove once we don't need to support shared doc stores (pre 4.0)
private String docStoreSegment; // name used to derive fields/vectors file we share with
// other segments
+ //TODO: LUCENE-2555: remove once we don't need to support shared doc stores (pre 4.0)
private boolean docStoreIsCompoundFile; // whether doc store files are stored in compound file (*.cfx)
private int delCount; // How many deleted docs in this segment
@@ -91,9 +94,9 @@ public final class SegmentInfo {
private Map<String,String> diagnostics;
- // Tracks the Lucene version this segment was created with, since 3.1. Null
+ // Tracks the Lucene version this segment was created with, since 3.1. Null
// indicates an older than 3.0 index, and it's used to detect a too old index.
- // The format expected is "x.y" - "2.x" for pre-3.0 indexes (or null), and
+ // The format expected is "x.y" - "2.x" for pre-3.0 indexes (or null), and
// specific versions afterwards ("3.0", "3.1" etc.).
// see Constants.LUCENE_MAIN_VERSION.
private String version;
@@ -101,7 +104,7 @@ public final class SegmentInfo {
// NOTE: only used in-RAM by IW to track buffered deletes;
// this is never written to/read from the Directory
private long bufferedDeletesGen;
-
+
public SegmentInfo(String name, int docCount, Directory dir, boolean isCompoundFile,
boolean hasProx, SegmentCodecs segmentCodecs, boolean hasVectors, FieldInfos fieldInfos) {
this.name = name;
@@ -182,11 +185,13 @@ public final class SegmentInfo {
docStoreSegment = name;
docStoreIsCompoundFile = false;
}
+
if (format > DefaultSegmentInfosWriter.FORMAT_4_0) {
// pre-4.0 indexes write a byte if there is a single norms file
byte b = input.readByte();
assert 1 == b;
}
+
int numNormGen = input.readInt();
if (numNormGen == NO) {
normGen = null;
@@ -207,7 +212,7 @@ public final class SegmentInfo {
assert delCount <= docCount;
hasProx = input.readByte() == YES;
-
+
// System.out.println(Thread.currentThread().getName() + ": si.read hasProx=" + hasProx + " seg=" + name);
if (format <= DefaultSegmentInfosWriter.FORMAT_4_0) {
segmentCodecs = new SegmentCodecs(codecs, input);
@@ -217,7 +222,7 @@ public final class SegmentInfo {
segmentCodecs = new SegmentCodecs(codecs, new Codec[] { codecs.lookup("PreFlex")});
}
diagnostics = input.readStringStringMap();
-
+
if (format <= DefaultSegmentInfosWriter.FORMAT_HAS_VECTORS) {
hasVectors = input.readByte() == 1;
} else {
@@ -366,7 +371,7 @@ public final class SegmentInfo {
// against this segment
return null;
} else {
- return IndexFileNames.fileNameFromGeneration(name, IndexFileNames.DELETES_EXTENSION, delGen);
+ return IndexFileNames.fileNameFromGeneration(name, IndexFileNames.DELETES_EXTENSION, delGen);
}
}
@@ -432,7 +437,7 @@ public final class SegmentInfo {
if (hasSeparateNorms(number)) {
return IndexFileNames.fileNameFromGeneration(name, "s" + number, normGen.get(number));
} else {
- // single file for all norms
+ // single file for all norms
return IndexFileNames.fileNameFromGeneration(name, IndexFileNames.NORMS_EXTENSION, WITHOUT_GEN);
}
}
@@ -465,39 +470,74 @@ public final class SegmentInfo {
assert delCount <= docCount;
}
+ /**
+ * @deprecated shared doc stores are not supported in >= 4.0
+ */
+ @Deprecated
public int getDocStoreOffset() {
+ // TODO: LUCENE-2555: remove once we don't need to support shared doc stores (pre 4.0)
return docStoreOffset;
}
-
+
+ /**
+ * @deprecated shared doc stores are not supported in >= 4.0
+ */
+ @Deprecated
public boolean getDocStoreIsCompoundFile() {
+ // TODO: LUCENE-2555: remove once we don't need to support shared doc stores (pre 4.0)
return docStoreIsCompoundFile;
}
-
- void setDocStoreIsCompoundFile(boolean v) {
- docStoreIsCompoundFile = v;
+
+ /**
+ * @deprecated shared doc stores are not supported in >= 4.0
+ */
+ @Deprecated
+ public void setDocStoreIsCompoundFile(boolean docStoreIsCompoundFile) {
+ // TODO: LUCENE-2555: remove once we don't need to support shared doc stores (pre 4.0)
+ this.docStoreIsCompoundFile = docStoreIsCompoundFile;
clearFilesCache();
}
-
+
+ /**
+ * @deprecated shared doc stores are not supported in >= 4.0
+ */
+ @Deprecated
+ void setDocStore(int offset, String segment, boolean isCompoundFile) {
+ // TODO: LUCENE-2555: remove once we don't need to support shared doc stores (pre 4.0)
+ docStoreOffset = offset;
+ docStoreSegment = segment;
+ docStoreIsCompoundFile = isCompoundFile;
+ clearFilesCache();
+ }
+
+ /**
+ * @deprecated shared doc stores are not supported in >= 4.0
+ */
+ @Deprecated
public String getDocStoreSegment() {
+ // TODO: LUCENE-2555: remove once we don't need to support shared doc stores (pre 4.0)
return docStoreSegment;
}
-
- public void setDocStoreSegment(String segment) {
- docStoreSegment = segment;
- }
-
+
+ /**
+ * @deprecated shared doc stores are not supported in >= 4.0
+ */
+ @Deprecated
void setDocStoreOffset(int offset) {
+ // TODO: LUCENE-2555: remove once we don't need to support shared doc stores (pre 4.0)
docStoreOffset = offset;
clearFilesCache();
}
- void setDocStore(int offset, String segment, boolean isCompoundFile) {
- docStoreOffset = offset;
- docStoreSegment = segment;
- docStoreIsCompoundFile = isCompoundFile;
- clearFilesCache();
+ /**
+ * @deprecated shared doc stores are not supported in >= 4.0
+ */
+ @Deprecated
+ public void setDocStoreSegment(String docStoreSegment) {
+ // TODO: LUCENE-2555: remove once we don't need to support shared doc stores (pre 4.0)
+ this.docStoreSegment = docStoreSegment;
}
-
+
/** Save this segment's info. */
public void write(IndexOutput output)
throws IOException {
@@ -507,12 +547,14 @@ public final class SegmentInfo {
output.writeString(name);
output.writeInt(docCount);
output.writeLong(delGen);
+
output.writeInt(docStoreOffset);
if (docStoreOffset != -1) {
output.writeString(docStoreSegment);
output.writeByte((byte) (docStoreIsCompoundFile ? 1:0));
}
+
if (normGen == null) {
output.writeInt(NO);
} else {
@@ -522,7 +564,7 @@ public final class SegmentInfo {
output.writeLong(entry.getValue());
}
}
-
+
output.writeByte((byte) (isCompoundFile ? YES : NO));
output.writeInt(delCount);
output.writeByte((byte) (hasProx ? 1:0));
@@ -570,9 +612,9 @@ public final class SegmentInfo {
// Already cached:
return files;
}
-
+
Set<String> fileSet = new HashSet<String>();
-
+
boolean useCompoundFile = getUseCompoundFile();
if (useCompoundFile) {
@@ -606,7 +648,7 @@ public final class SegmentInfo {
fileSet.add(IndexFileNames.segmentFileName(name, "", IndexFileNames.VECTORS_INDEX_EXTENSION));
fileSet.add(IndexFileNames.segmentFileName(name, "", IndexFileNames.VECTORS_DOCUMENTS_EXTENSION));
fileSet.add(IndexFileNames.segmentFileName(name, "", IndexFileNames.VECTORS_FIELDS_EXTENSION));
- }
+ }
}
String delFileName = IndexFileNames.fileNameFromGeneration(name, IndexFileNames.DELETES_EXTENSION, delGen);
@@ -644,7 +686,7 @@ public final class SegmentInfo {
}
/** Used for debugging. Format may suddenly change.
- *
+ *
* <p>Current format looks like
* <code>_a(3.1):c45/4->_1</code>, which means the segment's
* name is <code>_a</code>; it was created with Lucene 3.1 (or
@@ -674,7 +716,7 @@ public final class SegmentInfo {
if (delCount != 0) {
s.append('/').append(delCount);
}
-
+
if (docStoreOffset != -1) {
s.append("->").append(docStoreSegment);
if (docStoreIsCompoundFile) {
@@ -714,13 +756,13 @@ public final class SegmentInfo {
* <b>NOTE:</b> this method is used for internal purposes only - you should
* not modify the version of a SegmentInfo, or it may result in unexpected
* exceptions thrown when you attempt to open the index.
- *
+ *
* @lucene.internal
*/
public void setVersion(String version) {
this.version = version;
}
-
+
/** Returns the version of the code which wrote the segment. */
public String getVersion() {
return version;
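
For reference, the diagnostic string produced by SegmentInfo.toString() can be
decoded from the pieces visible in this diff (name, "(version)", a
compound-file flag character, doc count, optional "/delCount", optional
"->docStoreSegment"). The sketch below is a hedged reconstruction; the javadoc
above warns the format may suddenly change.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

class SegmentStringSketch {
  // e.g. "_a(3.1):c45/4->_1"
  private static final Pattern P =
      Pattern.compile("(_\\w+)\\(([^)]*)\\):(\\w)(\\d+)(?:/(\\d+))?(?:->(\\S+))?");

  public static void main(String[] args) {
    Matcher m = P.matcher("_a(3.1):c45/4->_1");
    if (m.matches()) {
      System.out.println("name=" + m.group(1));            // _a
      System.out.println("version=" + m.group(2));         // 3.1
      System.out.println("compound flag=" + m.group(3));   // c
      System.out.println("docCount=" + m.group(4));        // 45
      System.out.println("delCount=" + m.group(5));        // 4
      System.out.println("docStoreSegment=" + m.group(6)); // _1
    }
  }
}
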