Posted to commits@lucene.apache.org by us...@apache.org on 2011/05/02 00:38:36 UTC
svn commit: r1098427 [2/5] - in /lucene/dev/trunk: ./ lucene/
lucene/src/java/org/apache/lucene/index/
lucene/src/test-framework/org/apache/lucene/search/
lucene/src/test-framework/org/apache/lucene/store/
lucene/src/test-framework/org/apache/lucene/ut...
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java Sun May 1 22:38:33 2011
@@ -19,36 +19,27 @@ package org.apache.lucene.index;
import java.io.IOException;
import java.io.PrintStream;
-import java.text.NumberFormat;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
+import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
+import org.apache.lucene.index.DocumentsWriterPerThread.IndexingChain;
+import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
+import org.apache.lucene.index.FieldInfos.FieldNumberBiMap;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.SimilarityProvider;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.RAMFile;
-import org.apache.lucene.util.ArrayUtil;
-import org.apache.lucene.util.BitVector;
-import org.apache.lucene.util.RamUsageEstimator;
-import org.apache.lucene.util.RecyclingByteBlockAllocator;
-import org.apache.lucene.util.ThreadInterruptedException;
-
-import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_MASK;
-import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_SIZE;
/**
* This class accepts multiple added documents and directly
- * writes a single segment file. It does this more
- * efficiently than creating a single segment per document
- * (with DocumentWriter) and doing standard merges on those
- * segments.
+ * writes segment files.
*
* Each added document is passed to the {@link DocConsumer},
* which in turn processes the document and interacts with
@@ -111,266 +102,117 @@ import static org.apache.lucene.util.Byt
*/
final class DocumentsWriter {
- final AtomicLong bytesUsed = new AtomicLong(0);
- IndexWriter writer;
Directory directory;
- String segment; // Current segment we are working on
-
- private int nextDocID; // Next docID to be added
- private int numDocs; // # of docs added, but not yet flushed
-
- // Max # ThreadState instances; if there are more threads
- // than this they share ThreadStates
- private DocumentsWriterThreadState[] threadStates = new DocumentsWriterThreadState[0];
- private final HashMap<Thread,DocumentsWriterThreadState> threadBindings = new HashMap<Thread,DocumentsWriterThreadState>();
-
- boolean bufferIsFull; // True when it's time to write segment
- private boolean aborting; // True if an abort is pending
+ private volatile boolean closed;
PrintStream infoStream;
SimilarityProvider similarityProvider;
- // max # simultaneous threads; if there are more than
- // this, they wait for others to finish first
- private final int maxThreadStates;
-
- // TODO: cutover to BytesRefHash
- // Deletes for our still-in-RAM (to be flushed next) segment
- private BufferedDeletes pendingDeletes = new BufferedDeletes(false);
-
- static class DocState {
- DocumentsWriter docWriter;
- Analyzer analyzer;
- PrintStream infoStream;
- SimilarityProvider similarityProvider;
- int docID;
- Document doc;
- String maxTermPrefix;
-
- // Only called by asserts
- public boolean testPoint(String name) {
- return docWriter.writer.testPoint(name);
- }
-
- public void clear() {
- // don't hold onto doc nor analyzer, in case it is
- // largish:
- doc = null;
- analyzer = null;
- }
- }
-
- /** Consumer returns this on each doc. This holds any
- * state that must be flushed synchronized "in docID
- * order". We gather these and flush them in order. */
- abstract static class DocWriter {
- DocWriter next;
- int docID;
- abstract void finish() throws IOException;
- abstract void abort();
- abstract long sizeInBytes();
+ List<String> newFiles;
- void setNext(DocWriter next) {
- this.next = next;
- }
- }
+ final IndexWriter indexWriter;
- /**
- * Create and return a new DocWriterBuffer.
- */
- PerDocBuffer newPerDocBuffer() {
- return new PerDocBuffer();
- }
-
- /**
- * RAMFile buffer for DocWriters.
- */
- class PerDocBuffer extends RAMFile {
-
- /**
- * Allocate bytes used from shared pool.
- */
- @Override
- protected byte[] newBuffer(int size) {
- assert size == PER_DOC_BLOCK_SIZE;
- return perDocAllocator.getByteBlock();
- }
-
- /**
- * Recycle the bytes used.
- */
- synchronized void recycle() {
- if (buffers.size() > 0) {
- setLength(0);
-
- // Recycle the blocks
- perDocAllocator.recycleByteBlocks(buffers);
- buffers.clear();
- sizeInBytes = 0;
-
- assert numBuffers() == 0;
- }
- }
- }
-
- /**
- * The IndexingChain must define the {@link #getChain(DocumentsWriter)} method
- * which returns the DocConsumer that the DocumentsWriter calls to process the
- * documents.
- */
- abstract static class IndexingChain {
- abstract DocConsumer getChain(DocumentsWriter documentsWriter);
- }
-
- static final IndexingChain defaultIndexingChain = new IndexingChain() {
+ private AtomicInteger numDocsInRAM = new AtomicInteger(0);
- @Override
- DocConsumer getChain(DocumentsWriter documentsWriter) {
- /*
- This is the current indexing chain:
-
- DocConsumer / DocConsumerPerThread
- --> code: DocFieldProcessor / DocFieldProcessorPerThread
- --> DocFieldConsumer / DocFieldConsumerPerThread / DocFieldConsumerPerField
- --> code: DocFieldConsumers / DocFieldConsumersPerThread / DocFieldConsumersPerField
- --> code: DocInverter / DocInverterPerThread / DocInverterPerField
- --> InvertedDocConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
- --> code: TermsHash / TermsHashPerThread / TermsHashPerField
- --> TermsHashConsumer / TermsHashConsumerPerThread / TermsHashConsumerPerField
- --> code: FreqProxTermsWriter / FreqProxTermsWriterPerThread / FreqProxTermsWriterPerField
- --> code: TermVectorsTermsWriter / TermVectorsTermsWriterPerThread / TermVectorsTermsWriterPerField
- --> InvertedDocEndConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
- --> code: NormsWriter / NormsWriterPerThread / NormsWriterPerField
- --> code: StoredFieldsWriter / StoredFieldsWriterPerThread / StoredFieldsWriterPerField
- */
-
- // Build up indexing chain:
-
- final TermsHashConsumer termVectorsWriter = new TermVectorsTermsWriter(documentsWriter);
- final TermsHashConsumer freqProxWriter = new FreqProxTermsWriter();
- /*
- * nesting TermsHash instances here to allow the secondary (TermVectors) share the interned postings
- * via a shared ByteBlockPool. See TermsHashPerField for details.
- */
- final TermsHash termVectorsTermHash = new TermsHash(documentsWriter, false, termVectorsWriter, null);
- final InvertedDocConsumer termsHash = new TermsHash(documentsWriter, true, freqProxWriter, termVectorsTermHash);
- final NormsWriter normsWriter = new NormsWriter();
- final DocInverter docInverter = new DocInverter(termsHash, normsWriter);
- return new DocFieldProcessor(documentsWriter, docInverter);
- }
- };
-
- final DocConsumer consumer;
-
- // How much RAM we can use before flushing. This is 0 if
- // we are flushing by doc count instead.
-
- private final IndexWriterConfig config;
+ // TODO: cut over to BytesRefHash in BufferedDeletes
+ volatile DocumentsWriterDeleteQueue deleteQueue = new DocumentsWriterDeleteQueue();
+ private final Queue<FlushTicket> ticketQueue = new LinkedList<DocumentsWriter.FlushTicket>();
- private boolean closed;
- private FieldInfos fieldInfos;
+ private Collection<String> abortedFiles; // List of files that were written before last abort()
- private final BufferedDeletesStream bufferedDeletesStream;
- private final IndexWriter.FlushControl flushControl;
+ final IndexingChain chain;
- DocumentsWriter(IndexWriterConfig config, Directory directory, IndexWriter writer, IndexingChain indexingChain, FieldInfos fieldInfos,
+ final DocumentsWriterPerThreadPool perThreadPool;
+ final FlushPolicy flushPolicy;
+ final DocumentsWriterFlushControl flushControl;
+ final Healthiness healthiness;
+ DocumentsWriter(IndexWriterConfig config, Directory directory, IndexWriter writer, FieldNumberBiMap globalFieldNumbers,
BufferedDeletesStream bufferedDeletesStream) throws IOException {
this.directory = directory;
- this.writer = writer;
+ this.indexWriter = writer;
this.similarityProvider = config.getSimilarityProvider();
- this.maxThreadStates = config.getMaxThreadStates();
- this.fieldInfos = fieldInfos;
- this.bufferedDeletesStream = bufferedDeletesStream;
- flushControl = writer.flushControl;
- consumer = config.getIndexingChain().getChain(this);
- this.config = config;
- }
-
- // Buffer a specific docID for deletion. Currently only
- // used when we hit a exception when adding a document
- synchronized void deleteDocID(int docIDUpto) {
- pendingDeletes.addDocID(docIDUpto);
- // NOTE: we do not trigger flush here. This is
- // potentially a RAM leak, if you have an app that tries
- // to add docs but every single doc always hits a
- // non-aborting exception. Allowing a flush here gets
- // very messy because we are only invoked when handling
- // exceptions so to do this properly, while handling an
- // exception we'd have to go off and flush new deletes
- // which is risky (likely would hit some other
- // confounding exception).
- }
-
- boolean deleteQueries(Query... queries) {
- final boolean doFlush = flushControl.waitUpdate(0, queries.length);
- synchronized(this) {
- for (Query query : queries) {
- pendingDeletes.addQuery(query, numDocs);
- }
- }
- return doFlush;
- }
-
- boolean deleteQuery(Query query) {
- final boolean doFlush = flushControl.waitUpdate(0, 1);
- synchronized(this) {
- pendingDeletes.addQuery(query, numDocs);
+ this.perThreadPool = config.getIndexerThreadPool();
+ this.chain = config.getIndexingChain();
+ this.perThreadPool.initialize(this, globalFieldNumbers, config);
+ final FlushPolicy configuredPolicy = config.getFlushPolicy();
+ if (configuredPolicy == null) {
+ flushPolicy = new FlushByRamOrCountsPolicy();
+ } else {
+ flushPolicy = configuredPolicy;
}
- return doFlush;
+ flushPolicy.init(this);
+
+ healthiness = new Healthiness();
+ final long maxRamPerDWPT = config.getRAMPerThreadHardLimitMB() * 1024 * 1024;
+ flushControl = new DocumentsWriterFlushControl(this, healthiness, maxRamPerDWPT);
}
-
- boolean deleteTerms(Term... terms) {
- final boolean doFlush = flushControl.waitUpdate(0, terms.length);
- synchronized(this) {
- for (Term term : terms) {
- pendingDeletes.addTerm(term, numDocs);
- }
+
+ synchronized void deleteQueries(final Query... queries) throws IOException {
+ deleteQueue.addDelete(queries);
+ flushControl.doOnDelete();
+ if (flushControl.doApplyAllDeletes()) {
+ applyAllDeletes(deleteQueue);
}
- return doFlush;
}
// TODO: we could check w/ FreqProxTermsWriter: if the
// term doesn't exist, don't bother buffering into the
// per-DWPT map (but still must go into the global map)
- boolean deleteTerm(Term term, boolean skipWait) {
- final boolean doFlush = flushControl.waitUpdate(0, 1, skipWait);
- synchronized(this) {
- pendingDeletes.addTerm(term, numDocs);
+ synchronized void deleteTerms(final Term... terms) throws IOException {
+ final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
+ deleteQueue.addDelete(terms);
+ flushControl.doOnDelete();
+ if (flushControl.doApplyAllDeletes()) {
+ applyAllDeletes(deleteQueue);
}
- return doFlush;
}
- /** If non-null, various details of indexing are printed
- * here. */
+ DocumentsWriterDeleteQueue currentDeleteSession() {
+ return deleteQueue;
+ }
+
+ private void applyAllDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
+ if (deleteQueue != null) {
+ synchronized (ticketQueue) {
+ // Freeze and insert the delete flush ticket in the queue
+ ticketQueue.add(new FlushTicket(deleteQueue.freezeGlobalBuffer(null), false));
+ applyFlushTickets();
+ }
+ }
+ indexWriter.applyAllDeletes();
+ indexWriter.flushCount.incrementAndGet();
+ }
+
synchronized void setInfoStream(PrintStream infoStream) {
this.infoStream = infoStream;
- for(int i=0;i<threadStates.length;i++) {
- threadStates[i].docState.infoStream = infoStream;
+ final Iterator<ThreadState> it = perThreadPool.getAllPerThreadsIterator();
+ while (it.hasNext()) {
+ it.next().perThread.docState.infoStream = infoStream;
}
}
- /** Get current segment name we are writing. */
- synchronized String getSegment() {
- return segment;
+ /** Returns how many docs are currently buffered in RAM. */
+ int getNumDocs() {
+ return numDocsInRAM.get();
}
- /** Returns how many docs are currently buffered in RAM. */
- synchronized int getNumDocs() {
- return numDocs;
+ Collection<String> abortedFiles() {
+ return abortedFiles;
}
- void message(String message) {
+ // returns boolean for asserts
+ boolean message(String message) {
if (infoStream != null) {
- writer.message("DW: " + message);
+ indexWriter.message("DW: " + message);
}
+ return true;
}
- synchronized void setAborting() {
- if (infoStream != null) {
- message("setAborting");
+ private void ensureOpen() throws AlreadyClosedException {
+ if (closed) {
+ throw new AlreadyClosedException("this IndexWriter is closed");
}
- aborting = true;
}
/** Called if we hit an exception at a bad time (when
@@ -378,816 +220,335 @@ final class DocumentsWriter {
* currently buffered docs. This resets our state,
* discarding any docs added since last flush. */
synchronized void abort() throws IOException {
- if (infoStream != null) {
- message("docWriter: abort");
- }
-
boolean success = false;
- try {
-
- // Forcefully remove waiting ThreadStates from line
- waitQueue.abort();
-
- // Wait for all other threads to finish with
- // DocumentsWriter:
- waitIdle();
+ synchronized (this) {
+ deleteQueue.clear();
+ }
+ try {
if (infoStream != null) {
- message("docWriter: abort waitIdle done");
+ message("docWriter: abort");
}
- assert 0 == waitQueue.numWaiting: "waitQueue.numWaiting=" + waitQueue.numWaiting;
-
- waitQueue.waitingBytes = 0;
-
- pendingDeletes.clear();
+ final Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
- for (DocumentsWriterThreadState threadState : threadStates)
+ while (threadsIterator.hasNext()) {
+ ThreadState perThread = threadsIterator.next();
+ perThread.lock();
try {
- threadState.consumer.abort();
- } catch (Throwable t) {
+ if (perThread.isActive()) { // we might be closed
+ perThread.perThread.abort();
+ perThread.perThread.checkAndResetHasAborted();
+ } else {
+ assert closed;
+ }
+ } finally {
+ perThread.unlock();
}
-
- try {
- consumer.abort();
- } catch (Throwable t) {
}
- // Reset all postings data
- doAfterFlush();
success = true;
} finally {
- aborting = false;
- notifyAll();
if (infoStream != null) {
- message("docWriter: done abort; success=" + success);
+ message("docWriter: done abort; abortedFiles=" + abortedFiles + " success=" + success);
}
}
}
- /** Reset after a flush */
- private void doAfterFlush() throws IOException {
- // All ThreadStates should be idle when we are called
- assert allThreadsIdle();
- for (DocumentsWriterThreadState threadState : threadStates) {
- threadState.consumer.doAfterFlush();
- }
-
- threadBindings.clear();
- waitQueue.reset();
- segment = null;
- fieldInfos = new FieldInfos(fieldInfos);
- numDocs = 0;
- nextDocID = 0;
- bufferIsFull = false;
- for(int i=0;i<threadStates.length;i++) {
- threadStates[i].doAfterFlush();
- }
+ boolean anyChanges() {
+ return numDocsInRAM.get() != 0 || anyDeletions();
}
- private synchronized boolean allThreadsIdle() {
- for(int i=0;i<threadStates.length;i++) {
- if (!threadStates[i].isIdle) {
- return false;
- }
- }
- return true;
+ public int getBufferedDeleteTermsSize() {
+ return deleteQueue.getBufferedDeleteTermsSize();
}
- synchronized boolean anyChanges() {
- return numDocs != 0 || pendingDeletes.any();
- }
-
- // for testing
- public BufferedDeletes getPendingDeletes() {
- return pendingDeletes;
- }
-
- private void pushDeletes(SegmentInfo newSegment, SegmentInfos segmentInfos) {
- // Lock order: DW -> BD
- final long delGen = bufferedDeletesStream.getNextGen();
- if (pendingDeletes.any()) {
- if (segmentInfos.size() > 0 || newSegment != null) {
- final FrozenBufferedDeletes packet = new FrozenBufferedDeletes(pendingDeletes, delGen);
- if (infoStream != null) {
- message("flush: push buffered deletes startSize=" + pendingDeletes.bytesUsed.get() + " frozenSize=" + packet.bytesUsed);
- }
- bufferedDeletesStream.push(packet);
- if (infoStream != null) {
- message("flush: delGen=" + packet.gen);
- }
- if (newSegment != null) {
- newSegment.setBufferedDeletesGen(packet.gen);
- }
- } else {
- if (infoStream != null) {
- message("flush: drop buffered deletes: no segments");
- }
- // We can safely discard these deletes: since
- // there are no segments, the deletions cannot
- // affect anything.
- }
- pendingDeletes.clear();
- } else if (newSegment != null) {
- newSegment.setBufferedDeletesGen(delGen);
- }
+ //for testing
+ public int getNumBufferedDeleteTerms() {
+ return deleteQueue.numGlobalTermDeletes();
}
public boolean anyDeletions() {
- return pendingDeletes.any();
+ return deleteQueue.anyChanges();
}
- /** Flush all pending docs to a new segment */
- // Lock order: IW -> DW
- synchronized SegmentInfo flush(IndexWriter writer, IndexFileDeleter deleter, MergePolicy mergePolicy, SegmentInfos segmentInfos) throws IOException {
-
- final long startTime = System.currentTimeMillis();
-
- // We change writer's segmentInfos:
- assert Thread.holdsLock(writer);
-
- waitIdle();
+ void close() {
+ closed = true;
+ flushControl.setClosed();
+ }
- if (numDocs == 0) {
- // nothing to do!
- if (infoStream != null) {
- message("flush: no docs; skipping");
- }
- // Lock order: IW -> DW -> BD
- pushDeletes(null, segmentInfos);
- return null;
- }
+ boolean updateDocument(final Document doc, final Analyzer analyzer,
+ final Term delTerm) throws CorruptIndexException, IOException {
+ ensureOpen();
+ boolean maybeMerge = false;
+ final boolean isUpdate = delTerm != null;
+ if (healthiness.anyStalledThreads()) {
- if (aborting) {
+ // Help out flushing any pending DWPTs so we can un-stall:
if (infoStream != null) {
- message("flush: skip because aborting is set");
+ message("WARNING DocumentsWriter has stalled threads; will hijack this thread to flush pending segment(s)");
}
- return null;
- }
-
- boolean success = false;
-
- SegmentInfo newSegment;
-
- try {
- assert nextDocID == numDocs;
- assert waitQueue.numWaiting == 0;
- assert waitQueue.waitingBytes == 0;
- if (infoStream != null) {
- message("flush postings as segment " + segment + " numDocs=" + numDocs);
- }
-
- final SegmentWriteState flushState = new SegmentWriteState(infoStream, directory, segment, fieldInfos,
- numDocs, writer.getConfig().getTermIndexInterval(),
- fieldInfos.buildSegmentCodecs(true),
- pendingDeletes);
- // Apply delete-by-docID now (delete-byDocID only
- // happens when an exception is hit processing that
- // doc, eg if analyzer has some problem w/ the text):
- if (pendingDeletes.docIDs.size() > 0) {
- flushState.deletedDocs = new BitVector(numDocs);
- for(int delDocID : pendingDeletes.docIDs) {
- flushState.deletedDocs.set(delDocID);
+      // Try to pick up pending flushes here if possible
+ DocumentsWriterPerThread flushingDWPT;
+ while ((flushingDWPT = flushControl.nextPendingFlush()) != null) {
+ // Don't push the delete here since the update could fail!
+ maybeMerge = doFlush(flushingDWPT);
+ if (!healthiness.anyStalledThreads()) {
+ break;
}
- pendingDeletes.bytesUsed.addAndGet(-pendingDeletes.docIDs.size() * BufferedDeletes.BYTES_PER_DEL_DOCID);
- pendingDeletes.docIDs.clear();
}
- newSegment = new SegmentInfo(segment, numDocs, directory, false, fieldInfos.hasProx(), flushState.segmentCodecs, false, fieldInfos);
-
- Collection<DocConsumerPerThread> threads = new HashSet<DocConsumerPerThread>();
- for (DocumentsWriterThreadState threadState : threadStates) {
- threads.add(threadState.consumer);
+ if (infoStream != null && healthiness.anyStalledThreads()) {
+ message("WARNING DocumentsWriter still has stalled threads; waiting");
}
- double startMBUsed = bytesUsed()/1024./1024.;
+ healthiness.waitIfStalled(); // block if stalled
- consumer.flush(threads, flushState);
-
- newSegment.setHasVectors(flushState.hasVectors);
-
- if (infoStream != null) {
- message("new segment has " + (flushState.hasVectors ? "vectors" : "no vectors"));
- if (flushState.deletedDocs != null) {
- message("new segment has " + flushState.deletedDocs.count() + " deleted docs");
- }
- message("flushedFiles=" + newSegment.files());
- message("flushed codecs=" + newSegment.getSegmentCodecs());
+ if (infoStream != null && healthiness.anyStalledThreads()) {
+ message("WARNING DocumentsWriter done waiting");
}
+ }
- if (mergePolicy.useCompoundFile(segmentInfos, newSegment)) {
- final String cfsFileName = IndexFileNames.segmentFileName(segment, "", IndexFileNames.COMPOUND_FILE_EXTENSION);
-
- if (infoStream != null) {
- message("flush: create compound file \"" + cfsFileName + "\"");
- }
+ final ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(),
+ this, doc);
+ final DocumentsWriterPerThread flushingDWPT;
+
+ try {
- CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, cfsFileName);
- for(String fileName : newSegment.files()) {
- cfsWriter.addFile(fileName);
- }
- cfsWriter.close();
- deleter.deleteNewFiles(newSegment.files());
- newSegment.setUseCompoundFile(true);
+ if (!perThread.isActive()) {
+ ensureOpen();
+ assert false: "perThread is not active but we are still open";
}
-
- // Must write deleted docs after the CFS so we don't
- // slurp the del file into CFS:
- if (flushState.deletedDocs != null) {
- final int delCount = flushState.deletedDocs.count();
- assert delCount > 0;
- newSegment.setDelCount(delCount);
- newSegment.advanceDelGen();
- final String delFileName = newSegment.getDelFileName();
- if (infoStream != null) {
- message("flush: write " + delCount + " deletes to " + delFileName);
- }
- boolean success2 = false;
- try {
- // TODO: in the NRT case it'd be better to hand
- // this del vector over to the
- // shortly-to-be-opened SegmentReader and let it
- // carry the changes; there's no reason to use
- // filesystem as intermediary here.
- flushState.deletedDocs.write(directory, delFileName);
- success2 = true;
- } finally {
- if (!success2) {
- try {
- directory.deleteFile(delFileName);
- } catch (Throwable t) {
- // suppress this so we keep throwing the
- // original exception
- }
- }
+
+ final DocumentsWriterPerThread dwpt = perThread.perThread;
+ try {
+ dwpt.updateDocument(doc, analyzer, delTerm);
+ numDocsInRAM.incrementAndGet();
+ } finally {
+ if (dwpt.checkAndResetHasAborted()) {
+ flushControl.doOnAbort(perThread);
}
}
-
- if (infoStream != null) {
- message("flush: segment=" + newSegment);
- final double newSegmentSizeNoStore = newSegment.sizeInBytes(false)/1024./1024.;
- final double newSegmentSize = newSegment.sizeInBytes(true)/1024./1024.;
- message(" ramUsed=" + nf.format(startMBUsed) + " MB" +
- " newFlushedSize=" + nf.format(newSegmentSize) + " MB" +
- " (" + nf.format(newSegmentSizeNoStore) + " MB w/o doc stores)" +
- " docs/MB=" + nf.format(numDocs / newSegmentSize) +
- " new/old=" + nf.format(100.0 * newSegmentSizeNoStore / startMBUsed) + "%");
- }
-
- success = true;
+ flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate);
} finally {
- notifyAll();
- if (!success) {
- if (segment != null) {
- deleter.refresh(segment);
- }
- abort();
- }
+ perThread.unlock();
}
-
- doAfterFlush();
-
- // Lock order: IW -> DW -> BD
- pushDeletes(newSegment, segmentInfos);
- if (infoStream != null) {
- message("flush time " + (System.currentTimeMillis()-startTime) + " msec");
- }
-
- return newSegment;
- }
-
- synchronized void close() {
- closed = true;
- notifyAll();
- }
-
- /** Returns a free (idle) ThreadState that may be used for
- * indexing this one document. This call also pauses if a
- * flush is pending. If delTerm is non-null then we
- * buffer this deleted term after the thread state has
- * been acquired. */
- synchronized DocumentsWriterThreadState getThreadState(Document doc, Term delTerm) throws IOException {
-
- final Thread currentThread = Thread.currentThread();
- assert !Thread.holdsLock(writer);
-
- // First, find a thread state. If this thread already
- // has affinity to a specific ThreadState, use that one
- // again.
- DocumentsWriterThreadState state = threadBindings.get(currentThread);
- if (state == null) {
-
- // First time this thread has called us since last
- // flush. Find the least loaded thread state:
- DocumentsWriterThreadState minThreadState = null;
- for(int i=0;i<threadStates.length;i++) {
- DocumentsWriterThreadState ts = threadStates[i];
- if (minThreadState == null || ts.numThreads < minThreadState.numThreads) {
- minThreadState = ts;
- }
- }
- if (minThreadState != null && (minThreadState.numThreads == 0 || threadStates.length >= maxThreadStates)) {
- state = minThreadState;
- state.numThreads++;
- } else {
- // Just create a new "private" thread state
- DocumentsWriterThreadState[] newArray = new DocumentsWriterThreadState[1+threadStates.length];
- if (threadStates.length > 0) {
- System.arraycopy(threadStates, 0, newArray, 0, threadStates.length);
- }
- state = newArray[threadStates.length] = new DocumentsWriterThreadState(this);
- threadStates = newArray;
+
+ if (flushingDWPT != null) {
+ maybeMerge |= doFlush(flushingDWPT);
+ } else {
+ final DocumentsWriterPerThread nextPendingFlush = flushControl.nextPendingFlush();
+ if (nextPendingFlush != null) {
+ maybeMerge |= doFlush(nextPendingFlush);
}
- threadBindings.put(currentThread, state);
}
-
- // Next, wait until my thread state is idle (in case
- // it's shared with other threads), and no flush/abort
- // pending
- waitReady(state);
-
- // Allocate segment name if this is the first doc since
- // last flush:
- if (segment == null) {
- segment = writer.newSegmentName();
- assert numDocs == 0;
- }
-
- state.docState.docID = nextDocID++;
-
- if (delTerm != null) {
- pendingDeletes.addTerm(delTerm, state.docState.docID);
- }
-
- numDocs++;
- state.isIdle = false;
- return state;
- }
-
- boolean addDocument(Document doc, Analyzer analyzer) throws CorruptIndexException, IOException {
- return updateDocument(doc, analyzer, null);
+ return maybeMerge;
}
-
- boolean updateDocument(Document doc, Analyzer analyzer, Term delTerm)
- throws CorruptIndexException, IOException {
- // Possibly trigger a flush, or wait until any running flush completes:
- boolean doFlush = flushControl.waitUpdate(1, delTerm != null ? 1 : 0);
-
- // This call is synchronized but fast
- final DocumentsWriterThreadState state = getThreadState(doc, delTerm);
-
- final DocState docState = state.docState;
- docState.doc = doc;
- docState.analyzer = analyzer;
-
- boolean success = false;
- try {
- // This call is not synchronized and does all the
- // work
- final DocWriter perDoc;
+ private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException {
+ boolean maybeMerge = false;
+ while (flushingDWPT != null) {
+ maybeMerge = true;
+ boolean success = false;
+ FlushTicket ticket = null;
+
try {
- perDoc = state.consumer.processDocument(fieldInfos);
- } finally {
- docState.clear();
- }
-
- // This call is synchronized but fast
- finishDocument(state, perDoc);
-
- success = true;
- } finally {
- if (!success) {
-
- // If this thread state had decided to flush, we
- // must clear it so another thread can flush
- if (doFlush) {
- flushControl.clearFlushPending();
- }
-
- if (infoStream != null) {
- message("exception in updateDocument aborting=" + aborting);
- }
-
- synchronized(this) {
-
- state.isIdle = true;
- notifyAll();
-
- if (aborting) {
- abort();
- } else {
- skipDocWriter.docID = docState.docID;
- boolean success2 = false;
- try {
- waitQueue.add(skipDocWriter);
- success2 = true;
- } finally {
- if (!success2) {
- abort();
- return false;
- }
+ assert currentFullFlushDelQueue == null
+ || flushingDWPT.deleteQueue == currentFullFlushDelQueue : "expected: "
+          + currentFullFlushDelQueue + " but was: " + flushingDWPT.deleteQueue
+ + " " + flushControl.isFullFlush();
+ /*
+       * Since the flush process with DWPTs is concurrent and several DWPTs
+       * could flush at the same time, we must maintain the order of the
+       * flushes before we can apply a flushed segment and the frozen global
+       * deletes it is buffering. The reason for this is that the global
+       * deletes mark a certain point in time at which we took a DWPT out of
+       * rotation and froze the global deletes.
+       *
+       * Example: flush 'A' starts and freezes the global deletes, then
+       * flush 'B' starts and freezes all deletes that occurred since 'A'
+       * started. If 'B' finishes before 'A', we need to wait until 'A' is done;
+       * otherwise the deletes frozen by 'B' are not applied to 'A' and we
+       * might fail to delete documents in 'A'.
+ */
+ try {
+ synchronized (ticketQueue) {
+            // Each flush is assigned a ticket in the order it acquires the ticketQueue lock
+ ticket = new FlushTicket(flushingDWPT.prepareFlush(), true);
+ ticketQueue.add(ticket);
+ }
+
+ // flush concurrently without locking
+ final FlushedSegment newSegment = flushingDWPT.flush();
+ synchronized (ticketQueue) {
+ ticket.segment = newSegment;
+ }
+ // flush was successful once we reached this point - new seg. has been assigned to the ticket!
+ success = true;
+ } finally {
+ if (!success && ticket != null) {
+ synchronized (ticketQueue) {
+              // In the case of a failure, make sure we are still making progress:
+              // the segment flush failed, but the flush ticket could hold global
+              // deletes that must still be applied; see FlushTicket#canPublish()
+ ticket.isSegmentFlush = false;
}
-
- // Immediately mark this document as deleted
- // since likely it was partially added. This
- // keeps indexing as "all or none" (atomic) when
- // adding a document:
- deleteDocID(state.docState.docID);
}
}
+ /*
+         * Now that we are done, try to apply the ticket queue if the head of
+         * the queue has already finished its flush.
+ */
+ applyFlushTickets();
+ } finally {
+ flushControl.doAfterFlush(flushingDWPT);
+ flushingDWPT.checkAndResetHasAborted();
+ indexWriter.flushCount.incrementAndGet();
}
+
+ flushingDWPT = flushControl.nextPendingFlush();
}
-
- doFlush |= flushControl.flushByRAMUsage("new document");
-
- return doFlush;
- }
-
- public synchronized void waitIdle() {
- while (!allThreadsIdle()) {
- try {
- wait();
- } catch (InterruptedException ie) {
- throw new ThreadInterruptedException(ie);
- }
- }
+ return maybeMerge;
}
- synchronized void waitReady(DocumentsWriterThreadState state) {
- while (!closed && (!state.isIdle || aborting)) {
- try {
- wait();
- } catch (InterruptedException ie) {
- throw new ThreadInterruptedException(ie);
+ private void applyFlushTickets() throws IOException {
+ synchronized (ticketQueue) {
+ while (true) {
+ // Keep publishing eligible flushed segments:
+ final FlushTicket head = ticketQueue.peek();
+ if (head != null && head.canPublish()) {
+ ticketQueue.poll();
+ finishFlush(head.segment, head.frozenDeletes);
+ } else {
+ break;
+ }
}
}
-
- if (closed) {
- throw new AlreadyClosedException("this IndexWriter is closed");
- }
}
- /** Does the synchronized work to finish/flush the
- * inverted document. */
- private void finishDocument(DocumentsWriterThreadState perThread, DocWriter docWriter) throws IOException {
-
- // Must call this w/o holding synchronized(this) else
- // we'll hit deadlock:
- balanceRAM();
-
- synchronized(this) {
-
- assert docWriter == null || docWriter.docID == perThread.docState.docID;
-
- if (aborting) {
-
- // We are currently aborting, and another thread is
- // waiting for me to become idle. We just forcefully
- // idle this threadState; it will be fully reset by
- // abort()
- if (docWriter != null) {
- try {
- docWriter.abort();
- } catch (Throwable t) {
- }
+ private void finishFlush(FlushedSegment newSegment, FrozenBufferedDeletes bufferedDeletes)
+ throws IOException {
+ // Finish the flushed segment and publish it to IndexWriter
+ if (newSegment == null) {
+ assert bufferedDeletes != null;
+ if (bufferedDeletes != null && bufferedDeletes.any()) {
+ indexWriter.bufferedDeletesStream.push(bufferedDeletes);
+ if (infoStream != null) {
+ message("flush: push buffered deletes: " + bufferedDeletes);
}
-
- perThread.isIdle = true;
-
- // wakes up any threads waiting on the wait queue
- notifyAll();
-
- return;
}
-
- final boolean doPause;
-
- if (docWriter != null) {
- doPause = waitQueue.add(docWriter);
- } else {
- skipDocWriter.docID = perThread.docState.docID;
- doPause = waitQueue.add(skipDocWriter);
- }
-
- if (doPause) {
- waitForWaitQueue();
- }
-
- perThread.isIdle = true;
-
- // wakes up any threads waiting on the wait queue
- notifyAll();
+ } else {
+ publishFlushedSegment(newSegment, bufferedDeletes);
}
}
- synchronized void waitForWaitQueue() {
- do {
- try {
- wait();
- } catch (InterruptedException ie) {
- throw new ThreadInterruptedException(ie);
- }
- } while (!waitQueue.doResume());
- }
-
- private static class SkipDocWriter extends DocWriter {
- @Override
- void finish() {
- }
- @Override
- void abort() {
- }
- @Override
- long sizeInBytes() {
- return 0;
+ final void subtractFlushedNumDocs(int numFlushed) {
+ int oldValue = numDocsInRAM.get();
+ while (!numDocsInRAM.compareAndSet(oldValue, oldValue - numFlushed)) {
+ oldValue = numDocsInRAM.get();
}
}
- final SkipDocWriter skipDocWriter = new SkipDocWriter();
-
- NumberFormat nf = NumberFormat.getInstance();
-
- /* Initial chunks size of the shared byte[] blocks used to
- store postings data */
- final static int BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK;
-
- /* if you increase this, you must fix field cache impl for
- * getTerms/getTermsIndex requires <= 32768. */
- final static int MAX_TERM_LENGTH_UTF8 = BYTE_BLOCK_SIZE-2;
-
- /* Initial chunks size of the shared int[] blocks used to
- store postings data */
- final static int INT_BLOCK_SHIFT = 13;
- final static int INT_BLOCK_SIZE = 1 << INT_BLOCK_SHIFT;
- final static int INT_BLOCK_MASK = INT_BLOCK_SIZE - 1;
-
- private List<int[]> freeIntBlocks = new ArrayList<int[]>();
-
- /* Allocate another int[] from the shared pool */
- synchronized int[] getIntBlock() {
- final int size = freeIntBlocks.size();
- final int[] b;
- if (0 == size) {
- b = new int[INT_BLOCK_SIZE];
- bytesUsed.addAndGet(INT_BLOCK_SIZE*RamUsageEstimator.NUM_BYTES_INT);
- } else {
- b = freeIntBlocks.remove(size-1);
+
+ /**
+   * Publishes the flushed segment, its segment-private deletes (if any) and
+   * its associated global delete packet (if present) to the IndexWriter. The
+   * actual publishing operation is synced on IW -> BDS so that the {@link SegmentInfo}'s
+   * delete generation is always GlobalPacket_deleteGeneration + 1.
+ */
+ private void publishFlushedSegment(FlushedSegment newSegment, FrozenBufferedDeletes globalPacket)
+ throws IOException {
+ assert newSegment != null;
+ final SegmentInfo segInfo = indexWriter.prepareFlushedSegment(newSegment);
+ final BufferedDeletes deletes = newSegment.segmentDeletes;
+ FrozenBufferedDeletes packet = null;
+ if (deletes != null && deletes.any()) {
+ // Segment private delete
+ packet = new FrozenBufferedDeletes(deletes, true);
+ if (infoStream != null) {
+ message("flush: push buffered seg private deletes: " + packet);
+ }
}
- return b;
- }
-
- long bytesUsed() {
- return bytesUsed.get() + pendingDeletes.bytesUsed.get();
- }
- /* Return int[]s to the pool */
- synchronized void recycleIntBlocks(int[][] blocks, int start, int end) {
- for(int i=start;i<end;i++) {
- freeIntBlocks.add(blocks[i]);
- blocks[i] = null;
- }
+ // now publish!
+ indexWriter.publishFlushedSegment(segInfo, packet, globalPacket);
}
-
- final RecyclingByteBlockAllocator byteBlockAllocator = new RecyclingByteBlockAllocator(BYTE_BLOCK_SIZE, Integer.MAX_VALUE, bytesUsed);
-
- final static int PER_DOC_BLOCK_SIZE = 1024;
-
- final RecyclingByteBlockAllocator perDocAllocator = new RecyclingByteBlockAllocator(PER_DOC_BLOCK_SIZE, Integer.MAX_VALUE, bytesUsed);
-
- String toMB(long v) {
- return nf.format(v/1024./1024.);
+
+ // for asserts
+ private volatile DocumentsWriterDeleteQueue currentFullFlushDelQueue = null;
+ // for asserts
+ private synchronized boolean setFlushingDeleteQueue(DocumentsWriterDeleteQueue session) {
+ currentFullFlushDelQueue = session;
+ return true;
}
-
- /* We have three pools of RAM: Postings, byte blocks
- * (holds freq/prox posting data) and per-doc buffers
- * (stored fields/term vectors). Different docs require
- * varying amount of storage from these classes. For
- * example, docs with many unique single-occurrence short
- * terms will use up the Postings RAM and hardly any of
- * the other two. Whereas docs with very large terms will
- * use alot of byte blocks RAM. This method just frees
- * allocations from the pools once we are over-budget,
- * which balances the pools to match the current docs. */
- void balanceRAM() {
-
- final boolean doBalance;
- final long deletesRAMUsed;
-
- deletesRAMUsed = bufferedDeletesStream.bytesUsed();
-
- final long ramBufferSize;
- final double mb = config.getRAMBufferSizeMB();
- if (mb == IndexWriterConfig.DISABLE_AUTO_FLUSH) {
- ramBufferSize = IndexWriterConfig.DISABLE_AUTO_FLUSH;
- } else {
- ramBufferSize = (long) (mb*1024*1024);
+
+ /*
+   * flushAllThreads is synced by IW's fullFlushLock. Flushing all threads is a
+   * two-stage operation; the caller must ensure (in a try/finally) that finishFullFlush
+   * is called after this method to release the flush lock in DWFlushControl
+ */
+ final boolean flushAllThreads()
+ throws IOException {
+ final DocumentsWriterDeleteQueue flushingDeleteQueue;
+
+ synchronized (this) {
+ flushingDeleteQueue = deleteQueue;
+      /* Cut over to a new delete queue. This must be synced on the flush control;
+       * otherwise a new DWPT could sneak into the loop with an already flushing
+       * delete queue */
+ flushControl.markForFullFlush(); // swaps the delQueue synced on FlushControl
+ assert setFlushingDeleteQueue(flushingDeleteQueue);
}
-
- synchronized(this) {
- if (ramBufferSize == IndexWriterConfig.DISABLE_AUTO_FLUSH || bufferIsFull) {
- return;
- }
+ assert currentFullFlushDelQueue != null;
+ assert currentFullFlushDelQueue != deleteQueue;
- doBalance = bytesUsed() + deletesRAMUsed >= ramBufferSize;
- }
-
- if (doBalance) {
-
- if (infoStream != null) {
- message(" RAM: balance allocations: usedMB=" + toMB(bytesUsed()) +
- " vs trigger=" + toMB(ramBufferSize) +
- " deletesMB=" + toMB(deletesRAMUsed) +
- " byteBlockFree=" + toMB(byteBlockAllocator.bytesUsed()) +
- " perDocFree=" + toMB(perDocAllocator.bytesUsed()));
- }
-
- final long startBytesUsed = bytesUsed() + deletesRAMUsed;
-
- int iter = 0;
-
- // We free equally from each pool in 32 KB
- // chunks until we are below our threshold
- // (freeLevel)
-
- boolean any = true;
-
- final long freeLevel = (long) (0.95 * ramBufferSize);
-
- while(bytesUsed()+deletesRAMUsed > freeLevel) {
-
- synchronized(this) {
- if (0 == perDocAllocator.numBufferedBlocks() &&
- 0 == byteBlockAllocator.numBufferedBlocks() &&
- 0 == freeIntBlocks.size() && !any) {
- // Nothing else to free -- must flush now.
- bufferIsFull = bytesUsed()+deletesRAMUsed > ramBufferSize;
- if (infoStream != null) {
- if (bytesUsed()+deletesRAMUsed > ramBufferSize) {
- message(" nothing to free; set bufferIsFull");
- } else {
- message(" nothing to free");
- }
- }
- break;
- }
-
- if ((0 == iter % 4) && byteBlockAllocator.numBufferedBlocks() > 0) {
- byteBlockAllocator.freeBlocks(1);
- }
- if ((1 == iter % 4) && freeIntBlocks.size() > 0) {
- freeIntBlocks.remove(freeIntBlocks.size()-1);
- bytesUsed.addAndGet(-INT_BLOCK_SIZE * RamUsageEstimator.NUM_BYTES_INT);
- }
- if ((2 == iter % 4) && perDocAllocator.numBufferedBlocks() > 0) {
- perDocAllocator.freeBlocks(32); // Remove upwards of 32 blocks (each block is 1K)
- }
- }
-
- if ((3 == iter % 4) && any) {
- // Ask consumer to free any recycled state
- any = consumer.freeRAM();
+ boolean anythingFlushed = false;
+ try {
+ DocumentsWriterPerThread flushingDWPT;
+ // Help out with flushing:
+ while ((flushingDWPT = flushControl.nextPendingFlush()) != null) {
+ anythingFlushed |= doFlush(flushingDWPT);
+ }
+ // If a concurrent flush is still in flight wait for it
+ while (flushControl.anyFlushing()) {
+ flushControl.waitForFlush();
+ }
+ if (!anythingFlushed) { // apply deletes if we did not flush any document
+ synchronized (ticketQueue) {
+ ticketQueue.add(new FlushTicket(flushingDeleteQueue.freezeGlobalBuffer(null), false));
}
-
- iter++;
- }
-
- if (infoStream != null) {
- message(" after free: freedMB=" + nf.format((startBytesUsed-bytesUsed()-deletesRAMUsed)/1024./1024.) + " usedMB=" + nf.format((bytesUsed()+deletesRAMUsed)/1024./1024.));
+ applyFlushTickets();
}
+ } finally {
+ assert flushingDeleteQueue == currentFullFlushDelQueue;
}
+ return anythingFlushed;
}
-
- final WaitQueue waitQueue = new WaitQueue();
-
- private class WaitQueue {
- DocWriter[] waiting;
- int nextWriteDocID;
- int nextWriteLoc;
- int numWaiting;
- long waitingBytes;
-
- public WaitQueue() {
- waiting = new DocWriter[10];
- }
-
- synchronized void reset() {
- // NOTE: nextWriteLoc doesn't need to be reset
- assert numWaiting == 0;
- assert waitingBytes == 0;
- nextWriteDocID = 0;
- }
-
- synchronized boolean doResume() {
- final double mb = config.getRAMBufferSizeMB();
- final long waitQueueResumeBytes;
- if (mb == IndexWriterConfig.DISABLE_AUTO_FLUSH) {
- waitQueueResumeBytes = 2*1024*1024;
- } else {
- waitQueueResumeBytes = (long) (mb*1024*1024*0.05);
- }
- return waitingBytes <= waitQueueResumeBytes;
- }
-
- synchronized boolean doPause() {
- final double mb = config.getRAMBufferSizeMB();
- final long waitQueuePauseBytes;
- if (mb == IndexWriterConfig.DISABLE_AUTO_FLUSH) {
- waitQueuePauseBytes = 4*1024*1024;
- } else {
- waitQueuePauseBytes = (long) (mb*1024*1024*0.1);
- }
- return waitingBytes > waitQueuePauseBytes;
- }
-
- synchronized void abort() {
- int count = 0;
- for(int i=0;i<waiting.length;i++) {
- final DocWriter doc = waiting[i];
- if (doc != null) {
- doc.abort();
- waiting[i] = null;
- count++;
- }
- }
- waitingBytes = 0;
- assert count == numWaiting;
- numWaiting = 0;
+
+ final void finishFullFlush(boolean success) {
+ assert setFlushingDeleteQueue(null);
+ if (success) {
+ // Release the flush lock
+ flushControl.finishFullFlush();
+ } else {
+ flushControl.abortFullFlushes();
}
+ }
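
[Editorial note] The comment above flushAllThreads() describes a two-stage contract: the caller holds IndexWriter's fullFlushLock, calls flushAllThreads(), and must always call finishFullFlush(success) afterwards so the flush lock in DocumentsWriterFlushControl is released. A minimal caller-side sketch of that contract, under the stated assumptions, follows; the exampleFullFlush method, the fullFlushLock parameter and the surrounding context are illustrative only, not the committed IndexWriter code.

  // Hypothetical caller-side sketch of the two-stage full-flush contract;
  // only flushAllThreads() and finishFullFlush(boolean) come from this patch.
  void exampleFullFlush(DocumentsWriter docWriter, Object fullFlushLock) throws IOException {
    synchronized (fullFlushLock) {      // stage one is synced by IW's fullFlushLock
      boolean success = false;
      try {
        docWriter.flushAllThreads();    // swaps the delete queue, flushes pending DWPTs
        success = true;
      } finally {
        // stage two: always release the flush lock held by DocumentsWriterFlushControl
        docWriter.finishFullFlush(success);
      }
    }
  }
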
- private void writeDocument(DocWriter doc) throws IOException {
- assert doc == skipDocWriter || nextWriteDocID == doc.docID;
- boolean success = false;
- try {
- doc.finish();
- nextWriteDocID++;
- nextWriteLoc++;
- assert nextWriteLoc <= waiting.length;
- if (nextWriteLoc == waiting.length) {
- nextWriteLoc = 0;
- }
- success = true;
- } finally {
- if (!success) {
- setAborting();
- }
- }
+ static final class FlushTicket {
+ final FrozenBufferedDeletes frozenDeletes;
+ /* access to non-final members must be synchronized on DW#ticketQueue */
+ FlushedSegment segment;
+ boolean isSegmentFlush;
+
+ FlushTicket(FrozenBufferedDeletes frozenDeletes, boolean isSegmentFlush) {
+ this.frozenDeletes = frozenDeletes;
+ this.isSegmentFlush = isSegmentFlush;
}
-
- synchronized public boolean add(DocWriter doc) throws IOException {
-
- assert doc.docID >= nextWriteDocID;
-
- if (doc.docID == nextWriteDocID) {
- writeDocument(doc);
- while(true) {
- doc = waiting[nextWriteLoc];
- if (doc != null) {
- numWaiting--;
- waiting[nextWriteLoc] = null;
- waitingBytes -= doc.sizeInBytes();
- writeDocument(doc);
- } else {
- break;
- }
- }
- } else {
-
- // I finished before documents that were added
- // before me. This can easily happen when I am a
- // small doc and the docs before me were large, or,
- // just due to luck in the thread scheduling. Just
- // add myself to the queue and when that large doc
- // finishes, it will flush me:
- int gap = doc.docID - nextWriteDocID;
- if (gap >= waiting.length) {
- // Grow queue
- DocWriter[] newArray = new DocWriter[ArrayUtil.oversize(gap, RamUsageEstimator.NUM_BYTES_OBJECT_REF)];
- assert nextWriteLoc >= 0;
- System.arraycopy(waiting, nextWriteLoc, newArray, 0, waiting.length-nextWriteLoc);
- System.arraycopy(waiting, 0, newArray, waiting.length-nextWriteLoc, nextWriteLoc);
- nextWriteLoc = 0;
- waiting = newArray;
- gap = doc.docID - nextWriteDocID;
- }
-
- int loc = nextWriteLoc + gap;
- if (loc >= waiting.length) {
- loc -= waiting.length;
- }
-
- // We should only wrap one time
- assert loc < waiting.length;
-
- // Nobody should be in my spot!
- assert waiting[loc] == null;
- waiting[loc] = doc;
- numWaiting++;
- waitingBytes += doc.sizeInBytes();
- }
-
- return doPause();
+
+ boolean canPublish() {
+ return (!isSegmentFlush || segment != null);
}
}
}
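
[Editorial note] The ticketQueue logic above (doFlush, applyFlushTickets and FlushTicket#canPublish) guarantees that concurrently flushed segments are published in the order their tickets were taken, even when a later flush finishes first. The standalone sketch below illustrates only that ordering idea; the OrderedPublisher and Ticket names are hypothetical and the class is not part of this patch.

import java.util.LinkedList;
import java.util.Queue;

// Hypothetical sketch of the in-order publishing pattern used by the
// ticketQueue: tickets are taken in order, results may complete out of
// order, and only completed tickets at the head of the queue are published.
final class OrderedPublisher<T> {

  static final class Ticket<T> {
    T result;          // filled in once the concurrent work finishes
    boolean done;
  }

  private final Queue<Ticket<T>> tickets = new LinkedList<Ticket<T>>();

  // Take a ticket before starting concurrent work (compare: adding a FlushTicket).
  synchronized Ticket<T> take() {
    final Ticket<T> ticket = new Ticket<T>();
    tickets.add(ticket);
    return ticket;
  }

  // Record a finished result and publish every completed ticket at the head
  // of the queue, in order (compare: applyFlushTickets peeking/polling the head).
  synchronized void complete(Ticket<T> ticket, T result) {
    ticket.result = result;
    ticket.done = true;
    while (!tickets.isEmpty() && tickets.peek().done) {
      publish(tickets.poll().result);
    }
  }

  void publish(T result) {
    System.out.println("published: " + result);
  }
}
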
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FieldsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FieldsWriter.java?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FieldsWriter.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FieldsWriter.java Sun May 1 22:38:33 2011
@@ -2,13 +2,13 @@ package org.apache.lucene.index;
/**
* Copyright 2004 The Apache Software Foundation
- *
+ *
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -22,15 +22,14 @@ import java.util.List;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Fieldable;
import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.RAMOutputStream;
-import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.IOUtils;
final class FieldsWriter {
static final byte FIELD_IS_TOKENIZED = 0x1;
static final byte FIELD_IS_BINARY = 0x2;
-
+
// Lucene 3.0: Removal of compressed fields
static final int FORMAT_LUCENE_3_0_NO_COMPRESSED_FIELDS = 2;
@@ -38,7 +37,7 @@ final class FieldsWriter {
// than the current one, and always change this if you
// switch to a new format!
static final int FORMAT_CURRENT = FORMAT_LUCENE_3_0_NO_COMPRESSED_FIELDS;
-
+
// when removing support for old versions, leave the last supported version here
static final int FORMAT_MINIMUM = FORMAT_LUCENE_3_0_NO_COMPRESSED_FIELDS;
@@ -83,10 +82,9 @@ final class FieldsWriter {
// and adds a new entry for this document into the index
// stream. This assumes the buffer was already written
// in the correct fields format.
- void flushDocument(int numStoredFields, RAMOutputStream buffer) throws IOException {
+ void startDocument(int numStoredFields) throws IOException {
indexStream.writeLong(fieldsStream.getFilePointer());
fieldsStream.writeVInt(numStoredFields);
- buffer.writeTo(fieldsStream);
}
void skipDocument() throws IOException {
@@ -121,8 +119,8 @@ final class FieldsWriter {
}
}
- final void writeField(FieldInfo fi, Fieldable field) throws IOException {
- fieldsStream.writeVInt(fi.number);
+ final void writeField(int fieldNumber, Fieldable field) throws IOException {
+ fieldsStream.writeVInt(fieldNumber);
byte bits = 0;
if (field.isTokenized())
bits |= FieldsWriter.FIELD_IS_TOKENIZED;
@@ -175,10 +173,9 @@ final class FieldsWriter {
fieldsStream.writeVInt(storedCount);
-
for (Fieldable field : fields) {
if (field.isStored())
- writeField(fieldInfos.fieldInfo(field.name()), field);
+ writeField(fieldInfos.fieldNumber(field.name()), field);
}
}
}
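
[Editorial note] With this change FieldsWriter no longer receives a per-document RAMOutputStream buffer via flushDocument; callers announce the document with startDocument(numStoredFields) and then write each stored field through writeField(fieldNumber, field), resolving field numbers via FieldInfos. A hedged usage sketch of the new call sequence follows; the StoredFieldsUsageSketch class is illustrative only (FieldsWriter and FieldInfos are package-private, so a real caller lives inside org.apache.lucene.index).

package org.apache.lucene.index;

import java.io.IOException;
import java.util.List;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Fieldable;

// Illustrative sketch only: shows the new two-step call sequence, first
// startDocument(count), then writeField(fieldNumber, field) per stored field.
final class StoredFieldsUsageSketch {

  static void writeDocument(FieldsWriter fieldsWriter, FieldInfos fieldInfos, Document doc)
      throws IOException {
    final List<Fieldable> fields = doc.getFields();
    int storedCount = 0;
    for (Fieldable field : fields) {
      if (field.isStored()) {
        storedCount++;
      }
    }
    // Writes the index entry and the stored-field count for this document
    fieldsWriter.startDocument(storedCount);
    for (Fieldable field : fields) {
      if (field.isStored()) {
        // Field numbers are resolved up front instead of passing a FieldInfo
        fieldsWriter.writeField(fieldInfos.fieldNumber(field.name()), field);
      }
    }
  }
}
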
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java Sun May 1 22:38:33 2011
@@ -19,55 +19,35 @@ package org.apache.lucene.index;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
import java.util.List;
import java.util.Map;
import org.apache.lucene.index.codecs.FieldsConsumer;
-import org.apache.lucene.index.codecs.PostingsConsumer;
-import org.apache.lucene.index.codecs.TermStats;
-import org.apache.lucene.index.codecs.TermsConsumer;
-import org.apache.lucene.util.BitVector;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CollectionUtil;
final class FreqProxTermsWriter extends TermsHashConsumer {
@Override
- public TermsHashConsumerPerThread addThread(TermsHashPerThread perThread) {
- return new FreqProxTermsWriterPerThread(perThread);
- }
-
- @Override
void abort() {}
- private int flushedDocCount;
-
// TODO: would be nice to factor out more of this, eg the
// FreqProxFieldMergeState, and code to visit all Fields
// under the same FieldInfo together, up into TermsHash*.
// Other writers would presumably share alot of this...
@Override
- public void flush(Map<TermsHashConsumerPerThread,Collection<TermsHashConsumerPerField>> threadsAndFields, final SegmentWriteState state) throws IOException {
+ public void flush(Map<FieldInfo, TermsHashConsumerPerField> fieldsToFlush, final SegmentWriteState state) throws IOException {
// Gather all FieldData's that have postings, across all
// ThreadStates
List<FreqProxTermsWriterPerField> allFields = new ArrayList<FreqProxTermsWriterPerField>();
-
- flushedDocCount = state.numDocs;
-
- for (Map.Entry<TermsHashConsumerPerThread,Collection<TermsHashConsumerPerField>> entry : threadsAndFields.entrySet()) {
-
- Collection<TermsHashConsumerPerField> fields = entry.getValue();
-
- for (final TermsHashConsumerPerField i : fields) {
- final FreqProxTermsWriterPerField perField = (FreqProxTermsWriterPerField) i;
- if (perField.termsHashPerField.bytesHash.size() > 0)
+ for (TermsHashConsumerPerField f : fieldsToFlush.values()) {
+ final FreqProxTermsWriterPerField perField = (FreqProxTermsWriterPerField) f;
+ if (perField.termsHashPerField.bytesHash.size() > 0) {
allFields.add(perField);
- }
+ }
}
final int numAllFields = allFields.size();
@@ -77,6 +57,8 @@ final class FreqProxTermsWriter extends
final FieldsConsumer consumer = state.segmentCodecs.codec().fieldsConsumer(state);
+ TermsHash termsHash = null;
+
/*
Current writer chain:
FieldsConsumer
@@ -89,257 +71,48 @@ final class FreqProxTermsWriter extends
-> IMPL: FormatPostingsPositionsWriter
*/
- int start = 0;
- while(start < numAllFields) {
- final FieldInfo fieldInfo = allFields.get(start).fieldInfo;
- final String fieldName = fieldInfo.name;
-
- int end = start+1;
- while(end < numAllFields && allFields.get(end).fieldInfo.name.equals(fieldName))
- end++;
-
- FreqProxTermsWriterPerField[] fields = new FreqProxTermsWriterPerField[end-start];
- for(int i=start;i<end;i++) {
- fields[i-start] = allFields.get(i);
-
- // Aggregate the storePayload as seen by the same
- // field across multiple threads
- if (!fieldInfo.omitTermFreqAndPositions) {
- fieldInfo.storePayloads |= fields[i-start].hasPayloads;
- }
+ for (int fieldNumber = 0; fieldNumber < numAllFields; fieldNumber++) {
+ final FieldInfo fieldInfo = allFields.get(fieldNumber).fieldInfo;
+
+ final FreqProxTermsWriterPerField fieldWriter = allFields.get(fieldNumber);
+
+ // Aggregate the storePayload as seen by the same
+ // field across multiple threads
+ if (!fieldInfo.omitTermFreqAndPositions) {
+ fieldInfo.storePayloads |= fieldWriter.hasPayloads;
}
// If this field has postings then add them to the
// segment
- appendPostings(fieldName, state, fields, consumer);
-
- for(int i=0;i<fields.length;i++) {
- TermsHashPerField perField = fields[i].termsHashPerField;
- int numPostings = perField.bytesHash.size();
- perField.reset();
- perField.shrinkHash(numPostings);
- fields[i].reset();
- }
+ fieldWriter.flush(fieldInfo.name, consumer, state);
- start = end;
+ TermsHashPerField perField = fieldWriter.termsHashPerField;
+ assert termsHash == null || termsHash == perField.termsHash;
+ termsHash = perField.termsHash;
+ int numPostings = perField.bytesHash.size();
+ perField.reset();
+ perField.shrinkHash(numPostings);
+ fieldWriter.reset();
}
- for (Map.Entry<TermsHashConsumerPerThread,Collection<TermsHashConsumerPerField>> entry : threadsAndFields.entrySet()) {
- FreqProxTermsWriterPerThread perThread = (FreqProxTermsWriterPerThread) entry.getKey();
- perThread.termsHashPerThread.reset(true);
+ if (termsHash != null) {
+ termsHash.reset();
}
consumer.close();
}
BytesRef payload;
- /* Walk through all unique text tokens (Posting
- * instances) found in this field and serialize them
- * into a single RAM segment. */
- void appendPostings(String fieldName, SegmentWriteState state,
- FreqProxTermsWriterPerField[] fields,
- FieldsConsumer consumer)
- throws CorruptIndexException, IOException {
-
- int numFields = fields.length;
-
- final BytesRef text = new BytesRef();
-
- final FreqProxFieldMergeState[] mergeStates = new FreqProxFieldMergeState[numFields];
-
- final TermsConsumer termsConsumer = consumer.addField(fields[0].fieldInfo);
- final Comparator<BytesRef> termComp = termsConsumer.getComparator();
-
- for(int i=0;i<numFields;i++) {
- FreqProxFieldMergeState fms = mergeStates[i] = new FreqProxFieldMergeState(fields[i], termComp);
-
- assert fms.field.fieldInfo == fields[0].fieldInfo;
-
- // Should always be true
- boolean result = fms.nextTerm();
- assert result;
- }
-
- final Term protoTerm = new Term(fieldName);
-
- FreqProxFieldMergeState[] termStates = new FreqProxFieldMergeState[numFields];
-
- final boolean currentFieldOmitTermFreqAndPositions = fields[0].fieldInfo.omitTermFreqAndPositions;
- //System.out.println("flush terms field=" + fields[0].fieldInfo.name);
-
- final Map<Term,Integer> segDeletes;
- if (state.segDeletes != null && state.segDeletes.terms.size() > 0) {
- segDeletes = state.segDeletes.terms;
- } else {
- segDeletes = null;
- }
-
- // TODO: really TermsHashPerField should take over most
- // of this loop, including merge sort of terms from
- // multiple threads and interacting with the
- // TermsConsumer, only calling out to us (passing us the
- // DocsConsumer) to handle delivery of docs/positions
- long sumTotalTermFreq = 0;
- while(numFields > 0) {
-
- // Get the next term to merge
- termStates[0] = mergeStates[0];
- int numToMerge = 1;
-
- // TODO: pqueue
- for(int i=1;i<numFields;i++) {
- final int cmp = termComp.compare(mergeStates[i].text, termStates[0].text);
- if (cmp < 0) {
- termStates[0] = mergeStates[i];
- numToMerge = 1;
- } else if (cmp == 0) {
- termStates[numToMerge++] = mergeStates[i];
- }
- }
-
- // Need shallow copy here because termStates[0].text
- // changes by the time we call finishTerm
- text.bytes = termStates[0].text.bytes;
- text.offset = termStates[0].text.offset;
- text.length = termStates[0].text.length;
-
- //System.out.println(" term=" + text.toUnicodeString());
- //System.out.println(" term=" + text.toString());
-
- final PostingsConsumer postingsConsumer = termsConsumer.startTerm(text);
-
- final int delDocLimit;
- if (segDeletes != null) {
- final Integer docIDUpto = segDeletes.get(protoTerm.createTerm(text));
- if (docIDUpto != null) {
- delDocLimit = docIDUpto;
- } else {
- delDocLimit = 0;
- }
- } else {
- delDocLimit = 0;
- }
-
- // Now termStates has numToMerge FieldMergeStates
- // which all share the same term. Now we must
- // interleave the docID streams.
- int numDocs = 0;
- long totTF = 0;
- while(numToMerge > 0) {
-
- FreqProxFieldMergeState minState = termStates[0];
- for(int i=1;i<numToMerge;i++) {
- if (termStates[i].docID < minState.docID) {
- minState = termStates[i];
- }
- }
-
- final int termDocFreq = minState.termFreq;
- numDocs++;
-
- assert minState.docID < flushedDocCount: "doc=" + minState.docID + " maxDoc=" + flushedDocCount;
-
- // NOTE: we could check here if the docID was
- // deleted, and skip it. However, this is somewhat
- // dangerous because it can yield non-deterministic
- // behavior since we may see the docID before we see
- // the term that caused it to be deleted. This
- // would mean some (but not all) of its postings may
- // make it into the index, which'd alter the docFreq
- // for those terms. We could fix this by doing two
- // passes, ie first sweep marks all del docs, and
- // 2nd sweep does the real flush, but I suspect
- // that'd add too much time to flush.
-
- postingsConsumer.startDoc(minState.docID, termDocFreq);
- if (minState.docID < delDocLimit) {
- // Mark it deleted. TODO: we could also skip
- // writing its postings; this would be
- // deterministic (just for this Term's docs).
- if (state.deletedDocs == null) {
- state.deletedDocs = new BitVector(state.numDocs);
- }
- state.deletedDocs.set(minState.docID);
- }
-
- final ByteSliceReader prox = minState.prox;
-
- // Carefully copy over the prox + payload info,
- // changing the format to match Lucene's segment
- // format.
- if (!currentFieldOmitTermFreqAndPositions) {
- // omitTermFreqAndPositions == false so we do write positions &
- // payload
- int position = 0;
- totTF += termDocFreq;
- for(int j=0;j<termDocFreq;j++) {
- final int code = prox.readVInt();
- position += code >> 1;
- //System.out.println(" pos=" + position);
-
- final int payloadLength;
- final BytesRef thisPayload;
-
- if ((code & 1) != 0) {
- // This position has a payload
- payloadLength = prox.readVInt();
-
- if (payload == null) {
- payload = new BytesRef();
- payload.bytes = new byte[payloadLength];
- } else if (payload.bytes.length < payloadLength) {
- payload.grow(payloadLength);
- }
-
- prox.readBytes(payload.bytes, 0, payloadLength);
- payload.length = payloadLength;
- thisPayload = payload;
-
- } else {
- payloadLength = 0;
- thisPayload = null;
- }
-
- postingsConsumer.addPosition(position, thisPayload);
- } //End for
-
- postingsConsumer.finishDoc();
- }
-
- if (!minState.nextDoc()) {
-
- // Remove from termStates
- int upto = 0;
- // TODO: inefficient O(N) where N = number of
- // threads that had seen this term:
- for(int i=0;i<numToMerge;i++) {
- if (termStates[i] != minState) {
- termStates[upto++] = termStates[i];
- }
- }
- numToMerge--;
- assert upto == numToMerge;
-
- // Advance this state to the next term
-
- if (!minState.nextTerm()) {
- // OK, no more terms, so remove from mergeStates
- // as well
- upto = 0;
- for(int i=0;i<numFields;i++)
- if (mergeStates[i] != minState)
- mergeStates[upto++] = mergeStates[i];
- numFields--;
- assert upto == numFields;
- }
- }
- }
+ @Override
+ public TermsHashConsumerPerField addField(TermsHashPerField termsHashPerField, FieldInfo fieldInfo) {
+ return new FreqProxTermsWriterPerField(termsHashPerField, this, fieldInfo);
+ }
- assert numDocs > 0;
- termsConsumer.finishTerm(text, new TermStats(numDocs, totTF));
- sumTotalTermFreq += totTF;
- }
+ @Override
+ void finishDocument(TermsHash termsHash) throws IOException {
+ }
- termsConsumer.finish(sumTotalTermFreq);
+ @Override
+ void startDocument() throws IOException {
}
}
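
For readers skimming the patch, the control flow that replaces the old appendPostings merge is: each FreqProxTermsWriterPerField now serializes its own postings through the codec's FieldsConsumer, and the shared TermsHash is reset once every field has flushed. The following is a rough, self-contained sketch of that orchestration only; the interface and class names below are simplified stand-ins invented for illustration, not the actual Lucene classes, and the SegmentWriteState plumbing is omitted. Flushing fields in name order is an assumption suggested by the Comparable implementation on the per-field class.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Simplified stand-ins, invented for illustration only.
interface FieldsConsumerSketch {
  void addField(String fieldName);   // open a new field in the codec
  void close();                      // finish the segment's postings
}

class PerFieldSketch {
  final String fieldName;
  PerFieldSketch(String fieldName) { this.fieldName = fieldName; }

  // Each field now serializes its own postings directly.
  void flush(FieldsConsumerSketch consumer) {
    consumer.addField(fieldName);
    // ... write this field's terms, docs, positions ...
  }
}

public class FlushSketch {
  // Shape of the new flush: visit each field in name order, let it
  // write itself, then close the consumer once all fields are done.
  static void flush(List<PerFieldSketch> fields, FieldsConsumerSketch consumer) {
    fields.sort(Comparator.comparing((PerFieldSketch f) -> f.fieldName));
    try {
      for (PerFieldSketch field : fields) {
        field.flush(consumer);
      }
    } finally {
      consumer.close();
    }
  }

  public static void main(String[] args) {
    List<PerFieldSketch> fields = new ArrayList<>();
    fields.add(new PerFieldSketch("title"));
    fields.add(new PerFieldSketch("body"));
    flush(fields, new FieldsConsumerSketch() {
      public void addField(String name) { System.out.println("field: " + name); }
      public void close() { System.out.println("postings done"); }
    });
  }
}

Because there is no longer a merge of per-thread states, the per-field flush is a single pass over the sorted term IDs of one TermsHashPerField, as the FreqProxTermsWriterPerField diff below shows.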
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java Sun May 1 22:38:33 2011
@@ -18,9 +18,17 @@ package org.apache.lucene.index;
*/
import java.io.IOException;
+import java.util.Comparator;
+import java.util.Map;
import org.apache.lucene.analysis.tokenattributes.PayloadAttribute;
import org.apache.lucene.document.Fieldable;
+import org.apache.lucene.index.codecs.FieldsConsumer;
+import org.apache.lucene.index.codecs.PostingsConsumer;
+import org.apache.lucene.index.codecs.TermStats;
+import org.apache.lucene.index.codecs.TermsConsumer;
+import org.apache.lucene.util.BitVector;
+import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.RamUsageEstimator;
// TODO: break into separate freq and prox writers as
@@ -28,17 +36,17 @@ import org.apache.lucene.util.RamUsageEs
// be configured as any number of files 1..N
final class FreqProxTermsWriterPerField extends TermsHashConsumerPerField implements Comparable<FreqProxTermsWriterPerField> {
- final FreqProxTermsWriterPerThread perThread;
+ final FreqProxTermsWriter parent;
final TermsHashPerField termsHashPerField;
final FieldInfo fieldInfo;
- final DocumentsWriter.DocState docState;
+ final DocumentsWriterPerThread.DocState docState;
final FieldInvertState fieldState;
boolean omitTermFreqAndPositions;
PayloadAttribute payloadAttribute;
- public FreqProxTermsWriterPerField(TermsHashPerField termsHashPerField, FreqProxTermsWriterPerThread perThread, FieldInfo fieldInfo) {
+ public FreqProxTermsWriterPerField(TermsHashPerField termsHashPerField, FreqProxTermsWriter parent, FieldInfo fieldInfo) {
this.termsHashPerField = termsHashPerField;
- this.perThread = perThread;
+ this.parent = parent;
this.fieldInfo = fieldInfo;
docState = termsHashPerField.docState;
fieldState = termsHashPerField.fieldState;
@@ -78,8 +86,8 @@ final class FreqProxTermsWriterPerField
if (fields[i].isIndexed())
return true;
return false;
- }
-
+ }
+
@Override
void start(Fieldable f) {
if (fieldState.attributeSource.hasAttribute(PayloadAttribute.class)) {
@@ -96,18 +104,18 @@ final class FreqProxTermsWriterPerField
} else {
payload = payloadAttribute.getPayload();
}
-
+
if (payload != null && payload.length > 0) {
termsHashPerField.writeVInt(1, (proxCode<<1)|1);
termsHashPerField.writeVInt(1, payload.length);
termsHashPerField.writeBytes(1, payload.data, payload.offset, payload.length);
- hasPayloads = true;
+ hasPayloads = true;
} else
termsHashPerField.writeVInt(1, proxCode<<1);
-
+
FreqProxPostingsArray postings = (FreqProxPostingsArray) termsHashPerField.postingsArray;
postings.lastPositions[termID] = fieldState.position;
-
+
}
@Override
@@ -115,7 +123,7 @@ final class FreqProxTermsWriterPerField
// First time we're seeing this term since the last
// flush
assert docState.testPoint("FreqProxTermsWriterPerField.newTerm start");
-
+
FreqProxPostingsArray postings = (FreqProxPostingsArray) termsHashPerField.postingsArray;
postings.lastDocIDs[termID] = docState.docID;
if (omitTermFreqAndPositions) {
@@ -132,9 +140,9 @@ final class FreqProxTermsWriterPerField
void addTerm(final int termID) {
assert docState.testPoint("FreqProxTermsWriterPerField.addTerm start");
-
+
FreqProxPostingsArray postings = (FreqProxPostingsArray) termsHashPerField.postingsArray;
-
+
assert omitTermFreqAndPositions || postings.docFreqs[termID] > 0;
if (omitTermFreqAndPositions) {
@@ -169,7 +177,7 @@ final class FreqProxTermsWriterPerField
}
}
}
-
+
@Override
ParallelPostingsArray createPostingsArray(int size) {
return new FreqProxPostingsArray(size);
@@ -212,7 +220,180 @@ final class FreqProxTermsWriterPerField
return ParallelPostingsArray.BYTES_PER_POSTING + 4 * RamUsageEstimator.NUM_BYTES_INT;
}
}
-
+
public void abort() {}
+
+ BytesRef payload;
+
+ /* Walk through all unique text tokens (Posting
+ * instances) found in this field and serialize them
+ * into a single RAM segment. */
+ void flush(String fieldName, FieldsConsumer consumer, final SegmentWriteState state)
+ throws CorruptIndexException, IOException {
+
+ final TermsConsumer termsConsumer = consumer.addField(fieldInfo);
+ final Comparator<BytesRef> termComp = termsConsumer.getComparator();
+
+ final Term protoTerm = new Term(fieldName);
+
+ final boolean currentFieldOmitTermFreqAndPositions = fieldInfo.omitTermFreqAndPositions;
+
+ final Map<Term,Integer> segDeletes;
+ if (state.segDeletes != null && state.segDeletes.terms.size() > 0) {
+ segDeletes = state.segDeletes.terms;
+ } else {
+ segDeletes = null;
+ }
+
+ final int[] termIDs = termsHashPerField.sortPostings(termComp);
+ final int numTerms = termsHashPerField.bytesHash.size();
+ final BytesRef text = new BytesRef();
+ final FreqProxPostingsArray postings = (FreqProxPostingsArray) termsHashPerField.postingsArray;
+ final ByteSliceReader freq = new ByteSliceReader();
+ final ByteSliceReader prox = new ByteSliceReader();
+
+ long sumTotalTermFreq = 0;
+ for (int i = 0; i < numTerms; i++) {
+ final int termID = termIDs[i];
+ // Get BytesRef
+ final int textStart = postings.textStarts[termID];
+ termsHashPerField.bytePool.setBytesRef(text, textStart);
+
+ termsHashPerField.initReader(freq, termID, 0);
+ if (!fieldInfo.omitTermFreqAndPositions) {
+ termsHashPerField.initReader(prox, termID, 1);
+ }
+
+ // TODO: really TermsHashPerField should take over most
+ // of this loop, including merge sort of terms from
+ // multiple threads and interacting with the
+ // TermsConsumer, only calling out to us (passing us the
+ // DocsConsumer) to handle delivery of docs/positions
+
+ final PostingsConsumer postingsConsumer = termsConsumer.startTerm(text);
+
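+ // Buffered delete-by-term handling: if this term has a pending
+ // delete, docs in this segment with docID below the recorded
+ // docIDUpto are marked deleted as their postings are written.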
+ final int delDocLimit;
+ if (segDeletes != null) {
+ final Integer docIDUpto = segDeletes.get(protoTerm.createTerm(text));
+ if (docIDUpto != null) {
+ delDocLimit = docIDUpto;
+ } else {
+ delDocLimit = 0;
+ }
+ } else {
+ delDocLimit = 0;
+ }
+
+ // Iterate the docID stream for this term, delivering each doc
+ // (with its positions and payloads) to the PostingsConsumer.
+ int numDocs = 0;
+ long totTF = 0;
+ int docID = 0;
+ int termFreq = 0;
+
+ while(true) {
+ if (freq.eof()) {
+ if (postings.lastDocCodes[termID] != -1) {
+ // Return last doc
+ docID = postings.lastDocIDs[termID];
+ if (!omitTermFreqAndPositions) {
+ termFreq = postings.docFreqs[termID];
+ }
+ postings.lastDocCodes[termID] = -1;
+ } else {
+ // EOF
+ break;
+ }
+ } else {
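+ // Decode the in-memory freq stream: the docID delta is shifted
+ // left by one, and a set low bit means termFreq == 1 (no
+ // separate freq vInt follows). With omitTermFreqAndPositions
+ // the vInt is just the plain docID delta.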
+ final int code = freq.readVInt();
+ if (omitTermFreqAndPositions) {
+ docID += code;
+ } else {
+ docID += code >>> 1;
+ if ((code & 1) != 0) {
+ termFreq = 1;
+ } else {
+ termFreq = freq.readVInt();
+ }
+ }
+
+ assert docID != postings.lastDocIDs[termID];
+ }
+
+ numDocs++;
+ assert docID < state.numDocs: "doc=" + docID + " maxDoc=" + state.numDocs;
+ final int termDocFreq = termFreq;
+
+ // NOTE: we could check here if the docID was
+ // deleted, and skip it. However, this is somewhat
+ // dangerous because it can yield non-deterministic
+ // behavior since we may see the docID before we see
+ // the term that caused it to be deleted. This
+ // would mean some (but not all) of its postings may
+ // make it into the index, which'd alter the docFreq
+ // for those terms. We could fix this by doing two
+ // passes, ie first sweep marks all del docs, and
+ // 2nd sweep does the real flush, but I suspect
+ // that'd add too much time to flush.
+ postingsConsumer.startDoc(docID, termDocFreq);
+ if (docID < delDocLimit) {
+ // Mark it deleted. TODO: we could also skip
+ // writing its postings; this would be
+ // deterministic (just for this Term's docs).
+ if (state.deletedDocs == null) {
+ state.deletedDocs = new BitVector(state.numDocs);
+ }
+ state.deletedDocs.set(docID);
+ }
+
+ // Carefully copy over the prox + payload info,
+ // changing the format to match Lucene's segment
+ // format.
+ if (!currentFieldOmitTermFreqAndPositions) {
+ // omitTermFreqAndPositions == false so we do write positions &
+ // payload
+ int position = 0;
+ totTF += termDocFreq;
+ for(int j=0;j<termDocFreq;j++) {
+ final int code = prox.readVInt();
+ position += code >> 1;
+
+ final int payloadLength;
+ final BytesRef thisPayload;
+
+ if ((code & 1) != 0) {
+ // This position has a payload
+ payloadLength = prox.readVInt();
+
+ if (payload == null) {
+ payload = new BytesRef();
+ payload.bytes = new byte[payloadLength];
+ } else if (payload.bytes.length < payloadLength) {
+ payload.grow(payloadLength);
+ }
+
+ prox.readBytes(payload.bytes, 0, payloadLength);
+ payload.length = payloadLength;
+ thisPayload = payload;
+
+ } else {
+ payloadLength = 0;
+ thisPayload = null;
+ }
+
+ postingsConsumer.addPosition(position, thisPayload);
+ }
+
+ postingsConsumer.finishDoc();
+ }
+ }
+ termsConsumer.finishTerm(text, new TermStats(numDocs, totTF));
+ sumTotalTermFreq += totTF;
+ }
+
+ termsConsumer.finish(sumTotalTermFreq);
+ }
+
}
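
The while(true) loop in the new flush decodes the per-term freq stream that newTerm/addTerm wrote into byte-block slices. A minimal standalone illustration of that doc-delta encoding follows; the class and method names are invented for the example, and plain int lists stand in for the vInt byte slices Lucene actually uses.

import java.util.ArrayList;
import java.util.List;

// Toy model of the encoding decoded in FreqProxTermsWriterPerField.flush:
// the delta to the previous docID is shifted left by one; a set low bit
// means termFreq == 1, otherwise the frequency follows as its own value.
public class DocDeltaCodecSketch {

  static List<Integer> encode(int[] docIDs, int[] freqs) {
    List<Integer> out = new ArrayList<>();
    int lastDocID = 0;
    for (int i = 0; i < docIDs.length; i++) {
      int delta = docIDs[i] - lastDocID;
      if (freqs[i] == 1) {
        out.add((delta << 1) | 1);   // low bit set: freq == 1
      } else {
        out.add(delta << 1);         // low bit clear: freq follows
        out.add(freqs[i]);
      }
      lastDocID = docIDs[i];
    }
    return out;
  }

  static void decode(List<Integer> codes) {
    int docID = 0;
    for (int i = 0; i < codes.size(); i++) {
      int code = codes.get(i);
      docID += code >>> 1;
      int termFreq = ((code & 1) != 0) ? 1 : codes.get(++i);
      System.out.println("doc=" + docID + " freq=" + termFreq);
    }
  }

  public static void main(String[] args) {
    // docIDs 3, 7, 8 with freqs 1, 4, 1
    decode(encode(new int[] {3, 7, 8}, new int[] {1, 4, 1}));
  }
}

The prox stream in the patch uses the same low-bit trick: the position delta is shifted left by one and the bit flags whether a payload length and payload bytes follow, matching the writeVInt(1, (proxCode<<1)|1) call in writeProx above.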
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java Sun May 1 22:38:33 2011
@@ -21,7 +21,13 @@ import java.io.FileNotFoundException;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.PrintStream;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.lucene.index.codecs.CodecProvider;
import org.apache.lucene.store.Directory;
@@ -49,12 +55,12 @@ import org.apache.lucene.util.Collection
* (IndexDeletionPolicy) is consulted on creation (onInit)
* and once per commit (onCommit), to decide when a commit
* should be removed.
- *
+ *
* It is the business of the IndexDeletionPolicy to choose
* when to delete commit points. The actual mechanics of
* file deletion, retrying, etc, derived from the deletion
* of commit points is the business of the IndexFileDeleter.
- *
+ *
* The current default deletion policy is {@link
* KeepOnlyLastCommitDeletionPolicy}, which removes all
* prior commits when a new commit has completed. This
@@ -72,7 +78,7 @@ final class IndexFileDeleter {
* so we will retry them again later: */
private List<String> deletable;
- /* Reference count for all files in the index.
+ /* Reference count for all files in the index.
* Counts how many existing commits reference a file.
**/
private Map<String, RefCount> refCounts = new HashMap<String, RefCount>();
@@ -88,7 +94,7 @@ final class IndexFileDeleter {
* non-commit checkpoint: */
private List<Collection<String>> lastFiles = new ArrayList<Collection<String>>();
- /* Commits that the IndexDeletionPolicy have decided to delete: */
+ /* Commits that the IndexDeletionPolicy has decided to delete: */
private List<CommitPoint> commitsToDelete = new ArrayList<CommitPoint>();
private PrintStream infoStream;
@@ -108,7 +114,7 @@ final class IndexFileDeleter {
message("setInfoStream deletionPolicy=" + policy);
}
}
-
+
private void message(String message) {
infoStream.println("IFD [" + new Date() + "; " + Thread.currentThread().getName() + "]: " + message);
}
@@ -139,12 +145,12 @@ final class IndexFileDeleter {
// counts:
long currentGen = segmentInfos.getGeneration();
indexFilenameFilter = new IndexFileNameFilter(codecs);
-
+
CommitPoint currentCommitPoint = null;
String[] files = null;
try {
files = directory.listAll();
- } catch (NoSuchDirectoryException e) {
+ } catch (NoSuchDirectoryException e) {
// it means the directory is empty, so ignore it.
files = new String[0];
}
@@ -152,7 +158,7 @@ final class IndexFileDeleter {
for (String fileName : files) {
if ((indexFilenameFilter.accept(null, fileName)) && !fileName.endsWith("write.lock") && !fileName.equals(IndexFileNames.SEGMENTS_GEN)) {
-
+
// Add this file to refCounts with initial count 0:
getRefCount(fileName);
@@ -233,7 +239,7 @@ final class IndexFileDeleter {
// Now delete anything with ref count at 0. These are
// presumably abandoned files eg due to crash of
// IndexWriter.
- for(Map.Entry<String, RefCount> entry : refCounts.entrySet() ) {
+ for(Map.Entry<String, RefCount> entry : refCounts.entrySet() ) {
RefCount rc = entry.getValue();
final String fileName = entry.getKey();
if (0 == rc.count) {
@@ -253,7 +259,7 @@ final class IndexFileDeleter {
// Always protect the incoming segmentInfos since
// sometime it may not be the most recent commit
checkpoint(segmentInfos, false);
-
+
startingCommitDeleted = currentCommitPoint == null ? false : currentCommitPoint.isDeleted();
deleteCommits();
@@ -327,7 +333,7 @@ final class IndexFileDeleter {
segmentPrefix1 = null;
segmentPrefix2 = null;
}
-
+
for(int i=0;i<files.length;i++) {
String fileName = files[i];
if ((segmentName == null || fileName.startsWith(segmentPrefix1) || fileName.startsWith(segmentPrefix2)) &&
@@ -379,7 +385,7 @@ final class IndexFileDeleter {
deleteCommits();
}
}
-
+
public void deletePendingFiles() throws IOException {
if (deletable != null) {
List<String> oldDeletable = deletable;
@@ -397,7 +403,7 @@ final class IndexFileDeleter {
/**
* For definition of "check point" see IndexWriter comments:
* "Clarification: Check Points (and commits)".
- *
+ *
* Writer calls this when it has made a "consistent
* change" to the index, meaning new files are written to
* the index and the in-memory SegmentInfos have been
@@ -417,7 +423,7 @@ final class IndexFileDeleter {
public void checkpoint(SegmentInfos segmentInfos, boolean isCommit) throws IOException {
if (infoStream != null) {
- message("now checkpoint \"" + segmentInfos.getCurrentSegmentFileName() + "\" [" + segmentInfos.size() + " segments " + "; isCommit = " + isCommit + "]");
+ message("now checkpoint \"" + segmentInfos + "\" [" + segmentInfos.size() + " segments " + "; isCommit = " + isCommit + "]");
}
// Try again now to delete any previously un-deletable