Posted to java-commits@lucene.apache.org by mi...@apache.org on 2008/07/18 11:20:14 UTC
svn commit: r677865 [2/5] - in /lucene/java/trunk: ./
src/java/org/apache/lucene/index/ src/java/org/apache/lucene/store/
src/java/org/apache/lucene/util/ src/test/org/apache/lucene/
src/test/org/apache/lucene/index/ src/test/org/apache/lucene/search/
Modified: lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=677865&r1=677864&r2=677865&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java Fri Jul 18 02:20:12 2008
@@ -25,21 +25,19 @@
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.IndexOutput;
-import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.AlreadyClosedException;
-import org.apache.lucene.util.UnicodeUtil;
+import org.apache.lucene.util.ArrayUtil;
import java.io.IOException;
import java.io.PrintStream;
-import java.util.Arrays;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.ArrayList;
import java.util.Map.Entry;
import java.text.NumberFormat;
-import java.util.Collections;
/**
* This class accepts multiple added documents and directly
@@ -48,31 +46,23 @@
* (with DocumentWriter) and doing standard merges on those
* segments.
*
- * When a document is added, its stored fields (if any) and
- * term vectors (if any) are immediately written to the
- * Directory (ie these do not consume RAM). The freq/prox
- * postings are accumulated into a Postings hash table keyed
- * by term. Each entry in this hash table holds a separate
- * byte stream (allocated as incrementally growing slices
- * into large shared byte[] arrays) for freq and prox, that
- * contains the postings data for multiple documents. If
- * vectors are enabled, each unique term for each document
- * also allocates a PostingVector instance to similarly
- * track the offsets & positions byte stream.
- *
- * Once the Postings hash is full (ie is consuming the
- * allowed RAM) or the number of added docs is large enough
- * (in the case we are flushing by doc count instead of RAM
- * usage), we create a real segment and flush it to disk and
- * reset the Postings hash.
- *
- * In adding a document we first organize all of its fields
- * by field name. We then process field by field, and
- * record the Posting hash per-field. After each field we
- * flush its term vectors. When it's time to flush the full
- * segment we first sort the fields by name, and then go
- * field by field and sorts its postings.
+ * Each added document is passed to the {@link DocConsumer},
+ * which in turn processes the document and interacts with
+ * other consumers in the indexing chain. Certain
+ * consumers, like {@link StoredFieldsWriter} and {@link
+ * TermVectorsTermsWriter}, digest a document and
+ * immediately write bytes to the "doc store" files (ie,
+ * they do not consume RAM per document, except while they
+ * are processing the document).
*
+ * Other consumers, eg {@link FreqProxTermsWriter} and
+ * {@link NormsWriter}, buffer bytes in RAM and flush only
+ * when a new segment is produced.
+
+ * Once we have used our allowed RAM buffer, or the number
+ * of added docs is large enough (in the case we are
+ * flushing by doc count instead of RAM usage), we create a
+ * real segment and flush it to the Directory.
*
* Threads:
*
@@ -88,12 +78,6 @@
* call). Finally the synchronized "finishDocument" is
* called to flush changes to the directory.
*
- * Each ThreadState instance has its own Posting hash. Once
- * we're using too much RAM, we flush all Posting hashes to
- * a segment by merging the docIDs in the posting lists for
- * the same term across multiple thread states (see
- * writeSegment and appendPostings).
- *
* When flush is called by IndexWriter, or, we flush
* internally when autoCommit=false, we forcefully idle all
* threads and flush only once they are all idle. This
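The flush triggers described in the javadoc above (RAM usage versus buffered doc count) are driven from the public IndexWriter settings. A minimal sketch of choosing one trigger or the other; the directory path and the 32 MB / 1000-doc figures are illustrative only:

    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.FSDirectory;

    public class FlushPolicySketch {
      public static void main(String[] args) throws Exception {
        Directory dir = FSDirectory.getDirectory("/tmp/example-index"); // hypothetical path
        IndexWriter w = new IndexWriter(dir, new StandardAnalyzer(), true);

        // Flush by RAM usage: DocumentsWriter buffers postings until ~32 MB is used.
        w.setRAMBufferSizeMB(32.0);
        w.setMaxBufferedDocs(IndexWriter.DISABLE_AUTO_FLUSH);

        // ...or flush by buffered document count instead:
        // w.setMaxBufferedDocs(1000);
        // w.setRAMBufferSizeMB(IndexWriter.DISABLE_AUTO_FLUSH);

        w.close();
      }
    }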
@@ -128,38 +112,79 @@
IndexWriter writer;
Directory directory;
- FieldInfos fieldInfos = new FieldInfos(); // All fields we've seen
- IndexOutput tvx, tvf, tvd; // To write term vectors
- FieldsWriter fieldsWriter; // To write stored fields
-
String segment; // Current segment we are working on
- String docStoreSegment; // Current doc-store segment we are writing
+ private String docStoreSegment; // Current doc-store segment we are writing
private int docStoreOffset; // Current starting doc-store offset of current segment
private int nextDocID; // Next docID to be added
private int numDocsInRAM; // # docs buffered in RAM
int numDocsInStore; // # docs written to doc stores
- private int nextWriteDocID; // Next docID to be written
// Max # ThreadState instances; if there are more threads
// than this they share ThreadStates
private final static int MAX_THREAD_STATE = 5;
private DocumentsWriterThreadState[] threadStates = new DocumentsWriterThreadState[0];
private final HashMap threadBindings = new HashMap();
- private int numWaiting;
- private final DocumentsWriterThreadState[] waitingThreadStates = new DocumentsWriterThreadState[MAX_THREAD_STATE];
- private int pauseThreads; // Non-zero when we need all threads to
- // pause (eg to flush)
+
+ private int pauseThreads; // Non-zero when we need all threads to
+ // pause (eg to flush)
boolean flushPending; // True when a thread has decided to flush
boolean bufferIsFull; // True when it's time to write segment
- private int abortCount; // Non-zero while abort is pending or running
+ private boolean aborting; // True if an abort is pending
PrintStream infoStream;
-
- boolean hasNorms; // Whether any norms were seen since last flush
+ int maxFieldLength = IndexWriter.DEFAULT_MAX_FIELD_LENGTH;
+ Similarity similarity;
List newFiles;
+ static class DocState {
+ DocumentsWriter docWriter;
+ Analyzer analyzer;
+ int maxFieldLength;
+ PrintStream infoStream;
+ Similarity similarity;
+ int docID;
+ Document doc;
+ String maxTermPrefix;
+
+ // Only called by asserts
+ public boolean testPoint(String name) {
+ return docWriter.writer.testPoint(name);
+ }
+ }
+
+ static class FlushState {
+ DocumentsWriter docWriter;
+ Directory directory;
+ String segmentName;
+ String docStoreSegmentName;
+ int numDocsInRAM;
+ int numDocsInStore;
+ Collection flushedFiles;
+
+ public String segmentFileName(String ext) {
+ return segmentName + "." + ext;
+ }
+ }
+
+ /** Consumer returns this on each doc. This holds any
+ * state that must be flushed synchronized "in docID
+ * order". We gather these and flush them in order. */
+ abstract static class DocWriter {
+ DocWriter next;
+ int docID;
+ abstract void finish() throws IOException;
+ abstract void abort();
+ abstract long sizeInBytes();
+
+ void setNext(DocWriter next) {
+ this.next = next;
+ }
+ };
+
+ final DocConsumer consumer;
+
// Deletes done after the last flush; these are discarded
// on abort
private BufferedDeletes deletesInRAM = new BufferedDeletes();
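The DocWriter contract introduced above is deliberately small: finish() is invoked once it is this document's turn in docID order, abort() discards it, and sizeInBytes() lets the wait queue (further below) account for RAM held by out-of-order documents. A hypothetical consumer-side implementation might look like the following sketch, assuming it lives in org.apache.lucene.index; BufferedDocWriter and writePendingBytes are made-up names, not part of this patch:

    // Hypothetical sketch only -- illustrates the contract, not an actual consumer.
    class BufferedDocWriter extends DocumentsWriter.DocWriter {
      private final byte[] pending;            // bytes produced while processing the doc

      BufferedDocWriter(int docID, byte[] pending) {
        this.docID = docID;
        this.pending = pending;
      }

      void finish() throws IOException {
        // Called in docID order: only now is it safe to append to the shared files.
        writePendingBytes(pending);            // hypothetical helper
      }

      void abort() {
        // Nothing was written yet, so there is nothing to undo.
      }

      long sizeInBytes() {
        return pending.length;                 // lets the wait queue track waiting bytes
      }

      private void writePendingBytes(byte[] b) throws IOException { /* ... */ }
    }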
@@ -175,8 +200,15 @@
// How much RAM we can use before flushing. This is 0 if
// we are flushing by doc count instead.
private long ramBufferSize = (long) (IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB*1024*1024);
+ private long waitQueuePauseBytes = (long) (ramBufferSize*0.1);
+ private long waitQueueResumeBytes = (long) (ramBufferSize*0.05);
+
+ // If we've allocated 5% over our RAM budget, we then
+ // free down to 95%
+ private long freeTrigger = (long) (IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB*1024*1024*1.05);
+ private long freeLevel = (long) (IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB*1024*1024*0.95);
- // Flush @ this number of docs. If rarmBufferSize is
+ // Flush @ this number of docs. If ramBufferSize is
// non-zero we will flush by RAM usage instead.
private int maxBufferedDocs = IndexWriter.DEFAULT_MAX_BUFFERED_DOCS;
@@ -194,34 +226,80 @@
private boolean closed;
- // Coarse estimates used to measure RAM usage of buffered deletes
- private static int OBJECT_HEADER_BYTES = 8;
-
- BufferedNorms[] norms = new BufferedNorms[0]; // Holds norms until we flush
-
DocumentsWriter(Directory directory, IndexWriter writer) throws IOException {
this.directory = directory;
this.writer = writer;
- flushedDocCount = writer.docCount();
- postingsFreeList = new Posting[0];
+ this.similarity = writer.getSimilarity();
+ flushedDocCount = writer.maxDoc();
+
+ /*
+ This is the current indexing chain:
+
+ DocConsumer / DocConsumerPerThread
+ --> code: DocFieldProcessor / DocFieldProcessorPerThread
+ --> DocFieldConsumer / DocFieldConsumerPerThread / DocFieldConsumerPerField
+ --> code: DocFieldConsumers / DocFieldConsumersPerThread / DocFieldConsumersPerField
+ --> code: DocInverter / DocInverterPerThread / DocInverterPerField
+ --> InvertedDocConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
+ --> code: TermsHash / TermsHashPerThread / TermsHashPerField
+ --> TermsHashConsumer / TermsHashConsumerPerThread / TermsHashConsumerPerField
+ --> code: FreqProxTermsWriter / FreqProxTermsWriterPerThread / FreqProxTermsWriterPerField
+ --> code: TermVectorsTermsWriter / TermVectorsTermsWriterPerThread / TermVectorsTermsWriterPerField
+ --> InvertedDocEndConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
+ --> code: NormsWriter / NormsWriterPerThread / NormsWriterPerField
+ --> code: StoredFieldsWriter / StoredFieldsWriterPerThread / StoredFieldsWriterPerField
+ */
+
+ // TODO FI: this should be something the user can pass in
+ // Build up indexing chain:
+ final TermsHashConsumer termVectorsWriter = new TermVectorsTermsWriter(this);
+ final TermsHashConsumer freqProxWriter = new FreqProxTermsWriter();
+
+ final InvertedDocConsumer termsHash = new TermsHash(this, true, freqProxWriter,
+ new TermsHash(this, false, termVectorsWriter, null));
+ final NormsWriter normsWriter = new NormsWriter();
+ final DocInverter docInverter = new DocInverter(termsHash, normsWriter);
+ final StoredFieldsWriter fieldsWriter = new StoredFieldsWriter(this);
+ final DocFieldConsumers docFieldConsumers = new DocFieldConsumers(docInverter, fieldsWriter);
+ consumer = new DocFieldProcessor(this, docFieldConsumers);
}
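For orientation, DocumentsWriter drives the head of that chain through a handful of calls that appear later in this patch; the summary below is reconstructed from those call sites and is not a complete statement of the DocConsumer API:

    // Per added document (unsynchronized, per-thread):
    //   DocWriter perDoc = threadState.consumer.processDocument();
    //   finishDocument(threadState, perDoc);   // synchronized; enqueues in docID order
    //
    // At flush time (all threads idle):
    //   consumer.flush(perThreadConsumers, flushState);
    //   consumer.closeDocStore(flushState);    // when the doc stores are also being closed
    //
    // On an aborting exception:
    //   threadStates[i].consumer.abort();      // per-thread state
    //   consumer.abort();                      // global state
    //
    // When over the RAM budget (from balanceRAM()):
    //   boolean anyFreed = consumer.freeRAM();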
/** If non-null, various details of indexing are printed
* here. */
- void setInfoStream(PrintStream infoStream) {
+ synchronized void setInfoStream(PrintStream infoStream) {
this.infoStream = infoStream;
+ for(int i=0;i<threadStates.length;i++)
+ threadStates[i].docState.infoStream = infoStream;
+ }
+
+ synchronized void setMaxFieldLength(int maxFieldLength) {
+ this.maxFieldLength = maxFieldLength;
+ for(int i=0;i<threadStates.length;i++)
+ threadStates[i].docState.maxFieldLength = maxFieldLength;
+ }
+
+ synchronized void setSimilarity(Similarity similarity) {
+ this.similarity = similarity;
+ for(int i=0;i<threadStates.length;i++)
+ threadStates[i].docState.similarity = similarity;
}
/** Set how much RAM we can use before flushing. */
- void setRAMBufferSizeMB(double mb) {
+ synchronized void setRAMBufferSizeMB(double mb) {
if (mb == IndexWriter.DISABLE_AUTO_FLUSH) {
ramBufferSize = IndexWriter.DISABLE_AUTO_FLUSH;
+ waitQueuePauseBytes = 4*1024*1024;
+ waitQueueResumeBytes = 2*1024*1024;
} else {
ramBufferSize = (long) (mb*1024*1024);
+ waitQueuePauseBytes = (long) (ramBufferSize*0.1);
+ waitQueueResumeBytes = (long) (ramBufferSize*0.05);
+ freeTrigger = (long) (1.05 * ramBufferSize);
+ freeLevel = (long) (0.95 * ramBufferSize);
}
}
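For the default 16 MB buffer (IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB), the derived thresholds above work out as follows; this is only the arithmetic already expressed in the code:

    double mb = 16.0;                                          // IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB
    long ramBufferSize        = (long) (mb*1024*1024);         // 16,777,216 bytes
    long waitQueuePauseBytes  = (long) (ramBufferSize*0.10);   // ~1.6 MB of finished-but-unwritten docs pauses producers
    long waitQueueResumeBytes = (long) (ramBufferSize*0.05);   // ~0.8 MB lets them resume
    long freeTrigger          = (long) (1.05*ramBufferSize);   // ~16.8 MB allocated -> start freeing pooled blocks
    long freeLevel            = (long) (0.95*ramBufferSize);   // ...until allocation drops back under ~15.2 MB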
- double getRAMBufferSizeMB() {
+ synchronized double getRAMBufferSizeMB() {
if (ramBufferSize == IndexWriter.DISABLE_AUTO_FLUSH) {
return ramBufferSize;
} else {
@@ -252,7 +330,7 @@
/** Returns the current doc store segment we are writing
* to. This will be the same as segment when autoCommit
* is true. */
- String getDocStoreSegment() {
+ synchronized String getDocStoreSegment() {
return docStoreSegment;
}
@@ -265,51 +343,40 @@
/** Closes the current open doc stores and returns the doc
* store segment name. This returns null if there are
* no buffered documents. */
- String closeDocStore() throws IOException {
-
+ synchronized String closeDocStore() throws IOException {
+
assert allThreadsIdle();
- List flushedFiles = files();
-
if (infoStream != null)
- message("closeDocStore: " + flushedFiles.size() + " files to flush to segment " + docStoreSegment + " numDocs=" + numDocsInStore);
+ message("closeDocStore: " + openFiles.size() + " files to flush to segment " + docStoreSegment + " numDocs=" + numDocsInStore);
+
+ boolean success = false;
- if (flushedFiles.size() > 0) {
- files = null;
+ try {
+ initFlushState(true);
+ closedFiles.clear();
- if (tvx != null) {
- // At least one doc in this run had term vectors enabled
- assert docStoreSegment != null;
- tvx.close();
- tvf.close();
- tvd.close();
- tvx = null;
- assert 4+numDocsInStore*16 == directory.fileLength(docStoreSegment + "." + IndexFileNames.VECTORS_INDEX_EXTENSION):
- "after flush: tvx size mismatch: " + numDocsInStore + " docs vs " + directory.fileLength(docStoreSegment + "." + IndexFileNames.VECTORS_INDEX_EXTENSION) + " length in bytes of " + docStoreSegment + "." + IndexFileNames.VECTORS_INDEX_EXTENSION;
- }
-
- if (fieldsWriter != null) {
- assert docStoreSegment != null;
- fieldsWriter.close();
- fieldsWriter = null;
- assert 4+numDocsInStore*8 == directory.fileLength(docStoreSegment + "." + IndexFileNames.FIELDS_INDEX_EXTENSION):
- "after flush: fdx size mismatch: " + numDocsInStore + " docs vs " + directory.fileLength(docStoreSegment + "." + IndexFileNames.FIELDS_INDEX_EXTENSION) + " length in bytes of " + docStoreSegment + "." + IndexFileNames.FIELDS_INDEX_EXTENSION;
- }
+ consumer.closeDocStore(flushState);
+ assert 0 == openFiles.size();
String s = docStoreSegment;
docStoreSegment = null;
docStoreOffset = 0;
numDocsInStore = 0;
+ success = true;
return s;
- } else {
- return null;
+ } finally {
+ if (!success) {
+ abort();
+ }
}
}
- List files = null; // Cached list of files we've created
- private List abortedFiles = null; // List of files that were written before last abort()
+ private Collection abortedFiles; // List of files that were written before last abort()
- List abortedFiles() {
+ private FlushState flushState;
+
+ Collection abortedFiles() {
return abortedFiles;
}
@@ -317,186 +384,106 @@
writer.message("DW: " + message);
}
- /* Returns list of files in use by this instance,
- * including any flushed segments. */
- synchronized List files() {
-
- if (files != null)
- return files;
+ final List openFiles = new ArrayList();
+ final List closedFiles = new ArrayList();
- files = new ArrayList();
+ /* Returns Collection of files in use by this instance,
+ * including any flushed segments. */
+ synchronized List openFiles() {
+ return (List) ((ArrayList) openFiles).clone();
+ }
- // Stored fields:
- if (fieldsWriter != null) {
- assert docStoreSegment != null;
- files.add(docStoreSegment + "." + IndexFileNames.FIELDS_EXTENSION);
- files.add(docStoreSegment + "." + IndexFileNames.FIELDS_INDEX_EXTENSION);
- }
+ synchronized List closedFiles() {
+ return (List) ((ArrayList) closedFiles).clone();
+ }
- // Vectors:
- if (tvx != null) {
- assert docStoreSegment != null;
- files.add(docStoreSegment + "." + IndexFileNames.VECTORS_INDEX_EXTENSION);
- files.add(docStoreSegment + "." + IndexFileNames.VECTORS_FIELDS_EXTENSION);
- files.add(docStoreSegment + "." + IndexFileNames.VECTORS_DOCUMENTS_EXTENSION);
- }
+ synchronized void addOpenFile(String name) {
+ assert !openFiles.contains(name);
+ openFiles.add(name);
+ }
- return files;
+ synchronized void removeOpenFile(String name) {
+ assert openFiles.contains(name);
+ openFiles.remove(name);
+ closedFiles.add(name);
}
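Returning a clone from openFiles()/closedFiles() gives callers a point-in-time snapshot they can walk outside the DocumentsWriter monitor. A hypothetical caller (the docWriter variable is illustrative):

    // Safe to iterate without holding the DocumentsWriter lock and without
    // risking ConcurrentModificationException, because this is a private copy:
    List files = docWriter.openFiles();
    Iterator it = files.iterator();
    while (it.hasNext()) {
      String name = (String) it.next();
      // e.g. protect name from deletion while a flush is in progress
    }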
synchronized void setAborting() {
- abortCount++;
+ aborting = true;
}
/** Called if we hit an exception at a bad time (when
* updating the index files) and must discard all
* currently buffered docs. This resets our state,
- * discarding any docs added since last flush. If ae is
- * non-null, it contains the root cause exception (which
- * we re-throw after we are done aborting). */
- synchronized void abort(AbortException ae) throws IOException {
-
- // Anywhere that throws an AbortException must first
- // mark aborting to make sure while the exception is
- // unwinding the un-synchronized stack, no thread grabs
- // the corrupt ThreadState that hit the aborting
- // exception:
- assert ae == null || abortCount>0;
+ * discarding any docs added since last flush. */
+ synchronized void abort() throws IOException {
try {
-
- if (infoStream != null)
- message("docWriter: now abort");
+ message("docWriter: now abort");
// Forcefully remove waiting ThreadStates from line
- for(int i=0;i<numWaiting;i++)
- waitingThreadStates[i].isIdle = true;
- numWaiting = 0;
+ waitQueue.abort();
- // Wait for all other threads to finish with DocumentsWriter:
+ // Wait for all other threads to finish with
+ // DocumentsWriter:
pauseAllThreads();
- assert 0 == numWaiting;
-
try {
- deletesInRAM.clear();
+ assert 0 == waitQueue.numWaiting;
+
+ waitQueue.waitingBytes = 0;
try {
- abortedFiles = files();
+ abortedFiles = openFiles();
} catch (Throwable t) {
abortedFiles = null;
}
- docStoreSegment = null;
- numDocsInStore = 0;
- docStoreOffset = 0;
- files = null;
+ deletesInRAM.clear();
- // Clear vectors & fields from ThreadStates
- for(int i=0;i<threadStates.length;i++) {
- DocumentsWriterThreadState state = threadStates[i];
- state.tvfLocal.reset();
- state.fdtLocal.reset();
- if (state.localFieldsWriter != null) {
- try {
- state.localFieldsWriter.close();
- } catch (Throwable t) {
- }
- state.localFieldsWriter = null;
- }
- }
+ openFiles.clear();
- // Reset vectors writer
- if (tvx != null) {
+ for(int i=0;i<threadStates.length;i++)
try {
- tvx.close();
+ threadStates[i].consumer.abort();
} catch (Throwable t) {
}
- tvx = null;
- }
- if (tvd != null) {
- try {
- tvd.close();
- } catch (Throwable t) {
- }
- tvd = null;
- }
- if (tvf != null) {
- try {
- tvf.close();
- } catch (Throwable t) {
- }
- tvf = null;
- }
- // Reset fields writer
- if (fieldsWriter != null) {
- try {
- fieldsWriter.close();
- } catch (Throwable t) {
- }
- fieldsWriter = null;
+ try {
+ consumer.abort();
+ } catch (Throwable t) {
}
- // Discard pending norms:
- final int numField = fieldInfos.size();
- for (int i=0;i<numField;i++) {
- FieldInfo fi = fieldInfos.fieldInfo(i);
- if (fi.isIndexed && !fi.omitNorms) {
- BufferedNorms n = norms[i];
- if (n != null)
- try {
- n.reset();
- } catch (Throwable t) {
- }
- }
- }
+ docStoreSegment = null;
+ numDocsInStore = 0;
+ docStoreOffset = 0;
// Reset all postings data
- resetPostingsData();
+ doAfterFlush();
} finally {
resumeAllThreads();
}
-
- // If we have a root cause exception, re-throw it now:
- if (ae != null) {
- Throwable t = ae.getCause();
- if (t instanceof IOException)
- throw (IOException) t;
- else if (t instanceof RuntimeException)
- throw (RuntimeException) t;
- else if (t instanceof Error)
- throw (Error) t;
- else
- // Should not get here
- assert false: "unknown exception: " + t;
- }
} finally {
- if (ae != null)
- abortCount--;
+ aborting = false;
notifyAll();
}
}
/** Reset after a flush */
- private void resetPostingsData() throws IOException {
+ private void doAfterFlush() throws IOException {
// All ThreadStates should be idle when we are called
assert allThreadsIdle();
threadBindings.clear();
+ waitQueue.reset();
segment = null;
numDocsInRAM = 0;
nextDocID = 0;
- nextWriteDocID = 0;
- files = null;
- balanceRAM();
bufferIsFull = false;
flushPending = false;
- for(int i=0;i<threadStates.length;i++) {
- threadStates[i].numThreads = 0;
- threadStates[i].resetPostings();
- }
+ for(int i=0;i<threadStates.length;i++)
+ threadStates[i].doAfterFlush();
numBytesUsed = 0;
}
@@ -510,7 +497,8 @@
Thread.currentThread().interrupt();
}
}
- return abortCount > 0;
+
+ return aborting;
}
synchronized void resumeAllThreads() {
@@ -527,71 +515,98 @@
return true;
}
+ synchronized private void initFlushState(boolean onlyDocStore) {
+ initSegmentName(onlyDocStore);
+
+ if (flushState == null) {
+ flushState = new FlushState();
+ flushState.directory = directory;
+ flushState.docWriter = this;
+ }
+
+ flushState.docStoreSegmentName = docStoreSegment;
+ flushState.segmentName = segment;
+ flushState.numDocsInRAM = numDocsInRAM;
+ flushState.numDocsInStore = numDocsInStore;
+ flushState.flushedFiles = new HashSet();
+ }
+
/** Flush all pending docs to a new segment */
synchronized int flush(boolean closeDocStore) throws IOException {
assert allThreadsIdle();
- if (segment == null)
- // In case we are asked to flush an empty segment
- segment = writer.newSegmentName();
-
- newFiles = new ArrayList();
+ assert numDocsInRAM > 0;
- docStoreOffset = numDocsInStore;
+ assert nextDocID == numDocsInRAM;
+ assert waitQueue.numWaiting == 0;
+ assert waitQueue.waitingBytes == 0;
- int docCount;
+ initFlushState(false);
- assert numDocsInRAM > 0;
+ docStoreOffset = numDocsInStore;
if (infoStream != null)
- message("flush postings as segment " + segment + " numDocs=" + numDocsInRAM);
+ message("flush postings as segment " + flushState.segmentName + " numDocs=" + numDocsInRAM);
boolean success = false;
try {
if (closeDocStore) {
- assert docStoreSegment != null;
- assert docStoreSegment.equals(segment);
- newFiles.addAll(files());
+ assert flushState.docStoreSegmentName != null;
+ assert flushState.docStoreSegmentName.equals(flushState.segmentName);
closeDocStore();
+ flushState.numDocsInStore = 0;
}
-
- fieldInfos.write(directory, segment + ".fnm");
- docCount = numDocsInRAM;
+ Collection threads = new HashSet();
+ for(int i=0;i<threadStates.length;i++)
+ threads.add(threadStates[i].consumer);
+ consumer.flush(threads, flushState);
- newFiles.addAll(writeSegment());
+ if (infoStream != null) {
+ final long newSegmentSize = segmentSize(flushState.segmentName);
+ String message = " oldRAMSize=" + numBytesUsed +
+ " newFlushedSize=" + newSegmentSize +
+ " docs/MB=" + nf.format(numDocsInRAM/(newSegmentSize/1024./1024.)) +
+ " new/old=" + nf.format(100.0*newSegmentSize/numBytesUsed) + "%";
+ message(message);
+ }
- flushedDocCount += docCount;
+ flushedDocCount += flushState.numDocsInRAM;
+
+ doAfterFlush();
success = true;
} finally {
- if (!success)
- abort(null);
+ if (!success) {
+ abort();
+ }
}
- return docCount;
+ assert waitQueue.waitingBytes == 0;
+
+ return flushState.numDocsInRAM;
}
/** Build compound file for the segment we just flushed */
- void createCompoundFile(String segment) throws IOException
- {
+ void createCompoundFile(String segment) throws IOException {
+
CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, segment + "." + IndexFileNames.COMPOUND_FILE_EXTENSION);
- final int size = newFiles.size();
- for(int i=0;i<size;i++)
- cfsWriter.addFile((String) newFiles.get(i));
+ Iterator it = flushState.flushedFiles.iterator();
+ while(it.hasNext())
+ cfsWriter.addFile((String) it.next());
// Perform the merge
cfsWriter.close();
}
/** Set flushPending if it is not already set and returns
- * whether it was set. This is used by IndexWriter to *
+ * whether it was set. This is used by IndexWriter to
* trigger a single flush even when multiple threads are
- * * trying to do so. */
+ * trying to do so. */
synchronized boolean setFlushPending() {
if (flushPending)
return false;
@@ -605,350 +620,26 @@
flushPending = false;
}
- private static final byte defaultNorm = Similarity.encodeNorm(1.0f);
-
- /** Write norms in the "true" segment format. This is
- * called only during commit, to create the .nrm file. */
- void writeNorms(String segmentName, int totalNumDoc) throws IOException {
-
- IndexOutput normsOut = directory.createOutput(segmentName + "." + IndexFileNames.NORMS_EXTENSION);
-
- try {
- normsOut.writeBytes(SegmentMerger.NORMS_HEADER, 0, SegmentMerger.NORMS_HEADER.length);
-
- final int numField = fieldInfos.size();
-
- for (int fieldIdx=0;fieldIdx<numField;fieldIdx++) {
- FieldInfo fi = fieldInfos.fieldInfo(fieldIdx);
- if (fi.isIndexed && !fi.omitNorms) {
- BufferedNorms n = norms[fieldIdx];
- final long v;
- if (n == null)
- v = 0;
- else {
- v = n.out.getFilePointer();
- n.out.writeTo(normsOut);
- n.reset();
- }
- if (v < totalNumDoc)
- fillBytes(normsOut, defaultNorm, (int) (totalNumDoc-v));
- }
- }
- } finally {
- normsOut.close();
- }
- }
-
- private DefaultSkipListWriter skipListWriter = null;
-
- private boolean currentFieldStorePayloads;
-
- /** Creates a segment from all Postings in the Postings
- * hashes across all ThreadStates & FieldDatas. */
- private List writeSegment() throws IOException {
-
- assert allThreadsIdle();
-
- assert nextDocID == numDocsInRAM;
-
- final String segmentName;
-
- segmentName = segment;
-
- TermInfosWriter termsOut = new TermInfosWriter(directory, segmentName, fieldInfos,
- writer.getTermIndexInterval());
-
- IndexOutput freqOut = directory.createOutput(segmentName + ".frq");
- IndexOutput proxOut = directory.createOutput(segmentName + ".prx");
-
- // Gather all FieldData's that have postings, across all
- // ThreadStates
- ArrayList allFields = new ArrayList();
- assert allThreadsIdle();
- for(int i=0;i<threadStates.length;i++) {
- DocumentsWriterThreadState state = threadStates[i];
- state.trimFields();
- final int numFields = state.numAllFieldData;
- for(int j=0;j<numFields;j++) {
- DocumentsWriterFieldData fp = state.allFieldDataArray[j];
- if (fp.numPostings > 0)
- allFields.add(fp);
- }
- }
-
- // Sort by field name
- Collections.sort(allFields);
- final int numAllFields = allFields.size();
-
- skipListWriter = new DefaultSkipListWriter(termsOut.skipInterval,
- termsOut.maxSkipLevels,
- numDocsInRAM, freqOut, proxOut);
-
- int start = 0;
- while(start < numAllFields) {
-
- final String fieldName = ((DocumentsWriterFieldData) allFields.get(start)).fieldInfo.name;
-
- int end = start+1;
- while(end < numAllFields && ((DocumentsWriterFieldData) allFields.get(end)).fieldInfo.name.equals(fieldName))
- end++;
-
- DocumentsWriterFieldData[] fields = new DocumentsWriterFieldData[end-start];
- for(int i=start;i<end;i++)
- fields[i-start] = (DocumentsWriterFieldData) allFields.get(i);
-
- // If this field has postings then add them to the
- // segment
- appendPostings(fields, termsOut, freqOut, proxOut);
-
- for(int i=0;i<fields.length;i++)
- fields[i].resetPostingArrays();
-
- start = end;
- }
-
- freqOut.close();
- proxOut.close();
- termsOut.close();
-
- // Record all files we have flushed
- List flushedFiles = new ArrayList();
- flushedFiles.add(segmentFileName(IndexFileNames.FIELD_INFOS_EXTENSION));
- flushedFiles.add(segmentFileName(IndexFileNames.FREQ_EXTENSION));
- flushedFiles.add(segmentFileName(IndexFileNames.PROX_EXTENSION));
- flushedFiles.add(segmentFileName(IndexFileNames.TERMS_EXTENSION));
- flushedFiles.add(segmentFileName(IndexFileNames.TERMS_INDEX_EXTENSION));
-
- if (hasNorms) {
- writeNorms(segmentName, numDocsInRAM);
- flushedFiles.add(segmentFileName(IndexFileNames.NORMS_EXTENSION));
- }
-
- if (infoStream != null) {
- final long newSegmentSize = segmentSize(segmentName);
- String message = " oldRAMSize=" + numBytesUsed + " newFlushedSize=" + newSegmentSize + " docs/MB=" + nf.format(numDocsInRAM/(newSegmentSize/1024./1024.)) + " new/old=" + nf.format(100.0*newSegmentSize/numBytesUsed) + "%";
- message(message);
- }
-
- resetPostingsData();
-
- // Maybe downsize postingsFreeList array
- if (postingsFreeList.length > 1.5*postingsFreeCount) {
- int newSize = postingsFreeList.length;
- while(newSize > 1.25*postingsFreeCount) {
- newSize = (int) (newSize*0.8);
- }
- Posting[] newArray = new Posting[newSize];
- System.arraycopy(postingsFreeList, 0, newArray, 0, postingsFreeCount);
- postingsFreeList = newArray;
- }
-
- return flushedFiles;
- }
-
synchronized void pushDeletes() {
deletesFlushed.update(deletesInRAM);
}
- /** Returns the name of the file with this extension, on
- * the current segment we are working on. */
- private String segmentFileName(String extension) {
- return segment + "." + extension;
- }
-
- private static int compareText(final char[] text1, int pos1, final char[] text2, int pos2) {
- while(true) {
- final char c1 = text1[pos1++];
- final char c2 = text2[pos2++];
- if (c1 != c2) {
- if (0xffff == c2)
- return 1;
- else if (0xffff == c1)
- return -1;
- else
- return c1-c2;
- } else if (0xffff == c1)
- return 0;
- }
+ synchronized void close() {
+ closed = true;
+ notifyAll();
}
- private final TermInfo termInfo = new TermInfo(); // minimize consing
-
- final UnicodeUtil.UTF8Result termsUTF8 = new UnicodeUtil.UTF8Result();
-
- /* Walk through all unique text tokens (Posting
- * instances) found in this field and serialize them
- * into a single RAM segment. */
- void appendPostings(DocumentsWriterFieldData[] fields,
- TermInfosWriter termsOut,
- IndexOutput freqOut,
- IndexOutput proxOut)
- throws CorruptIndexException, IOException {
-
- final int fieldNumber = fields[0].fieldInfo.number;
- int numFields = fields.length;
-
- final DocumentsWriterFieldMergeState[] mergeStates = new DocumentsWriterFieldMergeState[numFields];
-
- for(int i=0;i<numFields;i++) {
- DocumentsWriterFieldMergeState fms = mergeStates[i] = new DocumentsWriterFieldMergeState();
- fms.field = fields[i];
- fms.postings = fms.field.sortPostings();
-
- assert fms.field.fieldInfo == fields[0].fieldInfo;
-
- // Should always be true
- boolean result = fms.nextTerm();
- assert result;
+ synchronized void initSegmentName(boolean onlyDocStore) {
+ if (segment == null && (!onlyDocStore || docStoreSegment == null)) {
+ segment = writer.newSegmentName();
+ assert numDocsInRAM == 0;
}
-
- final int skipInterval = termsOut.skipInterval;
- currentFieldStorePayloads = fields[0].fieldInfo.storePayloads;
-
- DocumentsWriterFieldMergeState[] termStates = new DocumentsWriterFieldMergeState[numFields];
-
- while(numFields > 0) {
-
- // Get the next term to merge
- termStates[0] = mergeStates[0];
- int numToMerge = 1;
-
- for(int i=1;i<numFields;i++) {
- final char[] text = mergeStates[i].text;
- final int textOffset = mergeStates[i].textOffset;
- final int cmp = compareText(text, textOffset, termStates[0].text, termStates[0].textOffset);
-
- if (cmp < 0) {
- termStates[0] = mergeStates[i];
- numToMerge = 1;
- } else if (cmp == 0)
- termStates[numToMerge++] = mergeStates[i];
- }
-
- int df = 0;
- int lastPayloadLength = -1;
-
- int lastDoc = 0;
-
- final char[] text = termStates[0].text;
- final int start = termStates[0].textOffset;
-
- long freqPointer = freqOut.getFilePointer();
- long proxPointer = proxOut.getFilePointer();
-
- skipListWriter.resetSkip();
-
- // Now termStates has numToMerge FieldMergeStates
- // which all share the same term. Now we must
- // interleave the docID streams.
- while(numToMerge > 0) {
-
- if ((++df % skipInterval) == 0) {
- skipListWriter.setSkipData(lastDoc, currentFieldStorePayloads, lastPayloadLength);
- skipListWriter.bufferSkip(df);
- }
-
- DocumentsWriterFieldMergeState minState = termStates[0];
- for(int i=1;i<numToMerge;i++)
- if (termStates[i].docID < minState.docID)
- minState = termStates[i];
-
- final int doc = minState.docID;
- final int termDocFreq = minState.termFreq;
-
- assert doc < numDocsInRAM;
- assert doc > lastDoc || df == 1;
-
- final int newDocCode = (doc-lastDoc)<<1;
- lastDoc = doc;
-
- final ByteSliceReader prox = minState.prox;
-
- // Carefully copy over the prox + payload info,
- // changing the format to match Lucene's segment
- // format.
- for(int j=0;j<termDocFreq;j++) {
- final int code = prox.readVInt();
- if (currentFieldStorePayloads) {
- final int payloadLength;
- if ((code & 1) != 0) {
- // This position has a payload
- payloadLength = prox.readVInt();
- } else
- payloadLength = 0;
- if (payloadLength != lastPayloadLength) {
- proxOut.writeVInt(code|1);
- proxOut.writeVInt(payloadLength);
- lastPayloadLength = payloadLength;
- } else
- proxOut.writeVInt(code & (~1));
- if (payloadLength > 0)
- copyBytes(prox, proxOut, payloadLength);
- } else {
- assert 0 == (code & 1);
- proxOut.writeVInt(code>>1);
- }
- }
-
- if (1 == termDocFreq) {
- freqOut.writeVInt(newDocCode|1);
- } else {
- freqOut.writeVInt(newDocCode);
- freqOut.writeVInt(termDocFreq);
- }
-
- if (!minState.nextDoc()) {
-
- // Remove from termStates
- int upto = 0;
- for(int i=0;i<numToMerge;i++)
- if (termStates[i] != minState)
- termStates[upto++] = termStates[i];
- numToMerge--;
- assert upto == numToMerge;
-
- // Advance this state to the next term
-
- if (!minState.nextTerm()) {
- // OK, no more terms, so remove from mergeStates
- // as well
- upto = 0;
- for(int i=0;i<numFields;i++)
- if (mergeStates[i] != minState)
- mergeStates[upto++] = mergeStates[i];
- numFields--;
- assert upto == numFields;
- }
- }
- }
-
- assert df > 0;
-
- // Done merging this term
-
- long skipPointer = skipListWriter.writeSkip(freqOut);
-
- // Write term
- termInfo.set(df, freqPointer, proxPointer, (int) (skipPointer - freqPointer));
-
- // TODO: we could do this incrementally
- UnicodeUtil.UTF16toUTF8(text, start, termsUTF8);
-
- // TODO: we could save O(n) re-scan of the term by
- // computing the shared prefix with the last term
- // while during the UTF8 encoding
- termsOut.add(fieldNumber,
- termsUTF8.result,
- termsUTF8.length,
- termInfo);
+ if (docStoreSegment == null) {
+ docStoreSegment = segment;
+ assert numDocsInStore == 0;
}
}
- synchronized void close() {
- closed = true;
- notifyAll();
- }
-
/** Returns a free (idle) ThreadState that may be used for
* indexing this one document. This call also pauses if a
* flush is pending. If delTerm is non-null then we
@@ -961,14 +652,16 @@
// again.
DocumentsWriterThreadState state = (DocumentsWriterThreadState) threadBindings.get(Thread.currentThread());
if (state == null) {
- // First time this thread has called us since last flush
+
+ // First time this thread has called us since last
+ // flush. Find the least loaded thread state:
DocumentsWriterThreadState minThreadState = null;
for(int i=0;i<threadStates.length;i++) {
DocumentsWriterThreadState ts = threadStates[i];
if (minThreadState == null || ts.numThreads < minThreadState.numThreads)
minThreadState = ts;
}
- if (minThreadState != null && (minThreadState.numThreads == 0 || threadStates.length == MAX_THREAD_STATE)) {
+ if (minThreadState != null && (minThreadState.numThreads == 0 || threadStates.length >= MAX_THREAD_STATE)) {
state = minThreadState;
state.numThreads++;
} else {
@@ -987,46 +680,49 @@
// not be paused nor a flush pending:
waitReady(state);
- if (segment == null)
- segment = writer.newSegmentName();
+ // Allocate segment name if this is the first doc since
+ // last flush:
+ initSegmentName(false);
state.isIdle = false;
+ boolean success = false;
try {
- boolean success = false;
- try {
- state.init(doc, nextDocID);
- if (delTerm != null) {
- addDeleteTerm(delTerm, state.docID);
- state.doFlushAfter = timeToFlushDeletes();
- }
- // Only increment nextDocID & numDocsInRAM on successful init
- nextDocID++;
- numDocsInRAM++;
-
- // We must at this point commit to flushing to ensure we
- // always get N docs when we flush by doc count, even if
- // > 1 thread is adding documents:
- if (!flushPending && maxBufferedDocs != IndexWriter.DISABLE_AUTO_FLUSH
- && numDocsInRAM >= maxBufferedDocs) {
- flushPending = true;
- state.doFlushAfter = true;
- }
+ state.docState.docID = nextDocID;
- success = true;
- } finally {
- if (!success) {
- // Forcefully idle this ThreadState:
- state.isIdle = true;
- notifyAll();
- if (state.doFlushAfter) {
- state.doFlushAfter = false;
- flushPending = false;
- }
+ assert writer.testPoint("DocumentsWriter.ThreadState.init start");
+
+ if (delTerm != null) {
+ addDeleteTerm(delTerm, state.docState.docID);
+ state.doFlushAfter = timeToFlushDeletes();
+ }
+
+ assert writer.testPoint("DocumentsWriter.ThreadState.init after delTerm");
+
+ nextDocID++;
+ numDocsInRAM++;
+
+ // We must at this point commit to flushing to ensure we
+ // always get N docs when we flush by doc count, even if
+ // > 1 thread is adding documents:
+ if (!flushPending &&
+ maxBufferedDocs != IndexWriter.DISABLE_AUTO_FLUSH
+ && numDocsInRAM >= maxBufferedDocs) {
+ flushPending = true;
+ state.doFlushAfter = true;
+ }
+
+ success = true;
+ } finally {
+ if (!success) {
+ // Forcefully idle this ThreadState:
+ state.isIdle = true;
+ notifyAll();
+ if (state.doFlushAfter) {
+ state.doFlushAfter = false;
+ flushPending = false;
}
}
- } catch (AbortException ae) {
- abort(ae);
}
return state;
@@ -1049,25 +745,45 @@
// This call is synchronized but fast
final DocumentsWriterThreadState state = getThreadState(doc, delTerm);
+
+ final DocState docState = state.docState;
+ docState.doc = doc;
+ docState.analyzer = analyzer;
+
+ boolean success = false;
try {
- boolean success = false;
- try {
- try {
- // This call is not synchronized and does all the work
- state.processDocument(analyzer);
- } finally {
- // Note that we must call finishDocument even on
- // exception, because for a non-aborting
- // exception, a portion of the document has been
- // indexed (and its ID is marked for deletion), so
- // all index files must be updated to record this
- // document. This call is synchronized but fast.
- finishDocument(state);
- }
- success = true;
- } finally {
- if (!success) {
- synchronized(this) {
+ // This call is not synchronized and does all the
+ // work
+ final DocWriter perDoc = state.consumer.processDocument();
+
+ // This call is synchronized but fast
+ finishDocument(state, perDoc);
+ success = true;
+ } finally {
+ if (!success) {
+ synchronized(this) {
+
+ if (aborting) {
+ state.isIdle = true;
+ notifyAll();
+ abort();
+ } else {
+ skipDocWriter.docID = docState.docID;
+ boolean success2 = false;
+ try {
+ waitQueue.add(skipDocWriter);
+ success2 = true;
+ } finally {
+ if (!success2) {
+ state.isIdle = true;
+ notifyAll();
+ abort();
+ return false;
+ }
+ }
+
+ state.isIdle = true;
+ notifyAll();
// If this thread state had decided to flush, we
// must clear it so another thread can flush
@@ -1081,12 +797,10 @@
// since likely it was partially added. This
// keeps indexing as "all or none" (atomic) when
// adding a document:
- addDeleteDocID(state.docID);
+ addDeleteDocID(state.docState.docID);
}
}
}
- } catch (AbortException ae) {
- abort(ae);
}
return state.doFlushAfter || timeToFlushDeletes();
@@ -1114,12 +828,14 @@
}
synchronized private void waitReady(DocumentsWriterThreadState state) {
- while(!closed && ((state != null && !state.isIdle) || pauseThreads != 0 || flushPending || abortCount > 0))
+
+ while (!closed && ((state != null && !state.isIdle) || pauseThreads != 0 || flushPending || aborting)) {
try {
wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
+ }
if (closed)
throw new AlreadyClosedException("this IndexWriter is closed");
@@ -1297,66 +1013,83 @@
deletesInRAM.queries.put(query, new Integer(flushedDocCount + docID));
}
+ synchronized boolean doBalanceRAM() {
+ return ramBufferSize != IndexWriter.DISABLE_AUTO_FLUSH && !bufferIsFull && (numBytesUsed >= ramBufferSize || numBytesAlloc >= freeTrigger);
+ }
+
/** Does the synchronized work to finish/flush the
- * inverted document. */
- private synchronized void finishDocument(DocumentsWriterThreadState state) throws IOException, AbortException {
- if (abortCount > 0) {
- // Forcefully idle this threadstate -- its state will
- // be reset by abort()
- state.isIdle = true;
- notifyAll();
- return;
- }
+ * inverted document. */
+ private void finishDocument(DocumentsWriterThreadState perThread, DocWriter docWriter) throws IOException {
- if (ramBufferSize != IndexWriter.DISABLE_AUTO_FLUSH
- && numBytesUsed >= ramBufferSize)
+ if (doBalanceRAM())
+ // Must call this w/o holding synchronized(this) else
+ // we'll hit deadlock:
balanceRAM();
- // Now write the indexed document to the real files.
- if (nextWriteDocID == state.docID) {
- // It's my turn, so write everything now:
- nextWriteDocID++;
- state.writeDocument();
- state.isIdle = true;
- notifyAll();
+ synchronized(this) {
- // If any states were waiting on me, sweep through and
- // flush those that are enabled by my write.
- if (numWaiting > 0) {
- boolean any = true;
- while(any) {
- any = false;
- for(int i=0;i<numWaiting;) {
- final DocumentsWriterThreadState s = waitingThreadStates[i];
- if (s.docID == nextWriteDocID) {
- s.writeDocument();
- s.isIdle = true;
- nextWriteDocID++;
- any = true;
- if (numWaiting > i+1)
- // Swap in the last waiting state to fill in
- // the hole we just created. It's important
- // to do this as-we-go and not at the end of
- // the loop, because if we hit an aborting
- // exception in one of the s.writeDocument
- // calls (above), it leaves this array in an
- // inconsistent state:
- waitingThreadStates[i] = waitingThreadStates[numWaiting-1];
- numWaiting--;
- } else {
- assert !s.isIdle;
- i++;
- }
+ assert docWriter == null || docWriter.docID == perThread.docState.docID;
+
+
+ if (aborting) {
+
+ // We are currently aborting, and another thread is
+ // waiting for me to become idle. We just forcefully
+ // idle this threadState; it will be fully reset by
+ // abort()
+ if (docWriter != null)
+ try {
+ docWriter.abort();
+ } catch (Throwable t) {
}
- }
+
+ perThread.isIdle = true;
+ notifyAll();
+ return;
}
- } else {
- // Another thread got a docID before me, but, it
- // hasn't finished its processing. So add myself to
- // the line but don't hold up this thread.
- waitingThreadStates[numWaiting++] = state;
+
+ final boolean doPause;
+
+ if (docWriter != null)
+ doPause = waitQueue.add(docWriter);
+ else {
+ skipDocWriter.docID = perThread.docState.docID;
+ doPause = waitQueue.add(skipDocWriter);
+ }
+
+ if (doPause)
+ waitForWaitQueue();
+
+ if (bufferIsFull && !flushPending) {
+ flushPending = true;
+ perThread.doFlushAfter = true;
+ }
+
+ perThread.isIdle = true;
+ notifyAll();
+ }
+ }
+
+ synchronized void waitForWaitQueue() {
+ do {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ } while (!waitQueue.doResume());
+ }
+
+ private static class SkipDocWriter extends DocWriter {
+ void finish() {
+ }
+ void abort() {
+ }
+ long sizeInBytes() {
+ return 0;
}
}
+ final SkipDocWriter skipDocWriter = new SkipDocWriter();
long getRAMUsed() {
return numBytesUsed;
@@ -1367,35 +1100,10 @@
NumberFormat nf = NumberFormat.getInstance();
- /* Used only when writing norms to fill in default norm
- * value into the holes in docID stream for those docs
- * that didn't have this field. */
- static void fillBytes(IndexOutput out, byte b, int numBytes) throws IOException {
- for(int i=0;i<numBytes;i++)
- out.writeByte(b);
- }
-
- final byte[] copyByteBuffer = new byte[4096];
-
- /** Copy numBytes from srcIn to destIn */
- void copyBytes(IndexInput srcIn, IndexOutput destIn, long numBytes) throws IOException {
- // TODO: we could do this more efficiently (save a copy)
- // because it's always from a ByteSliceReader ->
- // IndexOutput
- while(numBytes > 0) {
- final int chunk;
- if (numBytes > 4096)
- chunk = 4096;
- else
- chunk = (int) numBytes;
- srcIn.readBytes(copyByteBuffer, 0, chunk);
- destIn.writeBytes(copyByteBuffer, 0, chunk);
- numBytes -= chunk;
- }
- }
-
- // Used only when infoStream != null
+ // TODO FI: this is not flexible -- we can't hardwire
+ // extensions in here:
private long segmentSize(String segmentName) throws IOException {
+ // Used only when infoStream != null
assert infoStream != null;
long size = directory.fileLength(segmentName + ".tii") +
@@ -1410,66 +1118,16 @@
return size;
}
- final private static int POINTER_NUM_BYTE = 4;
- final private static int INT_NUM_BYTE = 4;
- final private static int CHAR_NUM_BYTE = 2;
-
- // Why + 5*POINTER_NUM_BYTE below?
- // 1: Posting has "vector" field which is a pointer
- // 2: Posting is referenced by postingsFreeList array
- // 3,4,5: Posting is referenced by postings hash, which
- // targets 25-50% fill factor; approximate this
- // as 3X # pointers
- final static int POSTING_NUM_BYTE = OBJECT_HEADER_BYTES + 9*INT_NUM_BYTE + 5*POINTER_NUM_BYTE;
-
- // Holds free pool of Posting instances
- private Posting[] postingsFreeList;
- private int postingsFreeCount;
- private int postingsAllocCount;
-
- /* Allocate more Postings from shared pool */
- synchronized void getPostings(Posting[] postings) {
- numBytesUsed += postings.length * POSTING_NUM_BYTE;
- final int numToCopy;
- if (postingsFreeCount < postings.length)
- numToCopy = postingsFreeCount;
- else
- numToCopy = postings.length;
- final int start = postingsFreeCount-numToCopy;
- System.arraycopy(postingsFreeList, start,
- postings, 0, numToCopy);
- postingsFreeCount -= numToCopy;
-
- // Directly allocate the remainder if any
- if (numToCopy < postings.length) {
- final int extra = postings.length - numToCopy;
- final int newPostingsAllocCount = postingsAllocCount + extra;
- if (newPostingsAllocCount > postingsFreeList.length)
- postingsFreeList = new Posting[(int) (1.25 * newPostingsAllocCount)];
-
- balanceRAM();
- for(int i=numToCopy;i<postings.length;i++) {
- postings[i] = new Posting();
- numBytesAlloc += POSTING_NUM_BYTE;
- postingsAllocCount++;
- }
- }
- assert numBytesUsed <= numBytesAlloc;
- }
-
- synchronized void recyclePostings(Posting[] postings, int numPostings) {
- // Move all Postings from this ThreadState back to our
- // free list. We pre-allocated this array while we were
- // creating Postings to make sure it's large enough
- assert postingsFreeCount + numPostings <= postingsFreeList.length;
- System.arraycopy(postings, 0, postingsFreeList, postingsFreeCount, numPostings);
- postingsFreeCount += numPostings;
- }
+ // Coarse estimates used to measure RAM usage of buffered deletes
+ final static int OBJECT_HEADER_BYTES = 8;
+ final static int POINTER_NUM_BYTE = 4;
+ final static int INT_NUM_BYTE = 4;
+ final static int CHAR_NUM_BYTE = 2;
/* Initial chunks size of the shared byte[] blocks used to
store postings data */
final static int BYTE_BLOCK_SHIFT = 15;
- final static int BYTE_BLOCK_SIZE = (int) Math.pow(2.0, BYTE_BLOCK_SHIFT);
+ final static int BYTE_BLOCK_SIZE = (int) (1 << BYTE_BLOCK_SHIFT);
final static int BYTE_BLOCK_MASK = BYTE_BLOCK_SIZE - 1;
final static int BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK;
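Replacing Math.pow(2.0, SHIFT) with a shift does not change the value. For reference, with these constants a flat byte position splits into a (block, offset) pair using one shift and one mask; the position below is an arbitrary example of the typical use of such power-of-two blocks:

    final int BYTE_BLOCK_SHIFT = 15;
    final int BYTE_BLOCK_SIZE  = 1 << BYTE_BLOCK_SHIFT;   // 32768, same value Math.pow(2.0, 15) produced
    final int BYTE_BLOCK_MASK  = BYTE_BLOCK_SIZE - 1;     // 0x7fff

    int pos    = 100000;
    int block  = pos >>> BYTE_BLOCK_SHIFT;   // 3
    int offset = pos &   BYTE_BLOCK_MASK;    // 1696   (3*32768 + 1696 == 100000)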
@@ -1483,8 +1141,13 @@
final int size = freeByteBlocks.size();
final byte[] b;
if (0 == size) {
+ // Always record a block allocated, even if
+ // trackAllocations is false. This is necessary
+ // because this block will be shared between
+ // things that don't track allocations (term
+ // vectors) and things that do (freq/prox
+ // postings).
numBytesAlloc += BYTE_BLOCK_SIZE;
- balanceRAM();
b = new byte[BYTE_BLOCK_SIZE];
} else
b = (byte[]) freeByteBlocks.remove(size-1);
@@ -1504,12 +1167,57 @@
}
}
+ /* Initial chunks size of the shared int[] blocks used to
+ store postings data */
+ final static int INT_BLOCK_SHIFT = 13;
+ final static int INT_BLOCK_SIZE = (int) (1 << INT_BLOCK_SHIFT);
+ final static int INT_BLOCK_MASK = INT_BLOCK_SIZE - 1;
+
+ private ArrayList freeIntBlocks = new ArrayList();
+
+ /* Allocate another int[] from the shared pool */
+ synchronized int[] getIntBlock(boolean trackAllocations) {
+ final int size = freeIntBlocks.size();
+ final int[] b;
+ if (0 == size) {
+ // Always record a block allocated, even if
+ // trackAllocations is false. This is necessary
+ // because this block will be shared between
+ // things that don't track allocations (term
+ // vectors) and things that do (freq/prox
+ // postings).
+ numBytesAlloc += INT_BLOCK_SIZE*INT_NUM_BYTE;
+ b = new int[INT_BLOCK_SIZE];
+ } else
+ b = (int[]) freeIntBlocks.remove(size-1);
+ if (trackAllocations)
+ numBytesUsed += INT_BLOCK_SIZE*INT_NUM_BYTE;
+ assert numBytesUsed <= numBytesAlloc;
+ return b;
+ }
+
+ synchronized void bytesAllocated(long numBytes) {
+ numBytesAlloc += numBytes;
+ assert numBytesUsed <= numBytesAlloc;
+ }
+
+ synchronized void bytesUsed(long numBytes) {
+ numBytesUsed += numBytes;
+ assert numBytesUsed <= numBytesAlloc;
+ }
+
+ /* Return int[]s to the pool */
+ synchronized void recycleIntBlocks(int[][] blocks, int start, int end) {
+ for(int i=start;i<end;i++)
+ freeIntBlocks.add(blocks[i]);
+ }
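Note that all three shared pools recycle equally sized chunks, which is why balanceRAM (below) can free "equally from each pool in 32 KB chunks":

    int byteBlockBytes = (1 << 15);        // BYTE_BLOCK_SIZE                  = 32768 bytes
    int intBlockBytes  = (1 << 13) * 4;    // INT_BLOCK_SIZE  * INT_NUM_BYTE   = 32768 bytes
    int charBlockBytes = (1 << 14) * 2;    // CHAR_BLOCK_SIZE * CHAR_NUM_BYTE  = 32768 bytes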
+
ByteBlockAllocator byteBlockAllocator = new ByteBlockAllocator();
/* Initial chunk size of the shared char[] blocks used to
store term text */
final static int CHAR_BLOCK_SHIFT = 14;
- final static int CHAR_BLOCK_SIZE = (int) Math.pow(2.0, CHAR_BLOCK_SHIFT);
+ final static int CHAR_BLOCK_SIZE = (int) (1 << CHAR_BLOCK_SHIFT);
final static int CHAR_BLOCK_MASK = CHAR_BLOCK_SIZE - 1;
final static int MAX_TERM_LENGTH = CHAR_BLOCK_SIZE-1;
@@ -1522,16 +1230,19 @@
final char[] c;
if (0 == size) {
numBytesAlloc += CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;
- balanceRAM();
c = new char[CHAR_BLOCK_SIZE];
} else
c = (char[]) freeCharBlocks.remove(size-1);
+ // We always track allocations of char blocks, for now,
+ // because nothing that skips allocation tracking
+ // (currently only term vectors) uses its own char
+ // blocks.
numBytesUsed += CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;
assert numBytesUsed <= numBytesAlloc;
return c;
}
- /* Return a char[] to the pool */
+ /* Return char[]s to the pool */
synchronized void recycleCharBlocks(char[][] blocks, int numBlocks) {
for(int i=0;i<numBlocks;i++)
freeCharBlocks.add(blocks[i]);
@@ -1552,76 +1263,70 @@
* the other two. This method just frees allocations from
* the pools once we are over-budget, which balances the
* pools to match the current docs. */
- synchronized void balanceRAM() {
-
- if (ramBufferSize == IndexWriter.DISABLE_AUTO_FLUSH || bufferIsFull)
- return;
+ void balanceRAM() {
- // We free our allocations if we've allocated 5% over
- // our allowed RAM buffer
- final long freeTrigger = (long) (1.05 * ramBufferSize);
- final long freeLevel = (long) (0.95 * ramBufferSize);
-
// We flush when we've used our target usage
final long flushTrigger = (long) ramBufferSize;
if (numBytesAlloc > freeTrigger) {
+
if (infoStream != null)
message(" RAM: now balance allocations: usedMB=" + toMB(numBytesUsed) +
" vs trigger=" + toMB(flushTrigger) +
" allocMB=" + toMB(numBytesAlloc) +
" vs trigger=" + toMB(freeTrigger) +
- " postingsFree=" + toMB(postingsFreeCount*POSTING_NUM_BYTE) +
" byteBlockFree=" + toMB(byteBlockAllocator.freeByteBlocks.size()*BYTE_BLOCK_SIZE) +
" charBlockFree=" + toMB(freeCharBlocks.size()*CHAR_BLOCK_SIZE*CHAR_NUM_BYTE));
- // When we've crossed 100% of our target Postings
- // RAM usage, try to free up until we're back down
- // to 95%
final long startBytesAlloc = numBytesAlloc;
- final int postingsFreeChunk = (int) (BYTE_BLOCK_SIZE / POSTING_NUM_BYTE);
-
int iter = 0;
- // We free equally from each pool in 64 KB
+ // We free equally from each pool in 32 KB
// chunks until we are below our threshold
// (freeLevel)
+ boolean any = true;
+
while(numBytesAlloc > freeLevel) {
- if (0 == byteBlockAllocator.freeByteBlocks.size() && 0 == freeCharBlocks.size() && 0 == postingsFreeCount) {
- // Nothing else to free -- must flush now.
- bufferIsFull = true;
- if (infoStream != null)
- message(" nothing to free; now set bufferIsFull");
- break;
- }
+
+ synchronized(this) {
+ if (0 == byteBlockAllocator.freeByteBlocks.size() && 0 == freeCharBlocks.size() && 0 == freeIntBlocks.size() && !any) {
+ // Nothing else to free -- must flush now.
+ bufferIsFull = numBytesUsed > flushTrigger;
+ if (infoStream != null) {
+ if (numBytesUsed > flushTrigger)
+ message(" nothing to free; now set bufferIsFull");
+ else
+ message(" nothing to free");
+ }
+ assert numBytesUsed <= numBytesAlloc;
+ break;
+ }
- if ((0 == iter % 3) && byteBlockAllocator.freeByteBlocks.size() > 0) {
- byteBlockAllocator.freeByteBlocks.remove(byteBlockAllocator.freeByteBlocks.size()-1);
- numBytesAlloc -= BYTE_BLOCK_SIZE;
- }
+ if ((0 == iter % 4) && byteBlockAllocator.freeByteBlocks.size() > 0) {
+ byteBlockAllocator.freeByteBlocks.remove(byteBlockAllocator.freeByteBlocks.size()-1);
+ numBytesAlloc -= BYTE_BLOCK_SIZE;
+ }
- if ((1 == iter % 3) && freeCharBlocks.size() > 0) {
- freeCharBlocks.remove(freeCharBlocks.size()-1);
- numBytesAlloc -= CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;
- }
+ if ((1 == iter % 4) && freeCharBlocks.size() > 0) {
+ freeCharBlocks.remove(freeCharBlocks.size()-1);
+ numBytesAlloc -= CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;
+ }
- if ((2 == iter % 3) && postingsFreeCount > 0) {
- final int numToFree;
- if (postingsFreeCount >= postingsFreeChunk)
- numToFree = postingsFreeChunk;
- else
- numToFree = postingsFreeCount;
- Arrays.fill(postingsFreeList, postingsFreeCount-numToFree, postingsFreeCount, null);
- postingsFreeCount -= numToFree;
- postingsAllocCount -= numToFree;
- numBytesAlloc -= numToFree * POSTING_NUM_BYTE;
+ if ((2 == iter % 4) && freeIntBlocks.size() > 0) {
+ freeIntBlocks.remove(freeIntBlocks.size()-1);
+ numBytesAlloc -= INT_BLOCK_SIZE * INT_NUM_BYTE;
+ }
}
+ if ((3 == iter % 4) && any)
+ // Ask consumer to free any recycled state
+ any = consumer.freeRAM();
+
iter++;
}
-
+
if (infoStream != null)
message(" after free: freedMB=" + nf.format((startBytesAlloc-numBytesAlloc)/1024./1024.) + " usedMB=" + nf.format(numBytesUsed/1024./1024.) + " allocMB=" + nf.format(numBytesAlloc/1024./1024.));
@@ -1631,14 +1336,132 @@
// using, go ahead and flush. This prevents
// over-allocating and then freeing, with every
// flush.
- if (numBytesUsed > flushTrigger) {
- if (infoStream != null)
- message(" RAM: now flush @ usedMB=" + nf.format(numBytesUsed/1024./1024.) +
- " allocMB=" + nf.format(numBytesAlloc/1024./1024.) +
- " triggerMB=" + nf.format(flushTrigger/1024./1024.));
+ synchronized(this) {
- bufferIsFull = true;
+ if (numBytesUsed > flushTrigger) {
+ if (infoStream != null)
+ message(" RAM: now flush @ usedMB=" + nf.format(numBytesUsed/1024./1024.) +
+ " allocMB=" + nf.format(numBytesAlloc/1024./1024.) +
+ " triggerMB=" + nf.format(flushTrigger/1024./1024.));
+
+ bufferIsFull = true;
+ }
}
}
}
+
+ final WaitQueue waitQueue = new WaitQueue();
+
+ private class WaitQueue {
+ DocWriter[] waiting;
+ int nextWriteDocID;
+ int nextWriteLoc;
+ int numWaiting;
+ long waitingBytes;
+
+ public WaitQueue() {
+ waiting = new DocWriter[10];
+ }
+
+ synchronized void reset() {
+ // NOTE: nextWriteLoc doesn't need to be reset
+ assert numWaiting == 0;
+ assert waitingBytes == 0;
+ nextWriteDocID = 0;
+ }
+
+ synchronized boolean doResume() {
+ return waitingBytes <= waitQueueResumeBytes;
+ }
+
+ synchronized boolean doPause() {
+ return waitingBytes > waitQueuePauseBytes;
+ }
+
+ synchronized void abort() {
+ int count = 0;
+ for(int i=0;i<waiting.length;i++) {
+ final DocWriter doc = waiting[i];
+ if (doc != null) {
+ doc.abort();
+ waiting[i] = null;
+ count++;
+ }
+ }
+ waitingBytes = 0;
+ assert count == numWaiting;
+ numWaiting = 0;
+ }
+
+ private void writeDocument(DocWriter doc) throws IOException {
+ assert doc == skipDocWriter || nextWriteDocID == doc.docID;
+ boolean success = false;
+ try {
+ doc.finish();
+ nextWriteDocID++;
+ numDocsInStore++;
+ nextWriteLoc++;
+ assert nextWriteLoc <= waiting.length;
+ if (nextWriteLoc == waiting.length)
+ nextWriteLoc = 0;
+ success = true;
+ } finally {
+ if (!success)
+ setAborting();
+ }
+ }
+
+ synchronized public boolean add(DocWriter doc) throws IOException {
+
+ assert doc.docID >= nextWriteDocID;
+
+ if (doc.docID == nextWriteDocID) {
+ writeDocument(doc);
+ while(true) {
+ doc = waiting[nextWriteLoc];
+ if (doc != null) {
+ numWaiting--;
+ waiting[nextWriteLoc] = null;
+ waitingBytes -= doc.sizeInBytes();
+ writeDocument(doc);
+ } else
+ break;
+ }
+ } else {
+
+ // I finished before documents that were added
+ // before me. This can easily happen when I am a
+ // small doc and the docs before me were large, or,
+ // just due to luck in the thread scheduling. Just
+ // add myself to the queue and when that large doc
+ // finishes, it will flush me:
+ int gap = doc.docID - nextWriteDocID;
+ if (gap >= waiting.length) {
+ // Grow queue
+ DocWriter[] newArray = new DocWriter[ArrayUtil.getNextSize(gap)];
+ assert nextWriteLoc >= 0;
+ System.arraycopy(waiting, nextWriteLoc, newArray, 0, waiting.length-nextWriteLoc);
+ System.arraycopy(waiting, 0, newArray, waiting.length-nextWriteLoc, nextWriteLoc);
+ nextWriteLoc = 0;
+ waiting = newArray;
+ gap = doc.docID - nextWriteDocID;
+ }
+
+ int loc = nextWriteLoc + gap;
+ if (loc >= waiting.length)
+ loc -= waiting.length;
+
+ // We should only wrap one time
+ assert loc < waiting.length;
+
+ // Nobody should be in my spot!
+ assert waiting[loc] == null;
+ waiting[loc] = doc;
+ numWaiting++;
+ waitingBytes += doc.sizeInBytes();
+ }
+
+ return doPause();
+ }
+ }
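A worked example of the ring-buffer placement in add(): suppose nextWriteDocID is 5, nextWriteLoc is 3 and waiting.length is 10. A document with docID 9 that finishes early is parked as follows, and is only written once docs 5 through 8 have passed through writeDocument():

    // Illustrative numbers only:
    int nextWriteDocID = 5, nextWriteLoc = 3, length = 10;
    int docID = 9;
    int gap = docID - nextWriteDocID;     // 4   (< length, so the queue need not grow)
    int loc = nextWriteLoc + gap;         // 7
    if (loc >= length) loc -= length;     // still 7; wraps around at most once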
}