You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by us...@apache.org on 2011/05/02 00:38:36 UTC
svn commit: r1098427 [1/5] - in /lucene/dev/trunk: ./ lucene/
lucene/src/java/org/apache/lucene/index/
lucene/src/test-framework/org/apache/lucene/search/
lucene/src/test-framework/org/apache/lucene/store/
lucene/src/test-framework/org/apache/lucene/ut...
Author: uschindler
Date: Sun May 1 22:38:33 2011
New Revision: 1098427
URL: http://svn.apache.org/viewvc?rev=1098427&view=rev
Log:
LUCENE-3023: Land DWPT on trunk
Added:
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java
- copied unchanged from r1097796, lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java
- copied unchanged from r1097796, lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldConsumers.java
- copied unchanged from r1097796, lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumers.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldConsumersPerField.java
- copied unchanged from r1097796, lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumersPerField.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
- copied unchanged from r1097796, lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
- copied unchanged from r1097796, lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
- copied unchanged from r1097796, lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
- copied unchanged from r1097796, lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java
- copied unchanged from r1097796, lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FlushPolicy.java
- copied unchanged from r1097796, lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushPolicy.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FrozenBufferedDeletes.java
- copied unchanged from r1097796, lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FrozenBufferedDeletes.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/Healthiness.java
- copied unchanged from r1097796, lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/Healthiness.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java
- copied unchanged from r1097796, lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java
lucene/dev/trunk/lucene/src/test-framework/org/apache/lucene/util/ThrottledIndexOutput.java
- copied unchanged from r1097796, lucene/dev/branches/realtime_search/lucene/src/test-framework/org/apache/lucene/util/ThrottledIndexOutput.java
lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestDocumentsWriterDeleteQueue.java
- copied unchanged from r1097796, lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestDocumentsWriterDeleteQueue.java
lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java
- copied unchanged from r1097796, lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java
lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java
- copied unchanged from r1097796, lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java
lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestPerSegmentDeletes.java
- copied unchanged from r1097796, lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestPerSegmentDeletes.java
Removed:
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocConsumerPerThread.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerThread.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerThread.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocInverterPerThread.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadState.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FreqProxFieldMergeState.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerThread.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/InvertedDocConsumerPerThread.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/InvertedDocEndConsumerPerThread.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/NormsWriterPerThread.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/StoredFieldsWriterPerThread.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriterPerThread.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/TermsHashConsumerPerThread.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/TermsHashPerThread.java
Modified:
lucene/dev/trunk/ (props changed)
lucene/dev/trunk/lucene/ (props changed)
lucene/dev/trunk/lucene/CHANGES.txt
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/ByteSliceWriter.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocConsumer.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerField.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocInverter.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FieldsWriter.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriter.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IntBlockPool.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/InvertedDocConsumer.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/InvertedDocEndConsumer.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/NormsWriter.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/NormsWriterPerField.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/SegmentInfo.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/SegmentMerger.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/StoredFieldsWriter.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriterPerField.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/TermVectorsWriter.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/TermsHash.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/TermsHashConsumer.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/TermsHashPerField.java
lucene/dev/trunk/lucene/src/test-framework/org/apache/lucene/search/QueryUtils.java
lucene/dev/trunk/lucene/src/test-framework/org/apache/lucene/store/MockDirectoryWrapper.java
lucene/dev/trunk/lucene/src/test-framework/org/apache/lucene/util/LuceneTestCase.java
lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java
lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriter.java
lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java
lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestLazyBug.java
lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestRollingUpdates.java
lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestSegmentMerger.java
lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestStressIndexing2.java
lucene/dev/trunk/solr/ (props changed)
Modified: lucene/dev/trunk/lucene/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/CHANGES.txt?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/CHANGES.txt (original)
+++ lucene/dev/trunk/lucene/CHANGES.txt Sun May 1 22:38:33 2011
@@ -173,6 +173,70 @@ Changes in Runtime Behavior
globally across IndexWriter sessions and persisted into a X.fnx file on
successful commit. The corresponding file format changes are backwards-
compatible. (Michael Busch, Simon Willnauer)
+
+* LUCENE-2956, LUCENE-2573, LUCENE-2324, LUCENE-2555: Changes from
+ DocumentsWriterPerThread:
+
+ - IndexWriter now uses a DocumentsWriter per thread when indexing documents.
+ Each DocumentsWriterPerThread indexes documents in its own private segment,
+ and the in memory segments are no longer merged on flush. Instead, each
+ segment is separately flushed to disk and subsequently merged with normal
+ segment merging.
+
+ - DocumentsWriterPerThread (DWPT) is now flushed concurrently based on a
+ FlushPolicy. When a DWPT is flushed, a fresh DWPT is swapped in so that
+ indexing may continue concurrently with flushing. The selected
+ DWPT flushes all its RAM resident documents to disk. Note: Segment flushes
+ don't flush all RAM resident documents but only the documents private to
+ the DWPT selected for flushing.
+
+ - Flushing is now controlled by FlushPolicy that is called for every add,
+ update or delete on IndexWriter. By default DWPTs are flushed either on
+ maxBufferedDocs per DWPT or the global active used memory. Once the active
+ memory exceeds ramBufferSizeMB only the largest DWPT is selected for
+ flushing and the memory used by this DWPT is subtracted from the active
+ memory and added to a flushing memory pool, which can lead to temporarily
+ higher memory usage due to ongoing indexing.
+
+ - IndexWriter now can utilize ramBufferSize > 2048 MB. Each DWPT can address
+ up to 2048 MB memory such that the ramBufferSize is now bounded by the max
+ number of DWPTs available in the DocumentsWriterPerThreadPool in use.
+ IndexWriter's net memory consumption can grow far beyond the 2048 MB limit if
+ the application can use all available DWPTs. To prevent a DWPT from
+ exhausting its address space IndexWriter will forcefully flush a DWPT if its
+ hard memory limit is exceeded. The RAMPerThreadHardLimitMB can be controlled
+ via IndexWriterConfig and defaults to 1945 MB.
+ Since IndexWriter flushes DWPT concurrently not all memory is released
+ immediately. Applications should still use a ramBufferSize significantly
+ lower than the JVM's available heap memory since under high load multiple
+ flushing DWPT can consume substantial transient memory when IO performance
+ is slow relative to indexing rate.
+
+ - IndexWriter#commit now doesn't block concurrent indexing while flushing all
+ 'currently' RAM resident documents to disk. Yet, flushes that occur while a
+ full flush is running are queued and will happen after all DWPTs involved
+ in the full flush are done flushing. Applications that use multiple threads
+ during indexing and trigger a full flush (e.g. calling commit() or opening a new
+ NRT reader) can use significantly more transient memory.
+
+ - IndexWriter#addDocument and IndexWriter#updateDocument can block indexing
+ threads if the number of active + number of flushing DWPT exceed a
+ safety limit. By default this happens if 2 * max number available thread
+ states (DWPTPool) is exceeded. This safety limit prevents applications from
+ exhausting their available memory if flushing can't keep up with
+ concurrently indexing threads.
+
+ - IndexWriter only applies and flushes deletes if the maxBufferedDelTerms
+ limit is reached during indexing. No segment flushes will be triggered
+ due to this setting.
+
+ - IndexWriter#flush(boolean, boolean) doesn't synchronize on IndexWriter
+ anymore. A dedicated flushLock has been introduced to prevent multiple full-
+ flushes happening concurrently.
+
+ - DocumentsWriter doesn't write shared doc stores anymore.
+
+ (Mike McCandless, Michael Busch, Simon Willnauer)
API Changes
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/ByteSliceWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/ByteSliceWriter.java?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/ByteSliceWriter.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/ByteSliceWriter.java Sun May 1 22:38:33 2011
@@ -81,6 +81,6 @@ final class ByteSliceWriter extends Data
}
public int getAddress() {
- return upto + (offset0 & DocumentsWriter.BYTE_BLOCK_NOT_MASK);
+ return upto + (offset0 & DocumentsWriterPerThread.BYTE_BLOCK_NOT_MASK);
}
}
\ No newline at end of file
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocConsumer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocConsumer.java?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocConsumer.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocConsumer.java Sun May 1 22:38:33 2011
@@ -18,11 +18,12 @@ package org.apache.lucene.index;
*/
import java.io.IOException;
-import java.util.Collection;
abstract class DocConsumer {
- abstract DocConsumerPerThread addThread(DocumentsWriterThreadState perThread) throws IOException;
- abstract void flush(final Collection<DocConsumerPerThread> threads, final SegmentWriteState state) throws IOException;
+ abstract void processDocument(FieldInfos fieldInfos) throws IOException;
+ abstract void finishDocument() throws IOException;
+ abstract void flush(final SegmentWriteState state) throws IOException;
abstract void abort();
abstract boolean freeRAM();
+ abstract void doAfterFlush();
}
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java Sun May 1 22:38:33 2011
@@ -18,22 +18,25 @@ package org.apache.lucene.index;
*/
import java.io.IOException;
-import java.util.Collection;
import java.util.Map;
abstract class DocFieldConsumer {
- /** Called when DocumentsWriter decides to create a new
+ /** Called when DocumentsWriterPerThread decides to create a new
* segment */
- abstract void flush(Map<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>> threadsAndFields, SegmentWriteState state) throws IOException;
+ abstract void flush(Map<FieldInfo, DocFieldConsumerPerField> fieldsToFlush, SegmentWriteState state) throws IOException;
/** Called when an aborting exception is hit */
abstract void abort();
- /** Add a new thread */
- abstract DocFieldConsumerPerThread addThread(DocFieldProcessorPerThread docFieldProcessorPerThread) throws IOException;
-
- /** Called when DocumentsWriter is using too much RAM.
+ /** Called when DocumentsWriterPerThread is using too much RAM.
* The consumer should free RAM, if possible, returning
* true if any RAM was in fact freed. */
abstract boolean freeRAM();
- }
+
+ abstract void startDocument() throws IOException;
+
+ abstract DocFieldConsumerPerField addField(FieldInfo fi);
+
+ abstract void finishDocument() throws IOException;
+
+}
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerField.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerField.java?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerField.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerField.java Sun May 1 22:38:33 2011
@@ -24,4 +24,5 @@ abstract class DocFieldConsumerPerField
/** Processes all occurrences of a single field */
abstract void processFields(Fieldable[] fields, int count) throws IOException;
abstract void abort();
+ abstract FieldInfo getFieldInfo();
}
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java Sun May 1 22:38:33 2011
@@ -19,8 +19,13 @@ package org.apache.lucene.index;
import java.io.IOException;
import java.util.Collection;
-import java.util.Map;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Fieldable;
/**
@@ -33,26 +38,39 @@ import java.util.HashMap;
final class DocFieldProcessor extends DocConsumer {
- final DocumentsWriter docWriter;
final DocFieldConsumer consumer;
final StoredFieldsWriter fieldsWriter;
- public DocFieldProcessor(DocumentsWriter docWriter, DocFieldConsumer consumer) {
- this.docWriter = docWriter;
+ // Holds all fields seen in current doc
+ DocFieldProcessorPerField[] fields = new DocFieldProcessorPerField[1];
+ int fieldCount;
+
+ // Hash table for all fields ever seen
+ DocFieldProcessorPerField[] fieldHash = new DocFieldProcessorPerField[2];
+ int hashMask = 1;
+ int totalFieldCount;
+
+ float docBoost;
+ int fieldGen;
+ final DocumentsWriterPerThread.DocState docState;
+
+ public DocFieldProcessor(DocumentsWriterPerThread docWriter, DocFieldConsumer consumer) {
+ this.docState = docWriter.docState;
this.consumer = consumer;
fieldsWriter = new StoredFieldsWriter(docWriter);
}
@Override
- public void flush(Collection<DocConsumerPerThread> threads, SegmentWriteState state) throws IOException {
+ public void flush(SegmentWriteState state) throws IOException {
- Map<DocFieldConsumerPerThread, Collection<DocFieldConsumerPerField>> childThreadsAndFields = new HashMap<DocFieldConsumerPerThread, Collection<DocFieldConsumerPerField>>();
- for ( DocConsumerPerThread thread : threads) {
- DocFieldProcessorPerThread perThread = (DocFieldProcessorPerThread) thread;
- childThreadsAndFields.put(perThread.consumer, perThread.fields());
+ Map<FieldInfo, DocFieldConsumerPerField> childFields = new HashMap<FieldInfo, DocFieldConsumerPerField>();
+ Collection<DocFieldConsumerPerField> fields = fields();
+ for (DocFieldConsumerPerField f : fields) {
+ childFields.put(f.getFieldInfo(), f);
}
+
fieldsWriter.flush(state);
- consumer.flush(childThreadsAndFields, state);
+ consumer.flush(childFields, state);
// Important to save after asking consumer to flush so
// consumer can alter the FieldInfo* if necessary. EG,
@@ -64,8 +82,20 @@ final class DocFieldProcessor extends Do
@Override
public void abort() {
- fieldsWriter.abort();
- consumer.abort();
+ for(int i=0;i<fieldHash.length;i++) {
+ DocFieldProcessorPerField field = fieldHash[i];
+ while(field != null) {
+ final DocFieldProcessorPerField next = field.next;
+ field.abort();
+ field = next;
+ }
+ }
+
+ try {
+ fieldsWriter.abort();
+ } finally {
+ consumer.abort();
+ }
}
@Override
@@ -73,8 +103,213 @@ final class DocFieldProcessor extends Do
return consumer.freeRAM();
}
+ public Collection<DocFieldConsumerPerField> fields() {
+ Collection<DocFieldConsumerPerField> fields = new HashSet<DocFieldConsumerPerField>();
+ for(int i=0;i<fieldHash.length;i++) {
+ DocFieldProcessorPerField field = fieldHash[i];
+ while(field != null) {
+ fields.add(field.consumer);
+ field = field.next;
+ }
+ }
+ assert fields.size() == totalFieldCount;
+ return fields;
+ }
+
+ /** In flush we reset the fieldHash to not maintain per-field state
+ * across segments */
@Override
- public DocConsumerPerThread addThread(DocumentsWriterThreadState threadState) throws IOException {
- return new DocFieldProcessorPerThread(threadState, this);
+ void doAfterFlush() {
+ fieldHash = new DocFieldProcessorPerField[2];
+ hashMask = 1;
+ totalFieldCount = 0;
+ }
+
+ private void rehash() {
+ final int newHashSize = (fieldHash.length*2);
+ assert newHashSize > fieldHash.length;
+
+ final DocFieldProcessorPerField newHashArray[] = new DocFieldProcessorPerField[newHashSize];
+
+ // Rehash
+ int newHashMask = newHashSize-1;
+ for(int j=0;j<fieldHash.length;j++) {
+ DocFieldProcessorPerField fp0 = fieldHash[j];
+ while(fp0 != null) {
+ final int hashPos2 = fp0.fieldInfo.name.hashCode() & newHashMask;
+ DocFieldProcessorPerField nextFP0 = fp0.next;
+ fp0.next = newHashArray[hashPos2];
+ newHashArray[hashPos2] = fp0;
+ fp0 = nextFP0;
+ }
+ }
+
+ fieldHash = newHashArray;
+ hashMask = newHashMask;
+ }
+
+ @Override
+ public void processDocument(FieldInfos fieldInfos) throws IOException {
+
+ consumer.startDocument();
+ fieldsWriter.startDocument();
+
+ final Document doc = docState.doc;
+
+ fieldCount = 0;
+
+ final int thisFieldGen = fieldGen++;
+
+ final List<Fieldable> docFields = doc.getFields();
+ final int numDocFields = docFields.size();
+
+ // Absorb any new fields first seen in this document.
+ // Also absorb any changes to fields we had already
+ // seen before (eg suddenly turning on norms or
+ // vectors, etc.):
+
+ for(int i=0;i<numDocFields;i++) {
+ Fieldable field = docFields.get(i);
+ final String fieldName = field.name();
+
+ // Make sure we have a PerField allocated
+ final int hashPos = fieldName.hashCode() & hashMask;
+ DocFieldProcessorPerField fp = fieldHash[hashPos];
+ while(fp != null && !fp.fieldInfo.name.equals(fieldName)) {
+ fp = fp.next;
+ }
+
+ if (fp == null) {
+
+ // TODO FI: we need to genericize the "flags" that a
+ // field holds, and, how these flags are merged; it
+ // needs to be more "pluggable" such that if I want
+ // to have a new "thing" my Fields can do, I can
+ // easily add it
+ FieldInfo fi = fieldInfos.addOrUpdate(fieldName, field.isIndexed(), field.isTermVectorStored(),
+ field.isStorePositionWithTermVector(), field.isStoreOffsetWithTermVector(),
+ field.getOmitNorms(), false, field.getOmitTermFreqAndPositions());
+
+ fp = new DocFieldProcessorPerField(this, fi);
+ fp.next = fieldHash[hashPos];
+ fieldHash[hashPos] = fp;
+ totalFieldCount++;
+
+ if (totalFieldCount >= fieldHash.length/2)
+ rehash();
+ } else {
+ fieldInfos.addOrUpdate(fp.fieldInfo.name, field.isIndexed(), field.isTermVectorStored(),
+ field.isStorePositionWithTermVector(), field.isStoreOffsetWithTermVector(),
+ field.getOmitNorms(), false, field.getOmitTermFreqAndPositions());
+ }
+
+ if (thisFieldGen != fp.lastGen) {
+
+ // First time we're seeing this field for this doc
+ fp.fieldCount = 0;
+
+ if (fieldCount == fields.length) {
+ final int newSize = fields.length*2;
+ DocFieldProcessorPerField newArray[] = new DocFieldProcessorPerField[newSize];
+ System.arraycopy(fields, 0, newArray, 0, fieldCount);
+ fields = newArray;
+ }
+
+ fields[fieldCount++] = fp;
+ fp.lastGen = thisFieldGen;
+ }
+
+ fp.addField(field);
+
+ if (field.isStored()) {
+ fieldsWriter.addField(field, fp.fieldInfo);
+ }
+ }
+
+ // If we are writing vectors then we must visit
+ // fields in sorted order so they are written in
+ // sorted order. TODO: we actually only need to
+ // sort the subset of fields that have vectors
+ // enabled; we could save [small amount of] CPU
+ // here.
+ quickSort(fields, 0, fieldCount-1);
+
+ for(int i=0;i<fieldCount;i++)
+ fields[i].consumer.processFields(fields[i].fields, fields[i].fieldCount);
+
+ if (docState.maxTermPrefix != null && docState.infoStream != null) {
+ docState.infoStream.println("WARNING: document contains at least one immense term (whose UTF8 encoding is longer than the max length " + DocumentsWriterPerThread.MAX_TERM_LENGTH_UTF8 + "), all of which were skipped. Please correct the analyzer to not produce such terms. The prefix of the first immense term is: '" + docState.maxTermPrefix + "...'");
+ docState.maxTermPrefix = null;
+ }
+ }
+
+ @Override
+ void finishDocument() throws IOException {
+ try {
+ fieldsWriter.finishDocument();
+ } finally {
+ consumer.finishDocument();
+ }
+ }
+
+ void quickSort(DocFieldProcessorPerField[] array, int lo, int hi) {
+ if (lo >= hi)
+ return;
+ else if (hi == 1+lo) {
+ if (array[lo].fieldInfo.name.compareTo(array[hi].fieldInfo.name) > 0) {
+ final DocFieldProcessorPerField tmp = array[lo];
+ array[lo] = array[hi];
+ array[hi] = tmp;
+ }
+ return;
+ }
+
+ int mid = (lo + hi) >>> 1;
+
+ if (array[lo].fieldInfo.name.compareTo(array[mid].fieldInfo.name) > 0) {
+ DocFieldProcessorPerField tmp = array[lo];
+ array[lo] = array[mid];
+ array[mid] = tmp;
+ }
+
+ if (array[mid].fieldInfo.name.compareTo(array[hi].fieldInfo.name) > 0) {
+ DocFieldProcessorPerField tmp = array[mid];
+ array[mid] = array[hi];
+ array[hi] = tmp;
+
+ if (array[lo].fieldInfo.name.compareTo(array[mid].fieldInfo.name) > 0) {
+ DocFieldProcessorPerField tmp2 = array[lo];
+ array[lo] = array[mid];
+ array[mid] = tmp2;
+ }
+ }
+
+ int left = lo + 1;
+ int right = hi - 1;
+
+ if (left >= right)
+ return;
+
+ DocFieldProcessorPerField partition = array[mid];
+
+ for (; ;) {
+ while (array[right].fieldInfo.name.compareTo(partition.fieldInfo.name) > 0)
+ --right;
+
+ while (left < right && array[left].fieldInfo.name.compareTo(partition.fieldInfo.name) <= 0)
+ ++left;
+
+ if (left < right) {
+ DocFieldProcessorPerField tmp = array[left];
+ array[left] = array[right];
+ array[right] = tmp;
+ --right;
+ } else {
+ break;
+ }
+ }
+
+ quickSort(array, lo, left);
+ quickSort(array, left + 1, hi);
}
}
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java Sun May 1 22:38:33 2011
@@ -18,6 +18,8 @@ package org.apache.lucene.index;
*/
import org.apache.lucene.document.Fieldable;
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.RamUsageEstimator;
/**
* Holds all per thread, per field state.
@@ -34,11 +36,22 @@ final class DocFieldProcessorPerField {
int fieldCount;
Fieldable[] fields = new Fieldable[1];
- public DocFieldProcessorPerField(final DocFieldProcessorPerThread perThread, final FieldInfo fieldInfo) {
- this.consumer = perThread.consumer.addField(fieldInfo);
+ public DocFieldProcessorPerField(final DocFieldProcessor docFieldProcessor, final FieldInfo fieldInfo) {
+ this.consumer = docFieldProcessor.consumer.addField(fieldInfo);
this.fieldInfo = fieldInfo;
}
+ public void addField(Fieldable field) {
+ if (fieldCount == fields.length) {
+ int newSize = ArrayUtil.oversize(fieldCount + 1, RamUsageEstimator.NUM_BYTES_OBJECT_REF);
+ Fieldable[] newArray = new Fieldable[newSize];
+ System.arraycopy(fields, 0, newArray, 0, fieldCount);
+ fields = newArray;
+ }
+
+ fields[fieldCount++] = field;
+ }
+
public void abort() {
consumer.abort();
}
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocInverter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocInverter.java?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocInverter.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocInverter.java Sun May 1 22:38:33 2011
@@ -18,12 +18,13 @@ package org.apache.lucene.index;
*/
import java.io.IOException;
-import java.util.Collection;
import java.util.HashMap;
-import java.util.HashSet;
-
import java.util.Map;
+import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
+import org.apache.lucene.analysis.tokenattributes.OffsetAttribute;
+import org.apache.lucene.util.AttributeSource;
+
/** This is a DocFieldConsumer that inverts each field,
* separately, from a Document, and accepts a
@@ -34,42 +35,72 @@ final class DocInverter extends DocField
final InvertedDocConsumer consumer;
final InvertedDocEndConsumer endConsumer;
- public DocInverter(InvertedDocConsumer consumer, InvertedDocEndConsumer endConsumer) {
+ final DocumentsWriterPerThread.DocState docState;
+
+ final FieldInvertState fieldState = new FieldInvertState();
+
+ final SingleTokenAttributeSource singleToken = new SingleTokenAttributeSource();
+
+ static class SingleTokenAttributeSource extends AttributeSource {
+ final CharTermAttribute termAttribute;
+ final OffsetAttribute offsetAttribute;
+
+ private SingleTokenAttributeSource() {
+ termAttribute = addAttribute(CharTermAttribute.class);
+ offsetAttribute = addAttribute(OffsetAttribute.class);
+ }
+
+ public void reinit(String stringValue, int startOffset, int endOffset) {
+ termAttribute.setEmpty().append(stringValue);
+ offsetAttribute.setOffset(startOffset, endOffset);
+ }
+ }
+
+ // Used to read a string value for a field
+ final ReusableStringReader stringReader = new ReusableStringReader();
+
+ public DocInverter(DocumentsWriterPerThread.DocState docState, InvertedDocConsumer consumer, InvertedDocEndConsumer endConsumer) {
+ this.docState = docState;
this.consumer = consumer;
this.endConsumer = endConsumer;
}
@Override
- void flush(Map<DocFieldConsumerPerThread, Collection<DocFieldConsumerPerField>> threadsAndFields, SegmentWriteState state) throws IOException {
-
- Map<InvertedDocConsumerPerThread,Collection<InvertedDocConsumerPerField>> childThreadsAndFields = new HashMap<InvertedDocConsumerPerThread,Collection<InvertedDocConsumerPerField>>();
- Map<InvertedDocEndConsumerPerThread,Collection<InvertedDocEndConsumerPerField>> endChildThreadsAndFields = new HashMap<InvertedDocEndConsumerPerThread,Collection<InvertedDocEndConsumerPerField>>();
+ void flush(Map<FieldInfo, DocFieldConsumerPerField> fieldsToFlush, SegmentWriteState state) throws IOException {
- for (Map.Entry<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>> entry : threadsAndFields.entrySet() ) {
+ Map<FieldInfo, InvertedDocConsumerPerField> childFieldsToFlush = new HashMap<FieldInfo, InvertedDocConsumerPerField>();
+ Map<FieldInfo, InvertedDocEndConsumerPerField> endChildFieldsToFlush = new HashMap<FieldInfo, InvertedDocEndConsumerPerField>();
+ for (Map.Entry<FieldInfo, DocFieldConsumerPerField> fieldToFlush : fieldsToFlush.entrySet()) {
+ DocInverterPerField perField = (DocInverterPerField) fieldToFlush.getValue();
+ childFieldsToFlush.put(fieldToFlush.getKey(), perField.consumer);
+ endChildFieldsToFlush.put(fieldToFlush.getKey(), perField.endConsumer);
+ }
- DocInverterPerThread perThread = (DocInverterPerThread) entry.getKey();
+ consumer.flush(childFieldsToFlush, state);
+ endConsumer.flush(endChildFieldsToFlush, state);
+ }
- Collection<InvertedDocConsumerPerField> childFields = new HashSet<InvertedDocConsumerPerField>();
- Collection<InvertedDocEndConsumerPerField> endChildFields = new HashSet<InvertedDocEndConsumerPerField>();
- for (final DocFieldConsumerPerField field: entry.getValue() ) {
- DocInverterPerField perField = (DocInverterPerField) field;
- childFields.add(perField.consumer);
- endChildFields.add(perField.endConsumer);
- }
+ @Override
+ public void startDocument() throws IOException {
+ consumer.startDocument();
+ endConsumer.startDocument();
+ }
- childThreadsAndFields.put(perThread.consumer, childFields);
- endChildThreadsAndFields.put(perThread.endConsumer, endChildFields);
- }
-
- consumer.flush(childThreadsAndFields, state);
- endConsumer.flush(endChildThreadsAndFields, state);
+ public void finishDocument() throws IOException {
+ // TODO: allow endConsumer.finishDocument to also return
+ // a DocWriter
+ endConsumer.finishDocument();
+ consumer.finishDocument();
}
@Override
void abort() {
- consumer.abort();
- endConsumer.abort();
+ try {
+ consumer.abort();
+ } finally {
+ endConsumer.abort();
+ }
}
@Override
@@ -78,7 +109,8 @@ final class DocInverter extends DocField
}
@Override
- public DocFieldConsumerPerThread addThread(DocFieldProcessorPerThread docFieldProcessorPerThread) {
- return new DocInverterPerThread(docFieldProcessorPerThread, this);
+ public DocFieldConsumerPerField addField(FieldInfo fi) {
+ return new DocInverterPerField(this, fi);
}
+
}
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java Sun May 1 22:38:33 2011
@@ -35,20 +35,20 @@ import org.apache.lucene.analysis.tokena
final class DocInverterPerField extends DocFieldConsumerPerField {
- final private DocInverterPerThread perThread;
- final private FieldInfo fieldInfo;
+ final private DocInverter parent;
+ final FieldInfo fieldInfo;
final InvertedDocConsumerPerField consumer;
final InvertedDocEndConsumerPerField endConsumer;
- final DocumentsWriter.DocState docState;
+ final DocumentsWriterPerThread.DocState docState;
final FieldInvertState fieldState;
- public DocInverterPerField(DocInverterPerThread perThread, FieldInfo fieldInfo) {
- this.perThread = perThread;
+ public DocInverterPerField(DocInverter parent, FieldInfo fieldInfo) {
+ this.parent = parent;
this.fieldInfo = fieldInfo;
- docState = perThread.docState;
- fieldState = perThread.fieldState;
- this.consumer = perThread.consumer.addField(this, fieldInfo);
- this.endConsumer = perThread.endConsumer.addField(this, fieldInfo);
+ docState = parent.docState;
+ fieldState = parent.fieldState;
+ this.consumer = parent.consumer.addField(this, fieldInfo);
+ this.endConsumer = parent.endConsumer.addField(this, fieldInfo);
}
@Override
@@ -80,8 +80,8 @@ final class DocInverterPerField extends
if (!field.isTokenized()) { // un-tokenized field
String stringValue = field.stringValue();
final int valueLength = stringValue.length();
- perThread.singleToken.reinit(stringValue, 0, valueLength);
- fieldState.attributeSource = perThread.singleToken;
+ parent.singleToken.reinit(stringValue, 0, valueLength);
+ fieldState.attributeSource = parent.singleToken;
consumer.start(field);
boolean success = false;
@@ -89,8 +89,9 @@ final class DocInverterPerField extends
consumer.add();
success = true;
} finally {
- if (!success)
+ if (!success) {
docState.docWriter.setAborting();
+ }
}
fieldState.offset += valueLength;
fieldState.length++;
@@ -114,8 +115,8 @@ final class DocInverterPerField extends
if (stringValue == null) {
throw new IllegalArgumentException("field must have either TokenStream, String or Reader value");
}
- perThread.stringReader.init(stringValue);
- reader = perThread.stringReader;
+ parent.stringReader.init(stringValue);
+ reader = parent.stringReader;
}
// Tokenize field and add to postingTable
@@ -166,8 +167,9 @@ final class DocInverterPerField extends
consumer.add();
success = true;
} finally {
- if (!success)
+ if (!success) {
docState.docWriter.setAborting();
+ }
}
fieldState.length++;
fieldState.position++;
@@ -195,4 +197,9 @@ final class DocInverterPerField extends
consumer.finish();
endConsumer.finish();
}
+
+ @Override
+ FieldInfo getFieldInfo() {
+ return fieldInfo;
+ }
}