You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by us...@apache.org on 2011/05/02 00:38:36 UTC

svn commit: r1098427 [1/5] - in /lucene/dev/trunk: ./ lucene/ lucene/src/java/org/apache/lucene/index/ lucene/src/test-framework/org/apache/lucene/search/ lucene/src/test-framework/org/apache/lucene/store/ lucene/src/test-framework/org/apache/lucene/ut...

Author: uschindler
Date: Sun May  1 22:38:33 2011
New Revision: 1098427

URL: http://svn.apache.org/viewvc?rev=1098427&view=rev
Log:
LUCENE-3023: Land DWPT on trunk

Added:
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java
      - copied unchanged from r1097796, lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java
      - copied unchanged from r1097796, lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldConsumers.java
      - copied unchanged from r1097796, lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumers.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldConsumersPerField.java
      - copied unchanged from r1097796, lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumersPerField.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
      - copied unchanged from r1097796, lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
      - copied unchanged from r1097796, lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
      - copied unchanged from r1097796, lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
      - copied unchanged from r1097796, lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java
      - copied unchanged from r1097796, lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FlushPolicy.java
      - copied unchanged from r1097796, lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushPolicy.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FrozenBufferedDeletes.java
      - copied unchanged from r1097796, lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FrozenBufferedDeletes.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/Healthiness.java
      - copied unchanged from r1097796, lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/Healthiness.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java
      - copied unchanged from r1097796, lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java
    lucene/dev/trunk/lucene/src/test-framework/org/apache/lucene/util/ThrottledIndexOutput.java
      - copied unchanged from r1097796, lucene/dev/branches/realtime_search/lucene/src/test-framework/org/apache/lucene/util/ThrottledIndexOutput.java
    lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestDocumentsWriterDeleteQueue.java
      - copied unchanged from r1097796, lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestDocumentsWriterDeleteQueue.java
    lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java
      - copied unchanged from r1097796, lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java
    lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java
      - copied unchanged from r1097796, lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java
    lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestPerSegmentDeletes.java
      - copied unchanged from r1097796, lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestPerSegmentDeletes.java
Removed:
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocConsumerPerThread.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerThread.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerThread.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocInverterPerThread.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadState.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FreqProxFieldMergeState.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerThread.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/InvertedDocConsumerPerThread.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/InvertedDocEndConsumerPerThread.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/NormsWriterPerThread.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/StoredFieldsWriterPerThread.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriterPerThread.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/TermsHashConsumerPerThread.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/TermsHashPerThread.java
Modified:
    lucene/dev/trunk/   (props changed)
    lucene/dev/trunk/lucene/   (props changed)
    lucene/dev/trunk/lucene/CHANGES.txt
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/ByteSliceWriter.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocConsumer.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerField.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocInverter.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FieldsWriter.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriter.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IntBlockPool.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/InvertedDocConsumer.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/InvertedDocEndConsumer.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/NormsWriter.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/NormsWriterPerField.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/SegmentInfo.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/SegmentMerger.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/StoredFieldsWriter.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriterPerField.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/TermVectorsWriter.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/TermsHash.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/TermsHashConsumer.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/TermsHashPerField.java
    lucene/dev/trunk/lucene/src/test-framework/org/apache/lucene/search/QueryUtils.java
    lucene/dev/trunk/lucene/src/test-framework/org/apache/lucene/store/MockDirectoryWrapper.java
    lucene/dev/trunk/lucene/src/test-framework/org/apache/lucene/util/LuceneTestCase.java
    lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java
    lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriter.java
    lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
    lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
    lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java
    lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestLazyBug.java
    lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestRollingUpdates.java
    lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestSegmentMerger.java
    lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestStressIndexing2.java
    lucene/dev/trunk/solr/   (props changed)

Modified: lucene/dev/trunk/lucene/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/CHANGES.txt?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/CHANGES.txt (original)
+++ lucene/dev/trunk/lucene/CHANGES.txt Sun May  1 22:38:33 2011
@@ -173,6 +173,70 @@ Changes in Runtime Behavior
   globally across IndexWriter sessions and persisted into a X.fnx file on
   successful commit. The corresponding file format changes are backwards-
   compatible. (Michael Busch, Simon Willnauer)
+  
+* LUCENE-2956, LUCENE-2573, LUCENE-2324, LUCENE-2555: Changes from 
+  DocumentsWriterPerThread:
+
+  - IndexWriter now uses a DocumentsWriter per thread when indexing documents.
+    Each DocumentsWriterPerThread indexes documents in its own private segment,
+    and the in memory segments are no longer merged on flush.  Instead, each
+    segment is separately flushed to disk and subsequently merged with normal
+    segment merging.
+
+  - DocumentsWriterPerThread (DWPT) is now flushed concurrently based on a
+    FlushPolicy.  When a DWPT is flushed, a fresh DWPT is swapped in so that
+    indexing may continue concurrently with flushing.  The selected
+    DWPT flushes all its RAM resident documents do disk.  Note: Segment flushes
+    don't flush all RAM resident documents but only the documents private to
+    the DWPT selected for flushing. 
+  
+  - Flushing is now controlled by FlushPolicy that is called for every add,
+    update or delete on IndexWriter. By default DWPTs are flushed either on
+    maxBufferedDocs per DWPT or the global active used memory. Once the active
+    memory exceeds ramBufferSizeMB only the largest DWPT is selected for
+    flushing and the memory used by this DWPT is substracted from the active
+    memory and added to a flushing memory pool, which can lead to temporarily
+    higher memory usage due to ongoing indexing.
+    
+  - IndexWriter now can utilize ramBufferSize > 2048 MB. Each DWPT can address
+    up to 2048 MB memory such that the ramBufferSize is now bounded by the max
+    number of DWPT avaliable in the used DocumentsWriterPerThreadPool.
+    IndexWriters net memory consumption can grow far beyond the 2048 MB limit if
+    the applicatoin can use all available DWPTs. To prevent a DWPT from
+    exhausting its address space IndexWriter will forcefully flush a DWPT if its
+    hard memory limit is exceeded. The RAMPerThreadHardLimitMB can be controlled
+    via IndexWriterConfig and defaults to 1945 MB. 
+    Since IndexWriter flushes DWPT concurrently not all memory is released
+    immediately. Applications should still use a ramBufferSize significantly
+    lower than the JVMs avaliable heap memory since under high load multiple
+    flushing DWPT can consume substantial transient memory when IO performance
+    is slow relative to indexing rate.
+    
+  - IndexWriter#commit now doesn't block concurrent indexing while flushing all
+    'currently' RAM resident documents to disk. Yet, flushes that occur while a
+    a full flush is running are queued and will happen after all DWPT involved
+    in the full flush are done flushing. Applications using multiple threads
+    during indexing and trigger a full flush (eg call commmit() or open a new
+    NRT reader) can use significantly more transient memory.
+    
+  - IndexWriter#addDocument and IndexWriter.updateDocument can block indexing
+    threads if the number of active + number of flushing DWPT exceed a
+    safety limit. By default this happens if 2 * max number available thread
+    states (DWPTPool) is exceeded. This safety limit prevents applications from
+    exhausting their available memory if flushing can't keep up with
+    concurrently indexing threads.  
+    
+  - IndexWriter only applies and flushes deletes if the maxBufferedDelTerms
+    limit is reached during indexing. No segment flushes will be triggered
+    due to this setting.
+    
+  - IndexWriter#flush(boolean, boolean) doesn't synchronized on IndexWriter
+    anymore. A dedicated flushLock has been introduced to prevent multiple full-
+    flushes happening concurrently. 
+    
+  - DocumentsWriter doesn't write shared doc stores anymore. 
+  
+  (Mike McCandless, Michael Busch, Simon Willnauer)
 
 API Changes
 

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/ByteSliceWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/ByteSliceWriter.java?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/ByteSliceWriter.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/ByteSliceWriter.java Sun May  1 22:38:33 2011
@@ -81,6 +81,6 @@ final class ByteSliceWriter extends Data
   }
 
   public int getAddress() {
-    return upto + (offset0 & DocumentsWriter.BYTE_BLOCK_NOT_MASK);
+    return upto + (offset0 & DocumentsWriterPerThread.BYTE_BLOCK_NOT_MASK);
   }
 }
\ No newline at end of file

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocConsumer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocConsumer.java?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocConsumer.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocConsumer.java Sun May  1 22:38:33 2011
@@ -18,11 +18,12 @@ package org.apache.lucene.index;
  */
 
 import java.io.IOException;
-import java.util.Collection;
 
 abstract class DocConsumer {
-  abstract DocConsumerPerThread addThread(DocumentsWriterThreadState perThread) throws IOException;
-  abstract void flush(final Collection<DocConsumerPerThread> threads, final SegmentWriteState state) throws IOException;
+  abstract void processDocument(FieldInfos fieldInfos) throws IOException;
+  abstract void finishDocument() throws IOException;
+  abstract void flush(final SegmentWriteState state) throws IOException;
   abstract void abort();
   abstract boolean freeRAM();
+  abstract void doAfterFlush();
 }

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java Sun May  1 22:38:33 2011
@@ -18,22 +18,25 @@ package org.apache.lucene.index;
  */
 
 import java.io.IOException;
-import java.util.Collection;
 import java.util.Map;
 
 abstract class DocFieldConsumer {
-  /** Called when DocumentsWriter decides to create a new
+  /** Called when DocumentsWriterPerThread decides to create a new
    *  segment */
-  abstract void flush(Map<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>> threadsAndFields, SegmentWriteState state) throws IOException;
+  abstract void flush(Map<FieldInfo, DocFieldConsumerPerField> fieldsToFlush, SegmentWriteState state) throws IOException;
 
   /** Called when an aborting exception is hit */
   abstract void abort();
 
-  /** Add a new thread */
-  abstract DocFieldConsumerPerThread addThread(DocFieldProcessorPerThread docFieldProcessorPerThread) throws IOException;
-
-  /** Called when DocumentsWriter is using too much RAM.
+  /** Called when DocumentsWriterPerThread is using too much RAM.
    *  The consumer should free RAM, if possible, returning
    *  true if any RAM was in fact freed. */
   abstract boolean freeRAM();
-  }
+
+  abstract void startDocument() throws IOException;
+
+  abstract DocFieldConsumerPerField addField(FieldInfo fi);
+
+  abstract void finishDocument() throws IOException;
+
+}

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerField.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerField.java?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerField.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerField.java Sun May  1 22:38:33 2011
@@ -24,4 +24,5 @@ abstract class DocFieldConsumerPerField 
   /** Processes all occurrences of a single field */
   abstract void processFields(Fieldable[] fields, int count) throws IOException;
   abstract void abort();
+  abstract FieldInfo getFieldInfo();
 }

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java Sun May  1 22:38:33 2011
@@ -19,8 +19,13 @@ package org.apache.lucene.index;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.Map;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Fieldable;
 
 
 /**
@@ -33,26 +38,39 @@ import java.util.HashMap;
 
 final class DocFieldProcessor extends DocConsumer {
 
-  final DocumentsWriter docWriter;
   final DocFieldConsumer consumer;
   final StoredFieldsWriter fieldsWriter;
 
-  public DocFieldProcessor(DocumentsWriter docWriter, DocFieldConsumer consumer) {
-    this.docWriter = docWriter;
+  // Holds all fields seen in current doc
+  DocFieldProcessorPerField[] fields = new DocFieldProcessorPerField[1];
+  int fieldCount;
+
+  // Hash table for all fields ever seen
+  DocFieldProcessorPerField[] fieldHash = new DocFieldProcessorPerField[2];
+  int hashMask = 1;
+  int totalFieldCount;
+
+  float docBoost;
+  int fieldGen;
+  final DocumentsWriterPerThread.DocState docState;
+
+  public DocFieldProcessor(DocumentsWriterPerThread docWriter, DocFieldConsumer consumer) {
+    this.docState = docWriter.docState;
     this.consumer = consumer;
     fieldsWriter = new StoredFieldsWriter(docWriter);
   }
 
   @Override
-  public void flush(Collection<DocConsumerPerThread> threads, SegmentWriteState state) throws IOException {
+  public void flush(SegmentWriteState state) throws IOException {
 
-    Map<DocFieldConsumerPerThread, Collection<DocFieldConsumerPerField>> childThreadsAndFields = new HashMap<DocFieldConsumerPerThread, Collection<DocFieldConsumerPerField>>();
-    for ( DocConsumerPerThread thread : threads) {
-      DocFieldProcessorPerThread perThread = (DocFieldProcessorPerThread) thread;
-      childThreadsAndFields.put(perThread.consumer, perThread.fields());
+    Map<FieldInfo, DocFieldConsumerPerField> childFields = new HashMap<FieldInfo, DocFieldConsumerPerField>();
+    Collection<DocFieldConsumerPerField> fields = fields();
+    for (DocFieldConsumerPerField f : fields) {
+      childFields.put(f.getFieldInfo(), f);
     }
+
     fieldsWriter.flush(state);
-    consumer.flush(childThreadsAndFields, state);
+    consumer.flush(childFields, state);
 
     // Important to save after asking consumer to flush so
     // consumer can alter the FieldInfo* if necessary.  EG,
@@ -64,8 +82,20 @@ final class DocFieldProcessor extends Do
 
   @Override
   public void abort() {
-    fieldsWriter.abort();
-    consumer.abort();
+    for(int i=0;i<fieldHash.length;i++) {
+      DocFieldProcessorPerField field = fieldHash[i];
+      while(field != null) {
+        final DocFieldProcessorPerField next = field.next;
+        field.abort();
+        field = next;
+      }
+    }
+
+    try {
+      fieldsWriter.abort();
+    } finally {
+      consumer.abort();
+    }
   }
 
   @Override
@@ -73,8 +103,213 @@ final class DocFieldProcessor extends Do
     return consumer.freeRAM();
   }
 
+  public Collection<DocFieldConsumerPerField> fields() {
+    Collection<DocFieldConsumerPerField> fields = new HashSet<DocFieldConsumerPerField>();
+    for(int i=0;i<fieldHash.length;i++) {
+      DocFieldProcessorPerField field = fieldHash[i];
+      while(field != null) {
+        fields.add(field.consumer);
+        field = field.next;
+      }
+    }
+    assert fields.size() == totalFieldCount;
+    return fields;
+  }
+
+  /** In flush we reset the fieldHash to not maintain per-field state
+   *  across segments */
   @Override
-  public DocConsumerPerThread addThread(DocumentsWriterThreadState threadState) throws IOException {
-    return new DocFieldProcessorPerThread(threadState, this);
+  void doAfterFlush() {
+    fieldHash = new DocFieldProcessorPerField[2];
+    hashMask = 1;
+    totalFieldCount = 0;
+  }
+
+  private void rehash() {
+    final int newHashSize = (fieldHash.length*2);
+    assert newHashSize > fieldHash.length;
+
+    final DocFieldProcessorPerField newHashArray[] = new DocFieldProcessorPerField[newHashSize];
+
+    // Rehash
+    int newHashMask = newHashSize-1;
+    for(int j=0;j<fieldHash.length;j++) {
+      DocFieldProcessorPerField fp0 = fieldHash[j];
+      while(fp0 != null) {
+        final int hashPos2 = fp0.fieldInfo.name.hashCode() & newHashMask;
+        DocFieldProcessorPerField nextFP0 = fp0.next;
+        fp0.next = newHashArray[hashPos2];
+        newHashArray[hashPos2] = fp0;
+        fp0 = nextFP0;
+      }
+    }
+
+    fieldHash = newHashArray;
+    hashMask = newHashMask;
+  }
+
+  @Override
+  public void processDocument(FieldInfos fieldInfos) throws IOException {
+
+    consumer.startDocument();
+    fieldsWriter.startDocument();
+
+    final Document doc = docState.doc;
+
+    fieldCount = 0;
+
+    final int thisFieldGen = fieldGen++;
+
+    final List<Fieldable> docFields = doc.getFields();
+    final int numDocFields = docFields.size();
+
+    // Absorb any new fields first seen in this document.
+    // Also absorb any changes to fields we had already
+    // seen before (eg suddenly turning on norms or
+    // vectors, etc.):
+
+    for(int i=0;i<numDocFields;i++) {
+      Fieldable field = docFields.get(i);
+      final String fieldName = field.name();
+
+      // Make sure we have a PerField allocated
+      final int hashPos = fieldName.hashCode() & hashMask;
+      DocFieldProcessorPerField fp = fieldHash[hashPos];
+      while(fp != null && !fp.fieldInfo.name.equals(fieldName)) {
+        fp = fp.next;
+      }
+
+      if (fp == null) {
+
+        // TODO FI: we need to genericize the "flags" that a
+        // field holds, and, how these flags are merged; it
+        // needs to be more "pluggable" such that if I want
+        // to have a new "thing" my Fields can do, I can
+        // easily add it
+        FieldInfo fi = fieldInfos.addOrUpdate(fieldName, field.isIndexed(), field.isTermVectorStored(),
+                                      field.isStorePositionWithTermVector(), field.isStoreOffsetWithTermVector(),
+                                      field.getOmitNorms(), false, field.getOmitTermFreqAndPositions());
+
+        fp = new DocFieldProcessorPerField(this, fi);
+        fp.next = fieldHash[hashPos];
+        fieldHash[hashPos] = fp;
+        totalFieldCount++;
+
+        if (totalFieldCount >= fieldHash.length/2)
+          rehash();
+      } else {
+        fieldInfos.addOrUpdate(fp.fieldInfo.name, field.isIndexed(), field.isTermVectorStored(),
+                            field.isStorePositionWithTermVector(), field.isStoreOffsetWithTermVector(),
+                            field.getOmitNorms(), false, field.getOmitTermFreqAndPositions());
+      }
+
+      if (thisFieldGen != fp.lastGen) {
+
+        // First time we're seeing this field for this doc
+        fp.fieldCount = 0;
+
+        if (fieldCount == fields.length) {
+          final int newSize = fields.length*2;
+          DocFieldProcessorPerField newArray[] = new DocFieldProcessorPerField[newSize];
+          System.arraycopy(fields, 0, newArray, 0, fieldCount);
+          fields = newArray;
+        }
+
+        fields[fieldCount++] = fp;
+        fp.lastGen = thisFieldGen;
+      }
+
+      fp.addField(field);
+
+      if (field.isStored()) {
+        fieldsWriter.addField(field, fp.fieldInfo);
+      }
+    }
+
+    // If we are writing vectors then we must visit
+    // fields in sorted order so they are written in
+    // sorted order.  TODO: we actually only need to
+    // sort the subset of fields that have vectors
+    // enabled; we could save [small amount of] CPU
+    // here.
+    quickSort(fields, 0, fieldCount-1);
+
+    for(int i=0;i<fieldCount;i++)
+      fields[i].consumer.processFields(fields[i].fields, fields[i].fieldCount);
+
+    if (docState.maxTermPrefix != null && docState.infoStream != null) {
+      docState.infoStream.println("WARNING: document contains at least one immense term (whose UTF8 encoding is longer than the max length " + DocumentsWriterPerThread.MAX_TERM_LENGTH_UTF8 + "), all of which were skipped.  Please correct the analyzer to not produce such terms.  The prefix of the first immense term is: '" + docState.maxTermPrefix + "...'");
+      docState.maxTermPrefix = null;
+    }
+  }
+
+  @Override
+  void finishDocument() throws IOException {
+    try {
+      fieldsWriter.finishDocument();
+    } finally {
+      consumer.finishDocument();
+    }
+  }
+
+  void quickSort(DocFieldProcessorPerField[] array, int lo, int hi) {
+    if (lo >= hi)
+      return;
+    else if (hi == 1+lo) {
+      if (array[lo].fieldInfo.name.compareTo(array[hi].fieldInfo.name) > 0) {
+        final DocFieldProcessorPerField tmp = array[lo];
+        array[lo] = array[hi];
+        array[hi] = tmp;
+      }
+      return;
+    }
+
+    int mid = (lo + hi) >>> 1;
+
+    if (array[lo].fieldInfo.name.compareTo(array[mid].fieldInfo.name) > 0) {
+      DocFieldProcessorPerField tmp = array[lo];
+      array[lo] = array[mid];
+      array[mid] = tmp;
+    }
+
+    if (array[mid].fieldInfo.name.compareTo(array[hi].fieldInfo.name) > 0) {
+      DocFieldProcessorPerField tmp = array[mid];
+      array[mid] = array[hi];
+      array[hi] = tmp;
+
+      if (array[lo].fieldInfo.name.compareTo(array[mid].fieldInfo.name) > 0) {
+        DocFieldProcessorPerField tmp2 = array[lo];
+        array[lo] = array[mid];
+        array[mid] = tmp2;
+      }
+    }
+
+    int left = lo + 1;
+    int right = hi - 1;
+
+    if (left >= right)
+      return;
+
+    DocFieldProcessorPerField partition = array[mid];
+
+    for (; ;) {
+      while (array[right].fieldInfo.name.compareTo(partition.fieldInfo.name) > 0)
+        --right;
+
+      while (left < right && array[left].fieldInfo.name.compareTo(partition.fieldInfo.name) <= 0)
+        ++left;
+
+      if (left < right) {
+        DocFieldProcessorPerField tmp = array[left];
+        array[left] = array[right];
+        array[right] = tmp;
+        --right;
+      } else {
+        break;
+      }
+    }
+
+    quickSort(array, lo, left);
+    quickSort(array, left + 1, hi);
   }
 }

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java Sun May  1 22:38:33 2011
@@ -18,6 +18,8 @@ package org.apache.lucene.index;
  */
 
 import org.apache.lucene.document.Fieldable;
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.RamUsageEstimator;
 
 /**
  * Holds all per thread, per field state.
@@ -34,11 +36,22 @@ final class DocFieldProcessorPerField {
   int fieldCount;
   Fieldable[] fields = new Fieldable[1];
 
-  public DocFieldProcessorPerField(final DocFieldProcessorPerThread perThread, final FieldInfo fieldInfo) {
-    this.consumer = perThread.consumer.addField(fieldInfo);
+  public DocFieldProcessorPerField(final DocFieldProcessor docFieldProcessor, final FieldInfo fieldInfo) {
+    this.consumer = docFieldProcessor.consumer.addField(fieldInfo);
     this.fieldInfo = fieldInfo;
   }
 
+  public void addField(Fieldable field) {
+    if (fieldCount == fields.length) {
+      int newSize = ArrayUtil.oversize(fieldCount + 1, RamUsageEstimator.NUM_BYTES_OBJECT_REF);
+      Fieldable[] newArray = new Fieldable[newSize];
+      System.arraycopy(fields, 0, newArray, 0, fieldCount);
+      fields = newArray;
+    }
+
+    fields[fieldCount++] = field;
+  }
+
   public void abort() {
     consumer.abort();
   }

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocInverter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocInverter.java?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocInverter.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocInverter.java Sun May  1 22:38:33 2011
@@ -18,12 +18,13 @@ package org.apache.lucene.index;
  */
 
 import java.io.IOException;
-import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
-
 import java.util.Map;
 
+import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
+import org.apache.lucene.analysis.tokenattributes.OffsetAttribute;
+import org.apache.lucene.util.AttributeSource;
+
 
 /** This is a DocFieldConsumer that inverts each field,
  *  separately, from a Document, and accepts a
@@ -34,42 +35,72 @@ final class DocInverter extends DocField
   final InvertedDocConsumer consumer;
   final InvertedDocEndConsumer endConsumer;
 
-  public DocInverter(InvertedDocConsumer consumer, InvertedDocEndConsumer endConsumer) {
+  final DocumentsWriterPerThread.DocState docState;
+
+  final FieldInvertState fieldState = new FieldInvertState();
+
+  final SingleTokenAttributeSource singleToken = new SingleTokenAttributeSource();
+
+  static class SingleTokenAttributeSource extends AttributeSource {
+    final CharTermAttribute termAttribute;
+    final OffsetAttribute offsetAttribute;
+
+    private SingleTokenAttributeSource() {
+      termAttribute = addAttribute(CharTermAttribute.class);
+      offsetAttribute = addAttribute(OffsetAttribute.class);
+    }
+
+    public void reinit(String stringValue, int startOffset,  int endOffset) {
+      termAttribute.setEmpty().append(stringValue);
+      offsetAttribute.setOffset(startOffset, endOffset);
+    }
+  }
+
+  // Used to read a string value for a field
+  final ReusableStringReader stringReader = new ReusableStringReader();
+
+  public DocInverter(DocumentsWriterPerThread.DocState docState, InvertedDocConsumer consumer, InvertedDocEndConsumer endConsumer) {
+    this.docState = docState;
     this.consumer = consumer;
     this.endConsumer = endConsumer;
   }
 
   @Override
-  void flush(Map<DocFieldConsumerPerThread, Collection<DocFieldConsumerPerField>> threadsAndFields, SegmentWriteState state) throws IOException {
-
-    Map<InvertedDocConsumerPerThread,Collection<InvertedDocConsumerPerField>> childThreadsAndFields = new HashMap<InvertedDocConsumerPerThread,Collection<InvertedDocConsumerPerField>>();
-    Map<InvertedDocEndConsumerPerThread,Collection<InvertedDocEndConsumerPerField>> endChildThreadsAndFields = new HashMap<InvertedDocEndConsumerPerThread,Collection<InvertedDocEndConsumerPerField>>();
+  void flush(Map<FieldInfo, DocFieldConsumerPerField> fieldsToFlush, SegmentWriteState state) throws IOException {
 
-    for (Map.Entry<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>> entry : threadsAndFields.entrySet() ) {
+    Map<FieldInfo, InvertedDocConsumerPerField> childFieldsToFlush = new HashMap<FieldInfo, InvertedDocConsumerPerField>();
+    Map<FieldInfo, InvertedDocEndConsumerPerField> endChildFieldsToFlush = new HashMap<FieldInfo, InvertedDocEndConsumerPerField>();
 
+    for (Map.Entry<FieldInfo, DocFieldConsumerPerField> fieldToFlush : fieldsToFlush.entrySet()) {
+      DocInverterPerField perField = (DocInverterPerField) fieldToFlush.getValue();
+      childFieldsToFlush.put(fieldToFlush.getKey(), perField.consumer);
+      endChildFieldsToFlush.put(fieldToFlush.getKey(), perField.endConsumer);
+    }
 
-      DocInverterPerThread perThread = (DocInverterPerThread) entry.getKey();
+    consumer.flush(childFieldsToFlush, state);
+    endConsumer.flush(endChildFieldsToFlush, state);
+  }
 
-      Collection<InvertedDocConsumerPerField> childFields = new HashSet<InvertedDocConsumerPerField>();
-      Collection<InvertedDocEndConsumerPerField> endChildFields = new HashSet<InvertedDocEndConsumerPerField>();
-      for (final DocFieldConsumerPerField field: entry.getValue() ) {  
-        DocInverterPerField perField = (DocInverterPerField) field;
-        childFields.add(perField.consumer);
-        endChildFields.add(perField.endConsumer);
-      }
+  @Override
+  public void startDocument() throws IOException {
+    consumer.startDocument();
+    endConsumer.startDocument();
+  }
 
-      childThreadsAndFields.put(perThread.consumer, childFields);
-      endChildThreadsAndFields.put(perThread.endConsumer, endChildFields);
-    }
-    
-    consumer.flush(childThreadsAndFields, state);
-    endConsumer.flush(endChildThreadsAndFields, state);
+  public void finishDocument() throws IOException {
+    // TODO: allow endConsumer.finishDocument to also return
+    // a DocWriter
+    endConsumer.finishDocument();
+    consumer.finishDocument();
   }
 
   @Override
   void abort() {
-    consumer.abort();
-    endConsumer.abort();
+    try {
+      consumer.abort();
+    } finally {
+      endConsumer.abort();
+    }
   }
 
   @Override
@@ -78,7 +109,8 @@ final class DocInverter extends DocField
   }
 
   @Override
-  public DocFieldConsumerPerThread addThread(DocFieldProcessorPerThread docFieldProcessorPerThread) {
-    return new DocInverterPerThread(docFieldProcessorPerThread, this);
+  public DocFieldConsumerPerField addField(FieldInfo fi) {
+    return new DocInverterPerField(this, fi);
   }
+
 }

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java Sun May  1 22:38:33 2011
@@ -35,20 +35,20 @@ import org.apache.lucene.analysis.tokena
 
 final class DocInverterPerField extends DocFieldConsumerPerField {
 
-  final private DocInverterPerThread perThread;
-  final private FieldInfo fieldInfo;
+  final private DocInverter parent;
+  final FieldInfo fieldInfo;
   final InvertedDocConsumerPerField consumer;
   final InvertedDocEndConsumerPerField endConsumer;
-  final DocumentsWriter.DocState docState;
+  final DocumentsWriterPerThread.DocState docState;
   final FieldInvertState fieldState;
 
-  public DocInverterPerField(DocInverterPerThread perThread, FieldInfo fieldInfo) {
-    this.perThread = perThread;
+  public DocInverterPerField(DocInverter parent, FieldInfo fieldInfo) {
+    this.parent = parent;
     this.fieldInfo = fieldInfo;
-    docState = perThread.docState;
-    fieldState = perThread.fieldState;
-    this.consumer = perThread.consumer.addField(this, fieldInfo);
-    this.endConsumer = perThread.endConsumer.addField(this, fieldInfo);
+    docState = parent.docState;
+    fieldState = parent.fieldState;
+    this.consumer = parent.consumer.addField(this, fieldInfo);
+    this.endConsumer = parent.endConsumer.addField(this, fieldInfo);
   }
 
   @Override
@@ -80,8 +80,8 @@ final class DocInverterPerField extends 
         if (!field.isTokenized()) {		  // un-tokenized field
           String stringValue = field.stringValue();
           final int valueLength = stringValue.length();
-          perThread.singleToken.reinit(stringValue, 0, valueLength);
-          fieldState.attributeSource = perThread.singleToken;
+          parent.singleToken.reinit(stringValue, 0, valueLength);
+          fieldState.attributeSource = parent.singleToken;
           consumer.start(field);
 
           boolean success = false;
@@ -89,8 +89,9 @@ final class DocInverterPerField extends 
             consumer.add();
             success = true;
           } finally {
-            if (!success)
+            if (!success) {
               docState.docWriter.setAborting();
+            }
           }
           fieldState.offset += valueLength;
           fieldState.length++;
@@ -114,8 +115,8 @@ final class DocInverterPerField extends 
               if (stringValue == null) {
                 throw new IllegalArgumentException("field must have either TokenStream, String or Reader value");
               }
-              perThread.stringReader.init(stringValue);
-              reader = perThread.stringReader;
+              parent.stringReader.init(stringValue);
+              reader = parent.stringReader;
             }
           
             // Tokenize field and add to postingTable
@@ -166,8 +167,9 @@ final class DocInverterPerField extends 
                 consumer.add();
                 success = true;
               } finally {
-                if (!success)
+                if (!success) {
                   docState.docWriter.setAborting();
+                }
               }
               fieldState.length++;
               fieldState.position++;
@@ -195,4 +197,9 @@ final class DocInverterPerField extends 
     consumer.finish();
     endConsumer.finish();
   }
+
+  @Override
+  FieldInfo getFieldInfo() {
+    return fieldInfo;
+  }
 }