Posted to commits@lucene.apache.org by bu...@apache.org on 2010/07/21 12:27:22 UTC

svn commit: r966168 [1/4] - in /lucene/dev: branches/realtime_search/lucene/src/java/org/apache/lucene/index/ branches/realtime_search/lucene/src/java/org/apache/lucene/util/ branches/realtime_search/lucene/src/test/org/apache/lucene/index/ trunk/lucen...

Author: buschmi
Date: Wed Jul 21 10:27:20 2010
New Revision: 966168

URL: http://svn.apache.org/viewvc?rev=966168&view=rev
Log:
LUCENE-2324: Committing the second version of the patch to the real-time branch.  It's not done yet, but it's easier to track progress using the branch.

Added:
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletesInRAM.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterRAMAllocator.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/util/ThreadSafeCloneableSortedMap.java
Removed:
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumerPerThread.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerThread.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumersPerThread.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerThread.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverterPerThread.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadState.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FreqProxFieldMergeState.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerThread.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/InvertedDocConsumerPerThread.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/InvertedDocEndConsumerPerThread.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/NormsWriterPerThread.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/StoredFieldsWriterPerThread.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriterPerThread.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHashConsumerPerThread.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/TermsHashPerThread.java
Modified:
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteBlockPool.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceReader.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceWriter.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumer.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerField.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumers.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumersPerField.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverter.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IntBlockPool.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/InvertedDocConsumer.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/InvertedDocEndConsumer.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/NormsWriter.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/NormsWriterPerField.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ParallelPostingsArray.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/SegmentInfo.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/StoredFieldsWriter.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriterPerField.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHash.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHashConsumer.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHashPerField.java
    lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestByteSlices.java
    lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriter.java
    lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
    lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
    lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestNRTReaderWithThreads.java
    lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestStressIndexing2.java
    lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestThreadedOptimize.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriter.java

Added: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletesInRAM.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletesInRAM.java?rev=966168&view=auto
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletesInRAM.java (added)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletesInRAM.java Wed Jul 21 10:27:20 2010
@@ -0,0 +1,70 @@
+package org.apache.lucene.index;
+
+import java.util.TreeMap;
+
+import org.apache.lucene.search.Query;
+import org.apache.lucene.util.ThreadSafeCloneableSortedMap;
+
+public class BufferedDeletesInRAM {
+  static class Delete {
+    int flushCount;
+
+    public Delete(int flushCount) {
+      this.flushCount = flushCount;
+    }
+  }
+
+  final static class DeleteTerm extends Delete {
+    final Term term;
+
+    public DeleteTerm(Term term, int flushCount) {
+      super(flushCount);
+      this.term = term;
+    }
+  }
+
+  final static class DeleteTerms extends Delete {
+    final Term[] terms;
+
+    public DeleteTerms(Term[] terms, int flushCount) {
+      super(flushCount);
+      this.terms = terms;
+    }
+  }
+  
+  final static class DeleteQuery extends Delete {
+    final Query query;
+
+    public DeleteQuery(Query query, int flushCount) {
+      super(flushCount);
+      this.query = query;
+    }
+  }
+
+  final ThreadSafeCloneableSortedMap<Long, Delete> deletes = ThreadSafeCloneableSortedMap
+      .getThreadSafeSortedMap(new TreeMap<Long, Delete>());
+
+  final void addDeleteTerm(Term term, long sequenceID, int numThreadStates) {
+    deletes.put(sequenceID, new DeleteTerm(term, numThreadStates));
+  }
+
+  final void addDeleteTerms(Term[] terms, long sequenceID, int numThreadStates) {
+    deletes.put(sequenceID, new DeleteTerms(terms, numThreadStates));
+  }
+
+  final void addDeleteQuery(Query query, long sequenceID, int numThreadStates) {
+    deletes.put(sequenceID, new DeleteQuery(query, numThreadStates));
+  }
+
+  boolean hasDeletes() {
+    return !deletes.isEmpty();
+  }
+
+  void clear() {
+    deletes.clear();
+  }
+
+  int getNumDeletes() {
+    return this.deletes.size();
+  }
+}
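
The new class keeps each buffered delete keyed by the sequence ID under which it was recorded.  A minimal usage sketch, not part of the patch: the caller has to live in org.apache.lucene.index because the add*/clear methods are package-private, and the sequence IDs and thread-state count below are invented for illustration.

package org.apache.lucene.index;

import org.apache.lucene.search.TermQuery;

class BufferedDeletesInRAMSketch {
  public static void main(String[] args) {
    BufferedDeletesInRAM buffered = new BufferedDeletesInRAM();
    final int numThreadStates = 2;   // hypothetical number of active thread states

    // Deletes are recorded under monotonically increasing sequence IDs.
    buffered.addDeleteTerm(new Term("id", "doc-17"), 1L, numThreadStates);
    buffered.addDeleteQuery(new TermQuery(new Term("color", "red")), 2L, numThreadStates);

    System.out.println(buffered.hasDeletes());     // true
    System.out.println(buffered.getNumDeletes());  // 2

    buffered.clear();   // dropped once the deletes have been applied at flush
  }
}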

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteBlockPool.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteBlockPool.java?rev=966168&r1=966167&r2=966168&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteBlockPool.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteBlockPool.java Wed Jul 21 10:27:20 2010
@@ -50,10 +50,10 @@ final class ByteBlockPool {
   public byte[][] buffers = new byte[10][];
 
   int bufferUpto = -1;                        // Which buffer we are upto
-  public int byteUpto = DocumentsWriter.BYTE_BLOCK_SIZE;             // Where we are in head buffer
+  public int byteUpto = DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE;             // Where we are in head buffer
 
   public byte[] buffer;                              // Current head buffer
-  public int byteOffset = -DocumentsWriter.BYTE_BLOCK_SIZE;          // Current head offset
+  public int byteOffset = -DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE;          // Current head offset
 
   private final Allocator allocator;
 
@@ -95,11 +95,11 @@ final class ByteBlockPool {
     bufferUpto++;
 
     byteUpto = 0;
-    byteOffset += DocumentsWriter.BYTE_BLOCK_SIZE;
+    byteOffset += DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE;
   }
 
   public int newSlice(final int size) {
-    if (byteUpto > DocumentsWriter.BYTE_BLOCK_SIZE-size)
+    if (byteUpto > DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE-size)
       nextBuffer();
     final int upto = byteUpto;
     byteUpto += size;
@@ -123,7 +123,7 @@ final class ByteBlockPool {
     final int newSize = levelSizeArray[newLevel];
 
     // Maybe allocate another block
-    if (byteUpto > DocumentsWriter.BYTE_BLOCK_SIZE-newSize)
+    if (byteUpto > DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE-newSize)
       nextBuffer();
 
     final int newUpto = byteUpto;
@@ -151,8 +151,8 @@ final class ByteBlockPool {
   // Fill in a BytesRef from term's length & bytes encoded in
   // byte block
   final BytesRef setBytesRef(BytesRef term, int textStart) {
-    final byte[] bytes = term.bytes = buffers[textStart >> DocumentsWriter.BYTE_BLOCK_SHIFT];
-    int pos = textStart & DocumentsWriter.BYTE_BLOCK_MASK;
+    final byte[] bytes = term.bytes = buffers[textStart >> DocumentsWriterRAMAllocator.BYTE_BLOCK_SHIFT];
+    int pos = textStart & DocumentsWriterRAMAllocator.BYTE_BLOCK_MASK;
     if ((bytes[pos] & 0x80) == 0) {
       // length is 1 byte
       term.length = bytes[pos];
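
This class and the two slice classes below (ByteSliceReader, ByteSliceWriter) only change where the block-size constants live; the shift/mask addressing itself is untouched.  A small worked sketch of that arithmetic, using assumed constant values (32 KB blocks), since DocumentsWriterRAMAllocator's actual numbers are not shown in this part of the commit:

class ByteBlockAddressingSketch {
  // Assumed values for illustration only; not read from this patch.
  static final int BYTE_BLOCK_SHIFT = 15;
  static final int BYTE_BLOCK_SIZE  = 1 << BYTE_BLOCK_SHIFT;   // 32768
  static final int BYTE_BLOCK_MASK  = BYTE_BLOCK_SIZE - 1;

  public static void main(String[] args) {
    int textStart   = 70000;                             // a global address into the pool
    int bufferIndex = textStart >> BYTE_BLOCK_SHIFT;     // 2    -> pool.buffers[2]
    int offset      = textStart &  BYTE_BLOCK_MASK;      // 4464 -> byte offset inside that block
    System.out.println(bufferIndex + " / " + offset);
  }
}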

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceReader.java?rev=966168&r1=966167&r2=966168&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceReader.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceReader.java Wed Jul 21 10:27:20 2010
@@ -48,16 +48,16 @@ final class ByteSliceReader extends Data
     this.endIndex = endIndex;
 
     level = 0;
-    bufferUpto = startIndex / DocumentsWriter.BYTE_BLOCK_SIZE;
-    bufferOffset = bufferUpto * DocumentsWriter.BYTE_BLOCK_SIZE;
+    bufferUpto = startIndex / DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE;
+    bufferOffset = bufferUpto * DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE;
     buffer = pool.buffers[bufferUpto];
-    upto = startIndex & DocumentsWriter.BYTE_BLOCK_MASK;
+    upto = startIndex & DocumentsWriterRAMAllocator.BYTE_BLOCK_MASK;
 
     final int firstSize = ByteBlockPool.levelSizeArray[0];
 
     if (startIndex+firstSize >= endIndex) {
       // There is only this one slice to read
-      limit = endIndex & DocumentsWriter.BYTE_BLOCK_MASK;
+      limit = endIndex & DocumentsWriterRAMAllocator.BYTE_BLOCK_MASK;
     } else
       limit = upto+firstSize-4;
   }
@@ -102,11 +102,11 @@ final class ByteSliceReader extends Data
     level = ByteBlockPool.nextLevelArray[level];
     final int newSize = ByteBlockPool.levelSizeArray[level];
 
-    bufferUpto = nextIndex / DocumentsWriter.BYTE_BLOCK_SIZE;
-    bufferOffset = bufferUpto * DocumentsWriter.BYTE_BLOCK_SIZE;
+    bufferUpto = nextIndex / DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE;
+    bufferOffset = bufferUpto * DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE;
 
     buffer = pool.buffers[bufferUpto];
-    upto = nextIndex & DocumentsWriter.BYTE_BLOCK_MASK;
+    upto = nextIndex & DocumentsWriterRAMAllocator.BYTE_BLOCK_MASK;
 
     if (nextIndex + newSize >= endIndex) {
       // We are advancing to the final slice

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceWriter.java?rev=966168&r1=966167&r2=966168&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceWriter.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceWriter.java Wed Jul 21 10:27:20 2010
@@ -42,9 +42,9 @@ final class ByteSliceWriter extends Data
    * Set up the writer to write at address.
    */
   public void init(int address) {
-    slice = pool.buffers[address >> DocumentsWriter.BYTE_BLOCK_SHIFT];
+    slice = pool.buffers[address >> DocumentsWriterRAMAllocator.BYTE_BLOCK_SHIFT];
     assert slice != null;
-    upto = address & DocumentsWriter.BYTE_BLOCK_MASK;
+    upto = address & DocumentsWriterRAMAllocator.BYTE_BLOCK_MASK;
     offset0 = address;
     assert upto < slice.length;
   }
@@ -80,6 +80,6 @@ final class ByteSliceWriter extends Data
   }
 
   public int getAddress() {
-    return upto + (offset0 & DocumentsWriter.BYTE_BLOCK_NOT_MASK);
+    return upto + (offset0 & DocumentsWriterRAMAllocator.BYTE_BLOCK_NOT_MASK);
   }
 }
\ No newline at end of file

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumer.java?rev=966168&r1=966167&r2=966168&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumer.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumer.java Wed Jul 21 10:27:20 2010
@@ -18,11 +18,10 @@ package org.apache.lucene.index;
  */
 
 import java.io.IOException;
-import java.util.Collection;
 
 abstract class DocConsumer {
-  abstract DocConsumerPerThread addThread(DocumentsWriterThreadState perThread) throws IOException;
-  abstract void flush(final Collection<DocConsumerPerThread> threads, final SegmentWriteState state) throws IOException;
+  abstract DocumentsWriterPerThread.DocWriter processDocument() throws IOException;
+  abstract void flush(final SegmentWriteState state) throws IOException;
   abstract void closeDocStore(final SegmentWriteState state) throws IOException;
   abstract void abort();
   abstract boolean freeRAM();

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java?rev=966168&r1=966167&r2=966168&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java Wed Jul 21 10:27:20 2010
@@ -18,7 +18,6 @@ package org.apache.lucene.index;
  */
 
 import java.io.IOException;
-import java.util.Collection;
 import java.util.Map;
 
 abstract class DocFieldConsumer {
@@ -27,7 +26,7 @@ abstract class DocFieldConsumer {
 
   /** Called when DocumentsWriter decides to create a new
    *  segment */
-  abstract void flush(Map<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>> threadsAndFields, SegmentWriteState state) throws IOException;
+  abstract void flush(Map<FieldInfo, DocFieldConsumerPerField> fieldsToFlush, SegmentWriteState state) throws IOException;
 
   /** Called when DocumentsWriter decides to close the doc
    *  stores */
@@ -36,14 +35,17 @@ abstract class DocFieldConsumer {
   /** Called when an aborting exception is hit */
   abstract void abort();
 
-  /** Add a new thread */
-  abstract DocFieldConsumerPerThread addThread(DocFieldProcessorPerThread docFieldProcessorPerThread) throws IOException;
-
   /** Called when DocumentsWriter is using too much RAM.
    *  The consumer should free RAM, if possible, returning
    *  true if any RAM was in fact freed. */
   abstract boolean freeRAM();
+  
+  abstract void startDocument() throws IOException;
 
+  abstract DocFieldConsumerPerField addField(FieldInfo fi);
+  
+  abstract DocumentsWriterPerThread.DocWriter finishDocument() throws IOException;
+  
   void setFieldInfos(FieldInfos fieldInfos) {
     this.fieldInfos = fieldInfos;
   }

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerField.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerField.java?rev=966168&r1=966167&r2=966168&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerField.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerField.java Wed Jul 21 10:27:20 2010
@@ -24,4 +24,5 @@ abstract class DocFieldConsumerPerField 
   /** Processes all occurrences of a single field */
   abstract void processFields(Fieldable[] fields, int count) throws IOException;
   abstract void abort();
+  abstract FieldInfo getFieldInfo();
 }

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumers.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumers.java?rev=966168&r1=966167&r2=966168&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumers.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumers.java Wed Jul 21 10:27:20 2010
@@ -17,12 +17,9 @@ package org.apache.lucene.index;
  * limitations under the License.
  */
 
+import java.io.IOException;
 import java.util.HashMap;
-import java.util.Collection;
-import java.util.Iterator;
 import java.util.Map;
-import java.util.HashSet;
-import java.io.IOException;
 
 import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.RamUsageEstimator;
@@ -33,10 +30,12 @@ import org.apache.lucene.util.RamUsageEs
 final class DocFieldConsumers extends DocFieldConsumer {
   final DocFieldConsumer one;
   final DocFieldConsumer two;
+  final DocumentsWriterPerThread.DocState docState;
 
-  public DocFieldConsumers(DocFieldConsumer one, DocFieldConsumer two) {
+  public DocFieldConsumers(DocFieldProcessor processor, DocFieldConsumer one, DocFieldConsumer two) {
     this.one = one;
     this.two = two;
+    this.docState = processor.docState;
   }
 
   @Override
@@ -47,33 +46,19 @@ final class DocFieldConsumers extends Do
   }
 
   @Override
-  public void flush(Map<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>> threadsAndFields, SegmentWriteState state) throws IOException {
-
-    Map<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>> oneThreadsAndFields = new HashMap<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>>();
-    Map<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>> twoThreadsAndFields = new HashMap<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>>();
-
-    for (Map.Entry<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>> entry : threadsAndFields.entrySet()) {
+  public void flush(Map<FieldInfo, DocFieldConsumerPerField> fieldsToFlush, SegmentWriteState state) throws IOException {
 
-      final DocFieldConsumersPerThread perThread = (DocFieldConsumersPerThread) entry.getKey();
+    Map<FieldInfo, DocFieldConsumerPerField> oneFieldsToFlush = new HashMap<FieldInfo, DocFieldConsumerPerField>();
+    Map<FieldInfo, DocFieldConsumerPerField> twoFieldsToFlush = new HashMap<FieldInfo, DocFieldConsumerPerField>();
 
-      final Collection<DocFieldConsumerPerField> fields = entry.getValue();
-
-      Iterator<DocFieldConsumerPerField> fieldsIt = fields.iterator();
-      Collection<DocFieldConsumerPerField> oneFields = new HashSet<DocFieldConsumerPerField>();
-      Collection<DocFieldConsumerPerField> twoFields = new HashSet<DocFieldConsumerPerField>();
-      while(fieldsIt.hasNext()) {
-        DocFieldConsumersPerField perField = (DocFieldConsumersPerField) fieldsIt.next();
-        oneFields.add(perField.one);
-        twoFields.add(perField.two);
-      }
-
-      oneThreadsAndFields.put(perThread.one, oneFields);
-      twoThreadsAndFields.put(perThread.two, twoFields);
+    for (Map.Entry<FieldInfo, DocFieldConsumerPerField> fieldToFlush : fieldsToFlush.entrySet()) {
+      DocFieldConsumersPerField perField = (DocFieldConsumersPerField) fieldToFlush.getValue();
+      oneFieldsToFlush.put(fieldToFlush.getKey(), perField.one);
+      twoFieldsToFlush.put(fieldToFlush.getKey(), perField.two);
     }
-    
 
-    one.flush(oneThreadsAndFields, state);
-    two.flush(twoThreadsAndFields, state);
+    one.flush(oneFieldsToFlush, state);
+    two.flush(twoFieldsToFlush, state);
   }
 
   @Override
@@ -101,16 +86,11 @@ final class DocFieldConsumers extends Do
     return any;
   }
 
-  @Override
-  public DocFieldConsumerPerThread addThread(DocFieldProcessorPerThread docFieldProcessorPerThread) throws IOException {
-    return new DocFieldConsumersPerThread(docFieldProcessorPerThread, this, one.addThread(docFieldProcessorPerThread), two.addThread(docFieldProcessorPerThread));
-  }
-
   PerDoc[] docFreeList = new PerDoc[1];
   int freeCount;
   int allocCount;
 
-  synchronized PerDoc getPerDoc() {
+  PerDoc getPerDoc() {
     if (freeCount == 0) {
       allocCount++;
       if (allocCount > docFreeList.length) {
@@ -125,15 +105,15 @@ final class DocFieldConsumers extends Do
       return docFreeList[--freeCount];
   }
 
-  synchronized void freePerDoc(PerDoc perDoc) {
+  void freePerDoc(PerDoc perDoc) {
     assert freeCount < docFreeList.length;
     docFreeList[freeCount++] = perDoc;
   }
 
-  class PerDoc extends DocumentsWriter.DocWriter {
+  class PerDoc extends DocumentsWriterPerThread.DocWriter {
 
-    DocumentsWriter.DocWriter writerOne;
-    DocumentsWriter.DocWriter writerTwo;
+    DocumentsWriterPerThread.DocWriter writerOne;
+    DocumentsWriterPerThread.DocWriter writerTwo;
 
     @Override
     public long sizeInBytes() {
@@ -166,4 +146,35 @@ final class DocFieldConsumers extends Do
       }
     }
   }
+  
+  @Override
+  public DocumentsWriterPerThread.DocWriter finishDocument() throws IOException {
+    final DocumentsWriterPerThread.DocWriter oneDoc = one.finishDocument();
+    final DocumentsWriterPerThread.DocWriter twoDoc = two.finishDocument();
+    if (oneDoc == null)
+      return twoDoc;
+    else if (twoDoc == null)
+      return oneDoc;
+    else {
+      DocFieldConsumers.PerDoc both = getPerDoc();
+      both.docID = docState.docID;
+      assert oneDoc.docID == docState.docID;
+      assert twoDoc.docID == docState.docID;
+      both.writerOne = oneDoc;
+      both.writerTwo = twoDoc;
+      return both;
+    }
+  }
+  
+  @Override
+  public void startDocument() throws IOException {
+    one.startDocument();
+    two.startDocument();
+  }
+  
+  @Override
+  public DocFieldConsumerPerField addField(FieldInfo fi) {
+    return new DocFieldConsumersPerField(this, fi, one.addField(fi), two.addField(fi));
+  }
+
 }

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumersPerField.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumersPerField.java?rev=966168&r1=966167&r2=966168&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumersPerField.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumersPerField.java Wed Jul 21 10:27:20 2010
@@ -24,12 +24,14 @@ final class DocFieldConsumersPerField ex
 
   final DocFieldConsumerPerField one;
   final DocFieldConsumerPerField two;
-  final DocFieldConsumersPerThread perThread;
+  final DocFieldConsumers parent;
+  final FieldInfo fieldInfo;
 
-  public DocFieldConsumersPerField(DocFieldConsumersPerThread perThread, DocFieldConsumerPerField one, DocFieldConsumerPerField two) {
-    this.perThread = perThread;
+  public DocFieldConsumersPerField(DocFieldConsumers parent, FieldInfo fi, DocFieldConsumerPerField one, DocFieldConsumerPerField two) {
+    this.parent = parent;
     this.one = one;
     this.two = two;
+    this.fieldInfo = fi;
   }
 
   @Override
@@ -46,4 +48,9 @@ final class DocFieldConsumersPerField ex
       two.abort();
     }
   }
+
+  @Override
+  FieldInfo getFieldInfo() {
+    return fieldInfo;
+  }
 }

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java?rev=966168&r1=966167&r2=966168&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java Wed Jul 21 10:27:20 2010
@@ -19,8 +19,15 @@ package org.apache.lucene.index;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.Map;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Fieldable;
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.RamUsageEstimator;
 
 
 /**
@@ -33,13 +40,27 @@ import java.util.HashMap;
 
 final class DocFieldProcessor extends DocConsumer {
 
-  final DocumentsWriter docWriter;
   final FieldInfos fieldInfos = new FieldInfos();
   final DocFieldConsumer consumer;
   final StoredFieldsWriter fieldsWriter;
 
-  public DocFieldProcessor(DocumentsWriter docWriter, DocFieldConsumer consumer) {
-    this.docWriter = docWriter;
+  // Holds all fields seen in current doc
+  DocFieldProcessorPerField[] fields = new DocFieldProcessorPerField[1];
+  int fieldCount;
+
+  // Hash table for all fields ever seen
+  DocFieldProcessorPerField[] fieldHash = new DocFieldProcessorPerField[2];
+  int hashMask = 1;
+  int totalFieldCount;
+
+  
+  float docBoost;
+  int fieldGen;
+  final DocumentsWriterPerThread.DocState docState;
+
+
+  public DocFieldProcessor(DocumentsWriterPerThread docWriter, DocFieldConsumer consumer) {
+    this.docState = docWriter.docState;
     this.consumer = consumer;
     consumer.setFieldInfos(fieldInfos);
     fieldsWriter = new StoredFieldsWriter(docWriter, fieldInfos);
@@ -52,16 +73,17 @@ final class DocFieldProcessor extends Do
   }
 
   @Override
-  public void flush(Collection<DocConsumerPerThread> threads, SegmentWriteState state) throws IOException {
+  public void flush(SegmentWriteState state) throws IOException {
 
-    Map<DocFieldConsumerPerThread, Collection<DocFieldConsumerPerField>> childThreadsAndFields = new HashMap<DocFieldConsumerPerThread, Collection<DocFieldConsumerPerField>>();
-    for ( DocConsumerPerThread thread : threads) {
-      DocFieldProcessorPerThread perThread = (DocFieldProcessorPerThread) thread;
-      childThreadsAndFields.put(perThread.consumer, perThread.fields());
-      perThread.trimFields(state);
+    Map<FieldInfo, DocFieldConsumerPerField> childFields = new HashMap<FieldInfo, DocFieldConsumerPerField>();
+    Collection<DocFieldConsumerPerField> fields = fields();
+    for (DocFieldConsumerPerField f : fields) {
+      childFields.put(f.getFieldInfo(), f);
     }
+    trimFields(state);
+
     fieldsWriter.flush(state);
-    consumer.flush(childThreadsAndFields, state);
+    consumer.flush(childFields, state);
 
     // Important to save after asking consumer to flush so
     // consumer can alter the FieldInfo* if necessary.  EG,
@@ -74,6 +96,15 @@ final class DocFieldProcessor extends Do
 
   @Override
   public void abort() {
+    for(int i=0;i<fieldHash.length;i++) {
+      DocFieldProcessorPerField field = fieldHash[i];
+      while(field != null) {
+        final DocFieldProcessorPerField next = field.next;
+        field.abort();
+        field = next;
+      }
+    }
+
     fieldsWriter.abort();
     consumer.abort();
   }
@@ -82,9 +113,317 @@ final class DocFieldProcessor extends Do
   public boolean freeRAM() {
     return consumer.freeRAM();
   }
+  
+  public Collection<DocFieldConsumerPerField> fields() {
+    Collection<DocFieldConsumerPerField> fields = new HashSet<DocFieldConsumerPerField>();
+    for(int i=0;i<fieldHash.length;i++) {
+      DocFieldProcessorPerField field = fieldHash[i];
+      while(field != null) {
+        fields.add(field.consumer);
+        field = field.next;
+      }
+    }
+    assert fields.size() == totalFieldCount;
+    return fields;
+  }
+
+  /** If there are fields we've seen but did not see again
+   *  in the last run, then free them up. */
+
+  void trimFields(SegmentWriteState state) {
+
+    for(int i=0;i<fieldHash.length;i++) {
+      DocFieldProcessorPerField perField = fieldHash[i];
+      DocFieldProcessorPerField lastPerField = null;
+
+      while (perField != null) {
+
+        if (perField.lastGen == -1) {
+
+          // This field was not seen since the previous
+          // flush, so, free up its resources now
+
+          // Unhash
+          if (lastPerField == null)
+            fieldHash[i] = perField.next;
+          else
+            lastPerField.next = perField.next;
+
+          if (state.infoStream != null) {
+            state.infoStream.println("  purge field=" + perField.fieldInfo.name);
+          }
+
+          totalFieldCount--;
+
+        } else {
+          // Reset
+          perField.lastGen = -1;
+          lastPerField = perField;
+        }
+
+        perField = perField.next;
+      }
+    }
+  }
+
+  private void rehash() {
+    final int newHashSize = (fieldHash.length*2);
+    assert newHashSize > fieldHash.length;
+
+    final DocFieldProcessorPerField newHashArray[] = new DocFieldProcessorPerField[newHashSize];
+
+    // Rehash
+    int newHashMask = newHashSize-1;
+    for(int j=0;j<fieldHash.length;j++) {
+      DocFieldProcessorPerField fp0 = fieldHash[j];
+      while(fp0 != null) {
+        final int hashPos2 = fp0.fieldInfo.name.hashCode() & newHashMask;
+        DocFieldProcessorPerField nextFP0 = fp0.next;
+        fp0.next = newHashArray[hashPos2];
+        newHashArray[hashPos2] = fp0;
+        fp0 = nextFP0;
+      }
+    }
+
+    fieldHash = newHashArray;
+    hashMask = newHashMask;
+  }
 
   @Override
-  public DocConsumerPerThread addThread(DocumentsWriterThreadState threadState) throws IOException {
-    return new DocFieldProcessorPerThread(threadState, this);
+  public DocumentsWriterPerThread.DocWriter processDocument() throws IOException {
+
+    consumer.startDocument();
+    fieldsWriter.startDocument();
+
+    final Document doc = docState.doc;
+
+    fieldCount = 0;
+    
+    final int thisFieldGen = fieldGen++;
+
+    final List<Fieldable> docFields = doc.getFields();
+    final int numDocFields = docFields.size();
+
+    // Absorb any new fields first seen in this document.
+    // Also absorb any changes to fields we had already
+    // seen before (eg suddenly turning on norms or
+    // vectors, etc.):
+
+    for(int i=0;i<numDocFields;i++) {
+      Fieldable field = docFields.get(i);
+      final String fieldName = field.name();
+
+      // Make sure we have a PerField allocated
+      final int hashPos = fieldName.hashCode() & hashMask;
+      DocFieldProcessorPerField fp = fieldHash[hashPos];
+      while(fp != null && !fp.fieldInfo.name.equals(fieldName)) {
+        fp = fp.next;
+      }
+        
+      if (fp == null) {
+
+        // TODO FI: we need to genericize the "flags" that a
+        // field holds, and, how these flags are merged; it
+        // needs to be more "pluggable" such that if I want
+        // to have a new "thing" my Fields can do, I can
+        // easily add it
+        FieldInfo fi = fieldInfos.add(fieldName, field.isIndexed(), field.isTermVectorStored(),
+                                      field.isStorePositionWithTermVector(), field.isStoreOffsetWithTermVector(),
+                                      field.getOmitNorms(), false, field.getOmitTermFreqAndPositions());
+
+        fp = new DocFieldProcessorPerField(this, fi);
+        fp.next = fieldHash[hashPos];
+        fieldHash[hashPos] = fp;
+        totalFieldCount++;
+
+        if (totalFieldCount >= fieldHash.length/2)
+          rehash();
+      } else {
+        fp.fieldInfo.update(field.isIndexed(), field.isTermVectorStored(),
+                            field.isStorePositionWithTermVector(), field.isStoreOffsetWithTermVector(),
+                            field.getOmitNorms(), false, field.getOmitTermFreqAndPositions());
+      }
+      
+      if (thisFieldGen != fp.lastGen) {
+
+        // First time we're seeing this field for this doc
+        fp.fieldCount = 0;
+
+        if (fieldCount == fields.length) {
+          final int newSize = fields.length*2;
+          DocFieldProcessorPerField newArray[] = new DocFieldProcessorPerField[newSize];
+          System.arraycopy(fields, 0, newArray, 0, fieldCount);
+          fields = newArray;
+        }
+
+        fields[fieldCount++] = fp;
+        fp.lastGen = thisFieldGen;
+      }
+
+      if (fp.fieldCount == fp.fields.length) {
+        Fieldable[] newArray = new Fieldable[fp.fields.length*2];
+        System.arraycopy(fp.fields, 0, newArray, 0, fp.fieldCount);
+        fp.fields = newArray;
+      }
+
+      fp.fields[fp.fieldCount++] = field;
+      if (field.isStored()) {
+        fieldsWriter.addField(field, fp.fieldInfo);
+      }
+    }
+
+    // If we are writing vectors then we must visit
+    // fields in sorted order so they are written in
+    // sorted order.  TODO: we actually only need to
+    // sort the subset of fields that have vectors
+    // enabled; we could save [small amount of] CPU
+    // here.
+    quickSort(fields, 0, fieldCount-1);
+
+    for(int i=0;i<fieldCount;i++)
+      fields[i].consumer.processFields(fields[i].fields, fields[i].fieldCount);
+
+    if (docState.maxTermPrefix != null && docState.infoStream != null) {
+      docState.infoStream.println("WARNING: document contains at least one immense term (whose UTF8 encoding is longer than the max length " + DocumentsWriterRAMAllocator.MAX_TERM_LENGTH_UTF8 + "), all of which were skipped.  Please correct the analyzer to not produce such terms.  The prefix of the first immense term is: '" + docState.maxTermPrefix + "...'"); 
+      docState.maxTermPrefix = null;
+    }
+
+    final DocumentsWriterPerThread.DocWriter one = fieldsWriter.finishDocument();
+    final DocumentsWriterPerThread.DocWriter two = consumer.finishDocument();
+    if (one == null) {
+      return two;
+    } else if (two == null) {
+      return one;
+    } else {
+      PerDoc both = getPerDoc();
+      both.docID = docState.docID;
+      assert one.docID == docState.docID;
+      assert two.docID == docState.docID;
+      both.one = one;
+      both.two = two;
+      return both;
+    }
+  }
+
+  void quickSort(DocFieldProcessorPerField[] array, int lo, int hi) {
+    if (lo >= hi)
+      return;
+    else if (hi == 1+lo) {
+      if (array[lo].fieldInfo.name.compareTo(array[hi].fieldInfo.name) > 0) {
+        final DocFieldProcessorPerField tmp = array[lo];
+        array[lo] = array[hi];
+        array[hi] = tmp;
+      }
+      return;
+    }
+
+    int mid = (lo + hi) >>> 1;
+
+    if (array[lo].fieldInfo.name.compareTo(array[mid].fieldInfo.name) > 0) {
+      DocFieldProcessorPerField tmp = array[lo];
+      array[lo] = array[mid];
+      array[mid] = tmp;
+    }
+
+    if (array[mid].fieldInfo.name.compareTo(array[hi].fieldInfo.name) > 0) {
+      DocFieldProcessorPerField tmp = array[mid];
+      array[mid] = array[hi];
+      array[hi] = tmp;
+
+      if (array[lo].fieldInfo.name.compareTo(array[mid].fieldInfo.name) > 0) {
+        DocFieldProcessorPerField tmp2 = array[lo];
+        array[lo] = array[mid];
+        array[mid] = tmp2;
+      }
+    }
+
+    int left = lo + 1;
+    int right = hi - 1;
+
+    if (left >= right)
+      return;
+
+    DocFieldProcessorPerField partition = array[mid];
+
+    for (; ;) {
+      while (array[right].fieldInfo.name.compareTo(partition.fieldInfo.name) > 0)
+        --right;
+
+      while (left < right && array[left].fieldInfo.name.compareTo(partition.fieldInfo.name) <= 0)
+        ++left;
+
+      if (left < right) {
+        DocFieldProcessorPerField tmp = array[left];
+        array[left] = array[right];
+        array[right] = tmp;
+        --right;
+      } else {
+        break;
+      }
+    }
+
+    quickSort(array, lo, left);
+    quickSort(array, left + 1, hi);
+  }
+
+  PerDoc[] docFreeList = new PerDoc[1];
+  int freeCount;
+  int allocCount;
+
+  PerDoc getPerDoc() {
+    if (freeCount == 0) {
+      allocCount++;
+      if (allocCount > docFreeList.length) {
+        // Grow our free list up front to make sure we have
+        // enough space to recycle all outstanding PerDoc
+        // instances
+        assert allocCount == 1+docFreeList.length;
+        docFreeList = new PerDoc[ArrayUtil.oversize(allocCount, RamUsageEstimator.NUM_BYTES_OBJECT_REF)];
+      }
+      return new PerDoc();
+    } else
+      return docFreeList[--freeCount];
+  }
+
+  void freePerDoc(PerDoc perDoc) {
+    assert freeCount < docFreeList.length;
+    docFreeList[freeCount++] = perDoc;
+  }
+
+  class PerDoc extends DocumentsWriterPerThread.DocWriter {
+
+    DocumentsWriterPerThread.DocWriter one;
+    DocumentsWriterPerThread.DocWriter two;
+
+    @Override
+    public long sizeInBytes() {
+      return one.sizeInBytes() + two.sizeInBytes();
+    }
+
+    @Override
+    public void finish() throws IOException {
+      try {
+        try {
+          one.finish();
+        } finally {
+          two.finish();
+        }
+      } finally {
+        freePerDoc(this);
+      }
+    }
+
+    @Override
+    public void abort() {
+      try {
+        try {
+          one.abort();
+        } finally {
+          two.abort();
+        }
+      } finally {
+        freePerDoc(this);
+      }
+    }
   }
 }
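
The per-field bookkeeping that previously lived in DocFieldProcessorPerThread (the field hash, trimFields, processDocument and the PerDoc free list) now sits directly in DocFieldProcessor.  A stand-alone sketch of the hashing scheme processDocument() relies on, with invented names (FieldHashSketch, lookupOrAdd) but the same chaining and resize policy as above:

class FieldHashSketch {
  static class Entry {
    final String name;
    Entry next;                      // collision chain, like DocFieldProcessorPerField.next
    Entry(String name) { this.name = name; }
  }

  Entry[] hash = new Entry[2];
  int mask = 1;
  int count;

  Entry lookupOrAdd(String name) {
    int pos = name.hashCode() & mask;
    Entry e = hash[pos];
    while (e != null && !e.name.equals(name)) e = e.next;   // walk the chain
    if (e == null) {
      e = new Entry(name);
      e.next = hash[pos];
      hash[pos] = e;
      if (++count >= hash.length / 2) rehash();              // keep load factor <= 1/2
    }
    return e;
  }

  void rehash() {
    Entry[] newHash = new Entry[hash.length * 2];
    int newMask = newHash.length - 1;
    for (Entry e : hash) {
      while (e != null) {                                    // re-link every chained entry
        Entry next = e.next;
        int pos = e.name.hashCode() & newMask;
        e.next = newHash[pos];
        newHash[pos] = e;
        e = next;
      }
    }
    hash = newHash;
    mask = newMask;
  }

  public static void main(String[] args) {
    FieldHashSketch s = new FieldHashSketch();
    s.lookupOrAdd("title");
    s.lookupOrAdd("body");
    System.out.println(s.lookupOrAdd("title") == s.lookupOrAdd("title"));  // true: entry is reused
  }
}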

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java?rev=966168&r1=966167&r2=966168&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java Wed Jul 21 10:27:20 2010
@@ -34,8 +34,8 @@ final class DocFieldProcessorPerField {
   int fieldCount;
   Fieldable[] fields = new Fieldable[1];
 
-  public DocFieldProcessorPerField(final DocFieldProcessorPerThread perThread, final FieldInfo fieldInfo) {
-    this.consumer = perThread.consumer.addField(fieldInfo);
+  public DocFieldProcessorPerField(final DocFieldProcessor docFieldProcessor, final FieldInfo fieldInfo) {
+    this.consumer = docFieldProcessor.consumer.addField(fieldInfo);
     this.fieldInfo = fieldInfo;
   }
 

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverter.java?rev=966168&r1=966167&r2=966168&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverter.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverter.java Wed Jul 21 10:27:20 2010
@@ -18,12 +18,13 @@ package org.apache.lucene.index;
  */
 
 import java.io.IOException;
-import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
-
 import java.util.Map;
 
+import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
+import org.apache.lucene.analysis.tokenattributes.OffsetAttribute;
+import org.apache.lucene.util.AttributeSource;
+
 
 /** This is a DocFieldConsumer that inverts each field,
  *  separately, from a Document, and accepts a
@@ -34,7 +35,32 @@ final class DocInverter extends DocField
   final InvertedDocConsumer consumer;
   final InvertedDocEndConsumer endConsumer;
 
-  public DocInverter(InvertedDocConsumer consumer, InvertedDocEndConsumer endConsumer) {
+  final DocumentsWriterPerThread.DocState docState;
+
+  final FieldInvertState fieldState = new FieldInvertState();
+
+  final SingleTokenAttributeSource singleToken = new SingleTokenAttributeSource();
+  
+  static class SingleTokenAttributeSource extends AttributeSource {
+    final CharTermAttribute termAttribute;
+    final OffsetAttribute offsetAttribute;
+    
+    private SingleTokenAttributeSource() {
+      termAttribute = addAttribute(CharTermAttribute.class);
+      offsetAttribute = addAttribute(OffsetAttribute.class);
+    }
+    
+    public void reinit(String stringValue, int startOffset,  int endOffset) {
+      termAttribute.setEmpty().append(stringValue);
+      offsetAttribute.setOffset(startOffset, endOffset);
+    }
+  }
+  
+  // Used to read a string value for a field
+  final ReusableStringReader stringReader = new ReusableStringReader();
+
+  public DocInverter(DocumentsWriterPerThread.DocState docState, InvertedDocConsumer consumer, InvertedDocEndConsumer endConsumer) {
+    this.docState = docState;
     this.consumer = consumer;
     this.endConsumer = endConsumer;
   }
@@ -47,33 +73,37 @@ final class DocInverter extends DocField
   }
 
   @Override
-  void flush(Map<DocFieldConsumerPerThread, Collection<DocFieldConsumerPerField>> threadsAndFields, SegmentWriteState state) throws IOException {
-
-    Map<InvertedDocConsumerPerThread,Collection<InvertedDocConsumerPerField>> childThreadsAndFields = new HashMap<InvertedDocConsumerPerThread,Collection<InvertedDocConsumerPerField>>();
-    Map<InvertedDocEndConsumerPerThread,Collection<InvertedDocEndConsumerPerField>> endChildThreadsAndFields = new HashMap<InvertedDocEndConsumerPerThread,Collection<InvertedDocEndConsumerPerField>>();
-
-    for (Map.Entry<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>> entry : threadsAndFields.entrySet() ) {
+  void flush(Map<FieldInfo, DocFieldConsumerPerField> fieldsToFlush, SegmentWriteState state) throws IOException {
 
+    Map<FieldInfo, InvertedDocConsumerPerField> childFieldsToFlush = new HashMap<FieldInfo, InvertedDocConsumerPerField>();
+    Map<FieldInfo, InvertedDocEndConsumerPerField> endChildFieldsToFlush = new HashMap<FieldInfo, InvertedDocEndConsumerPerField>();
 
-      DocInverterPerThread perThread = (DocInverterPerThread) entry.getKey();
-
-      Collection<InvertedDocConsumerPerField> childFields = new HashSet<InvertedDocConsumerPerField>();
-      Collection<InvertedDocEndConsumerPerField> endChildFields = new HashSet<InvertedDocEndConsumerPerField>();
-      for (final DocFieldConsumerPerField field: entry.getValue() ) {  
-        DocInverterPerField perField = (DocInverterPerField) field;
-        childFields.add(perField.consumer);
-        endChildFields.add(perField.endConsumer);
-      }
-
-      childThreadsAndFields.put(perThread.consumer, childFields);
-      endChildThreadsAndFields.put(perThread.endConsumer, endChildFields);
+    for (Map.Entry<FieldInfo, DocFieldConsumerPerField> fieldToFlush : fieldsToFlush.entrySet()) {
+      DocInverterPerField perField = (DocInverterPerField) fieldToFlush.getValue();
+      childFieldsToFlush.put(fieldToFlush.getKey(), perField.consumer);
+      endChildFieldsToFlush.put(fieldToFlush.getKey(), perField.endConsumer);
     }
     
-    consumer.flush(childThreadsAndFields, state);
-    endConsumer.flush(endChildThreadsAndFields, state);
+    consumer.flush(childFieldsToFlush, state);
+    endConsumer.flush(endChildFieldsToFlush, state);
+  }
+  
+  @Override
+  public void startDocument() throws IOException {
+    consumer.startDocument();
+    endConsumer.startDocument();
   }
 
   @Override
+  public DocumentsWriterPerThread.DocWriter finishDocument() throws IOException {
+    // TODO: allow endConsumer.finishDocument to also return
+    // a DocWriter
+    endConsumer.finishDocument();
+    return consumer.finishDocument();
+  }
+
+
+  @Override
   public void closeDocStore(SegmentWriteState state) throws IOException {
     consumer.closeDocStore(state);
     endConsumer.closeDocStore(state);
@@ -81,17 +111,21 @@ final class DocInverter extends DocField
 
   @Override
   void abort() {
-    consumer.abort();
-    endConsumer.abort();
+    try {
+      consumer.abort();
+    } finally {
+      endConsumer.abort();
+    }
   }
 
   @Override
   public boolean freeRAM() {
     return consumer.freeRAM();
   }
-
+  
   @Override
-  public DocFieldConsumerPerThread addThread(DocFieldProcessorPerThread docFieldProcessorPerThread) {
-    return new DocInverterPerThread(docFieldProcessorPerThread, this);
+  public DocFieldConsumerPerField addField(FieldInfo fi) {
+    return new DocInverterPerField(this, fi);
   }
+
 }
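
DocInverter now owns the per-document state that DocInverterPerThread used to hold, including the reusable SingleTokenAttributeSource that feeds an un-tokenized field through the inverter as one token.  A hedged sketch, not part of the patch, of what a downstream consumer sees after reinit(...) is called; the class and values are invented, only the attribute API usage mirrors the code above:

import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.apache.lucene.analysis.tokenattributes.OffsetAttribute;
import org.apache.lucene.util.AttributeSource;

class SingleTokenSketch {
  public static void main(String[] args) {
    AttributeSource source = new AttributeSource();
    CharTermAttribute term = source.addAttribute(CharTermAttribute.class);
    OffsetAttribute offset = source.addAttribute(OffsetAttribute.class);

    // Equivalent of SingleTokenAttributeSource.reinit("NY", 0, 2) for an
    // un-tokenized field whose whole string value becomes a single token:
    String value = "NY";
    term.setEmpty().append(value);
    offset.setOffset(0, value.length());

    // A consumer working off the same AttributeSource reads that one token:
    System.out.println(term.toString() + " [" + offset.startOffset() + "-" + offset.endOffset() + "]");
  }
}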

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java?rev=966168&r1=966167&r2=966168&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java Wed Jul 21 10:27:20 2010
@@ -35,20 +35,20 @@ import org.apache.lucene.analysis.tokena
 
 final class DocInverterPerField extends DocFieldConsumerPerField {
 
-  final private DocInverterPerThread perThread;
-  final private FieldInfo fieldInfo;
+  final private DocInverter parent;
+  final FieldInfo fieldInfo;
   final InvertedDocConsumerPerField consumer;
   final InvertedDocEndConsumerPerField endConsumer;
-  final DocumentsWriter.DocState docState;
+  final DocumentsWriterPerThread.DocState docState;
   final FieldInvertState fieldState;
 
-  public DocInverterPerField(DocInverterPerThread perThread, FieldInfo fieldInfo) {
-    this.perThread = perThread;
+  public DocInverterPerField(DocInverter parent, FieldInfo fieldInfo) {
+    this.parent = parent;
     this.fieldInfo = fieldInfo;
-    docState = perThread.docState;
-    fieldState = perThread.fieldState;
-    this.consumer = perThread.consumer.addField(this, fieldInfo);
-    this.endConsumer = perThread.endConsumer.addField(this, fieldInfo);
+    docState = parent.docState;
+    fieldState = parent.fieldState;
+    this.consumer = parent.consumer.addField(this, fieldInfo);
+    this.endConsumer = parent.endConsumer.addField(this, fieldInfo);
   }
 
   @Override
@@ -84,8 +84,8 @@ final class DocInverterPerField extends 
         if (!field.isTokenized()) {		  // un-tokenized field
           String stringValue = field.stringValue();
           final int valueLength = stringValue.length();
-          perThread.singleToken.reinit(stringValue, 0, valueLength);
-          fieldState.attributeSource = perThread.singleToken;
+          parent.singleToken.reinit(stringValue, 0, valueLength);
+          fieldState.attributeSource = parent.singleToken;
           consumer.start(field);
 
           boolean success = false;
@@ -93,8 +93,9 @@ final class DocInverterPerField extends 
             consumer.add();
             success = true;
           } finally {
-            if (!success)
+            if (!success) {
               docState.docWriter.setAborting();
+            }
           }
           fieldState.offset += valueLength;
           fieldState.length++;
@@ -119,8 +120,8 @@ final class DocInverterPerField extends 
               if (stringValue == null) {
                 throw new IllegalArgumentException("field must have either TokenStream, String or Reader value");
               }
-              perThread.stringReader.init(stringValue);
-              reader = perThread.stringReader;
+              parent.stringReader.init(stringValue);
+              reader = parent.stringReader;
             }
           
             // Tokenize field and add to postingTable
@@ -173,8 +174,9 @@ final class DocInverterPerField extends 
                 consumer.add();
                 success = true;
               } finally {
-                if (!success)
+                if (!success) {
                   docState.docWriter.setAborting();
+                }
               }
               fieldState.position++;
               if (++fieldState.length >= maxFieldLength) {
@@ -208,4 +210,9 @@ final class DocInverterPerField extends 
     consumer.finish();
     endConsumer.finish();
   }
+
+  @Override
+  FieldInfo getFieldInfo() {
+    return this.fieldInfo;
+  }
 }

Added: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java?rev=966168&view=auto
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java (added)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java Wed Jul 21 10:27:20 2010
@@ -0,0 +1,459 @@
+package org.apache.lucene.index;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.codecs.Codec;
+import org.apache.lucene.search.Similarity;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.RAMFile;
+import org.apache.lucene.util.ArrayUtil;
+
+public class DocumentsWriterPerThread {
+  
+  /**
+   * The IndexingChain must define the {@link #getChain(DocumentsWriterPerThread)} method
+   * which returns the DocConsumer that the DocumentsWriterPerThread calls to process the
+   * documents. 
+   */
+  abstract static class IndexingChain {
+    abstract DocConsumer getChain(DocumentsWriterPerThread documentsWriterPerThread);
+  }
+
+  
+  static final IndexingChain defaultIndexingChain = new IndexingChain() {
+
+    @Override
+    DocConsumer getChain(DocumentsWriterPerThread documentsWriterPerThread) {
+      /*
+      This is the current indexing chain:
+
+      DocConsumer
+        --> code: DocFieldProcessor
+          --> DocFieldConsumer / DocFieldConsumerPerField
+            --> code: DocFieldConsumers / DocFieldConsumersPerField
+              --> code: DocInverter / DocInverterPerField
+                --> InvertedDocConsumer / InvertedDocConsumerPerField
+                  --> code: TermsHash / TermsHashPerField
+                    --> TermsHashConsumer / TermsHashConsumerPerField
+                      --> code: FreqProxTermsWriter / FreqProxTermsWriterPerField
+                      --> code: TermVectorsTermsWriter / TermVectorsTermsWriterPerField
+                --> InvertedDocEndConsumer / InvertedDocEndConsumerPerField
+                  --> code: NormsWriter / NormsWriterPerField
+              --> code: StoredFieldsWriter / StoredFieldsWriterPerField
+    */
+
+    // Build up indexing chain:
+
+      final TermsHashConsumer termVectorsWriter = new TermVectorsTermsWriter(documentsWriterPerThread);
+      final TermsHashConsumer freqProxWriter = new FreqProxTermsWriter();
+
+      final InvertedDocConsumer  termsHash = new TermsHash(documentsWriterPerThread, freqProxWriter,
+                                                           new TermsHash(documentsWriterPerThread, termVectorsWriter, null));
+      final NormsWriter normsWriter = new NormsWriter();
+      final DocInverter docInverter = new DocInverter(documentsWriterPerThread.docState, termsHash, normsWriter);
+      return new DocFieldProcessor(documentsWriterPerThread, docInverter);
+    }
+  };
+  
+  static class DocState {
+    final DocumentsWriterPerThread docWriter;
+    Analyzer analyzer;
+    int maxFieldLength;
+    PrintStream infoStream;
+    Similarity similarity;
+    int docID;
+    Document doc;
+    String maxTermPrefix;
+
+    DocState(DocumentsWriterPerThread docWriter) {
+      this.docWriter = docWriter;
+    }
+    
+    // Only called by asserts
+    public boolean testPoint(String name) {
+      return docWriter.writer.testPoint(name);
+    }
+  }
+  
+  /** Called if we hit an exception at a bad time (when
+   *  updating the index files) and must discard all
+   *  currently buffered docs.  This resets our state,
+   *  discarding any docs added since last flush. */
+  void abort() throws IOException {
+    try {
+      if (infoStream != null) {
+        message("docWriter: now abort");
+      }
+      try {
+        consumer.abort();
+      } catch (Throwable t) {
+      }
+
+      docStoreSegment = null;
+      numDocsInStore = 0;
+      docStoreOffset = 0;
+
+      // Reset all postings data
+      doAfterFlush();
+
+    } finally {
+      aborting = false;
+      if (infoStream != null) {
+        message("docWriter: done abort");
+      }
+    }
+  }
+
+  
+  final DocumentsWriterRAMAllocator ramAllocator = new DocumentsWriterRAMAllocator();
+
+  final DocumentsWriter parent;
+  final IndexWriter writer;
+  
+  final Directory directory;
+  final DocState docState;
+  final DocConsumer consumer;
+  private DocFieldProcessor docFieldProcessor;
+  
+  String segment;                         // Current segment we are working on
+  private String docStoreSegment;         // Current doc-store segment we are writing
+  private int docStoreOffset;                     // Current starting doc-store offset of current segment
+  boolean aborting;               // True if an abort is pending
+  
+  private final PrintStream infoStream;
+  private int numDocsInRAM;
+  private int numDocsInStore;
+  private int flushedDocCount;
+  SegmentWriteState flushState;
+
+  long[] sequenceIDs = new long[8];
+  
+  final List<String> closedFiles = new ArrayList<String>();
+  
+  long numBytesUsed;
+  
+  public DocumentsWriterPerThread(Directory directory, DocumentsWriter parent, IndexingChain indexingChain) {
+    this.directory = directory;
+    this.parent = parent;
+    this.writer = parent.indexWriter;
+    this.infoStream = parent.indexWriter.getInfoStream();
+    this.docState = new DocState(this);
+    this.docState.similarity = parent.config.getSimilarity();
+    this.docState.maxFieldLength = parent.config.getMaxFieldLength();
+    
+    consumer = indexingChain.getChain(this);
+    if (consumer instanceof DocFieldProcessor) {
+      docFieldProcessor = (DocFieldProcessor) consumer;
+    }
+
+  }
+  
+  void setAborting() {
+    aborting = true;
+  }
+  
+  public void addDocument(Document doc, Analyzer analyzer) throws IOException {
+    docState.doc = doc;
+    docState.analyzer = analyzer;
+    docState.docID = numDocsInRAM;
+    initSegmentName(false);
+  
+    final DocWriter perDoc;
+    
+    boolean success = false;
+    try {
+      perDoc = consumer.processDocument();
+      
+      success = true;
+    } finally {
+      if (!success) {
+        if (!aborting) {
+          // mark document as deleted
+          commitDocument(-1);
+        }
+      }
+    }
+
+    success = false;
+    try {
+      if (perDoc != null) {
+        perDoc.finish();
+      }
+      
+      success = true;
+    } finally {
+      if (!success) {
+        setAborting();
+      }
+    }
+
+  }
+
+  public void commitDocument(long sequenceID) {
+    if (numDocsInRAM == sequenceIDs.length) {
+      sequenceIDs = ArrayUtil.grow(sequenceIDs);
+    }
+    
+    sequenceIDs[numDocsInRAM] = sequenceID;
+    numDocsInRAM++;
+    numDocsInStore++;
+  }
+  
+  int getNumDocsInRAM() {
+    return numDocsInRAM;
+  }
+  
+  long getMinSequenceID() {
+    if (numDocsInRAM == 0) {
+      return -1;
+    }
+    return sequenceIDs[0];
+  }
+  
+  /** Returns true if any of the fields in the current
+   *  buffered docs have omitTermFreqAndPositions==false */
+  boolean hasProx() {
+    return (docFieldProcessor != null) ? docFieldProcessor.fieldInfos.hasProx()
+                                      : true;
+  }
+  
+  Codec getCodec() {
+    return flushState.codec;
+  }
+  
+  void initSegmentName(boolean onlyDocStore) {
+    if (segment == null && (!onlyDocStore || docStoreSegment == null)) {
+      // this call is synchronized on IndexWriter.segmentInfos
+      segment = writer.newSegmentName();
+      assert numDocsInRAM == 0;
+    }
+    if (docStoreSegment == null) {
+      docStoreSegment = segment;
+      assert numDocsInStore == 0;
+    }
+  }
+
+  
+  private void initFlushState(boolean onlyDocStore) {
+    initSegmentName(onlyDocStore);
+    flushState = new SegmentWriteState(infoStream, directory, segment, docFieldProcessor.fieldInfos,
+                                       docStoreSegment, numDocsInRAM, numDocsInStore, writer.getConfig().getTermIndexInterval(),
+                                       writer.codecs);
+  }
+  
+  /** Reset after a flush */
+  private void doAfterFlush() throws IOException {
+    segment = null;
+    numDocsInRAM = 0;
+  }
+    
+  /** Flush all pending docs to a new segment */
+  SegmentInfo flush(boolean closeDocStore) throws IOException {
+    assert numDocsInRAM > 0;
+
+    initFlushState(closeDocStore);
+
+    docStoreOffset = numDocsInStore;
+
+    if (infoStream != null) {
+      message("flush postings as segment " + flushState.segmentName + " numDocs=" + numDocsInRAM);
+    }
+    
+    boolean success = false;
+
+    try {
+
+      if (closeDocStore) {
+        assert flushState.docStoreSegmentName != null;
+        assert flushState.docStoreSegmentName.equals(flushState.segmentName);
+        closeDocStore();
+        flushState.numDocsInStore = 0;
+      }
+      
+      consumer.flush(flushState);
+
+      if (infoStream != null) {
+        SegmentInfo si = new SegmentInfo(flushState.segmentName,
+            flushState.numDocs,
+            directory, false,
+            docStoreOffset, flushState.docStoreSegmentName,
+            false,    
+            hasProx(),
+            getCodec());
+
+        final long newSegmentSize = si.sizeInBytes();
+        String message = "  ramUsed=" + ramAllocator.nf.format(((double) numBytesUsed)/1024./1024.) + " MB" +
+          " newFlushedSize=" + newSegmentSize +
+          " docs/MB=" + ramAllocator.nf.format(numDocsInRAM/(newSegmentSize/1024./1024.)) +
+          " new/old=" + ramAllocator.nf.format(100.0*newSegmentSize/numBytesUsed) + "%";
+        message(message);
+      }
+
+      flushedDocCount += flushState.numDocs;
+
+      long maxSequenceID = sequenceIDs[numDocsInRAM-1];
+      doAfterFlush();
+      
+      // Create new SegmentInfo, but do not add to our
+      // segmentInfos until deletes are flushed
+      // successfully.
+      SegmentInfo newSegment = new SegmentInfo(flushState.segmentName,
+                                   flushState.numDocs,
+                                   directory, false,
+                                   docStoreOffset, flushState.docStoreSegmentName,
+                                   false,    
+                                   hasProx(),
+                                   getCodec());
+
+      
+      newSegment.setMinSequenceID(sequenceIDs[0]);
+      newSegment.setMaxSequenceID(maxSequenceID);
+      
+      IndexWriter.setDiagnostics(newSegment, "flush");
+      success = true;
+
+      return newSegment;
+    } finally {
+      if (!success) {
+        setAborting();
+      }
+    }
+  }
+
+  /** Closes the current open doc stores and returns the doc
+   *  store segment name.  This returns null if there are
+   *  no buffered documents. */
+  String closeDocStore() throws IOException {
+
+    // nocommit
+//    if (infoStream != null)
+//      message("closeDocStore: " + openFiles.size() + " files to flush to segment " + docStoreSegment + " numDocs=" + numDocsInStore);
+    
+    boolean success = false;
+
+    try {
+      initFlushState(true);
+      closedFiles.clear();
+
+      consumer.closeDocStore(flushState);
+      // nocommit
+      //assert 0 == openFiles.size();
+
+      String s = docStoreSegment;
+      docStoreSegment = null;
+      docStoreOffset = 0;
+      numDocsInStore = 0;
+      success = true;
+      return s;
+    } finally {
+      if (!success) {
+        parent.abort();
+      }
+    }
+  }
+
+  
+  /** Get current segment name we are writing. */
+  String getSegment() {
+    return segment;
+  }
+  
+  /** Returns the current doc store segment we are writing
+   *  to. */
+  String getDocStoreSegment() {
+    return docStoreSegment;
+  }
+
+  /** Returns the doc offset into the shared doc store for
+   *  the current buffered docs. */
+  int getDocStoreOffset() {
+    return docStoreOffset;
+  }
+
+
+  @SuppressWarnings("unchecked")
+  List<String> closedFiles() {
+    return (List<String>) ((ArrayList<String>) closedFiles).clone();
+  }
+
+  void addOpenFile(String name) {
+    synchronized(parent.openFiles) {
+      assert !parent.openFiles.contains(name);
+      parent.openFiles.add(name);
+    }
+  }
+
+  void removeOpenFile(String name) {
+    synchronized(parent.openFiles) {
+      assert parent.openFiles.contains(name);
+      parent.openFiles.remove(name);
+    }
+    closedFiles.add(name);
+  }
+  
+  /** Consumer returns this on each doc.  This holds any
+   *  state that must be flushed, synchronized, in docID
+   *  order.  We gather these and flush them in order. */
+  abstract static class DocWriter {
+    DocWriter next;
+    int docID;
+    abstract void finish() throws IOException;
+    abstract void abort();
+    abstract long sizeInBytes();
+
+    void setNext(DocWriter next) {
+      this.next = next;
+    }
+  }
+
+  /**
+   * Create and return a new PerDocBuffer.
+   */
+  PerDocBuffer newPerDocBuffer() {
+    return new PerDocBuffer();
+  }
+
+  /**
+   * RAMFile buffer for DocWriters.
+   */
+  class PerDocBuffer extends RAMFile {
+    
+    /**
+     * Allocate bytes used from shared pool.
+     */
+    protected byte[] newBuffer(int size) {
+      assert size == DocumentsWriterRAMAllocator.PER_DOC_BLOCK_SIZE;
+      return ramAllocator.perDocAllocator.getByteBlock();
+    }
+    
+    /**
+     * Recycle the bytes used.
+     */
+    synchronized void recycle() {
+      if (buffers.size() > 0) {
+        setLength(0);
+        
+        // Recycle the blocks
+        ramAllocator.perDocAllocator.recycleByteBlocks(buffers);
+        buffers.clear();
+        sizeInBytes = 0;
+        
+        assert numBuffers() == 0;
+      }
+    }
+  }
+  
+  void bytesUsed(long numBytes) {
+    ramAllocator.bytesUsed(numBytes);
+  }
+  
+  void message(String message) {
+    if (infoStream != null)
+      writer.message("DW: " + message);
+  }
+}
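
For illustration only: the sequence-ID bookkeeping above (commitDocument(), getMinSequenceID(), and the max read in flush()) boils down to one long per buffered document in a growable array.  A minimal stand-alone sketch in plain Java -- the class and method names here are hypothetical, and java.util.Arrays.copyOf stands in for ArrayUtil.grow:

    import java.util.Arrays;

    class SequenceIdBuffer {
      private long[] sequenceIDs = new long[8];
      private int numDocsInRAM = 0;

      // Mirrors commitDocument(): record the sequence id of the next buffered doc.
      void commit(long sequenceID) {
        if (numDocsInRAM == sequenceIDs.length) {
          sequenceIDs = Arrays.copyOf(sequenceIDs, sequenceIDs.length * 2);
        }
        sequenceIDs[numDocsInRAM++] = sequenceID;
      }

      // Mirrors getMinSequenceID(): -1 when nothing is buffered.
      long minSequenceID() {
        return numDocsInRAM == 0 ? -1 : sequenceIDs[0];
      }

      // What flush() reads as maxSequenceID just before resetting the buffer.
      long maxSequenceID() {
        return numDocsInRAM == 0 ? -1 : sequenceIDs[numDocsInRAM - 1];
      }

      public static void main(String[] args) {
        SequenceIdBuffer buf = new SequenceIdBuffer();
        for (long seq = 100; seq < 110; seq++) {
          buf.commit(seq);
        }
        System.out.println(buf.minSequenceID() + ".." + buf.maxSequenceID()); // prints 100..109
      }
    }

The flushed SegmentInfo then carries exactly that range via setMinSequenceID()/setMaxSequenceID(), presumably so that deletes buffered by sequence ID (see BufferedDeletesInRAM below) can be matched against segments later.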

Added: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterRAMAllocator.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterRAMAllocator.java?rev=966168&view=auto
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterRAMAllocator.java (added)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterRAMAllocator.java Wed Jul 21 10:27:20 2010
@@ -0,0 +1,148 @@
+package org.apache.lucene.index;
+
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.lucene.util.Constants;
+
+class DocumentsWriterRAMAllocator {
+  final ByteBlockAllocator byteBlockAllocator = new ByteBlockAllocator(BYTE_BLOCK_SIZE);
+  final ByteBlockAllocator perDocAllocator = new ByteBlockAllocator(PER_DOC_BLOCK_SIZE);
+
+  
+  class ByteBlockAllocator extends ByteBlockPool.Allocator {
+    final int blockSize;
+
+    ByteBlockAllocator(int blockSize) {
+      this.blockSize = blockSize;
+    }
+
+    ArrayList<byte[]> freeByteBlocks = new ArrayList<byte[]>();
+    
+    /* Allocate another byte[] from the shared pool */
+    @Override
+    byte[] getByteBlock() {
+      final int size = freeByteBlocks.size();
+      final byte[] b;
+      if (0 == size) {
+        b = new byte[blockSize];
+        // Always record a block allocated, even if
+        // trackAllocations is false.  This is necessary
+        // because this block will be shared between
+        // things that don't track allocations (term
+        // vectors) and things that do (freq/prox
+        // postings).
+        numBytesUsed += blockSize;
+      } else
+        b = freeByteBlocks.remove(size-1);
+      return b;
+    }
+
+    /* Return byte[]'s to the pool */
+    @Override
+    void recycleByteBlocks(byte[][] blocks, int start, int end) {
+      for(int i=start;i<end;i++) {
+        freeByteBlocks.add(blocks[i]);
+      }
+    }
+
+    @Override
+    void recycleByteBlocks(List<byte[]> blocks) {
+      final int size = blocks.size();
+      for(int i=0;i<size;i++) {
+        freeByteBlocks.add(blocks.get(i));
+      }
+    }
+  }
+
+  private ArrayList<int[]> freeIntBlocks = new ArrayList<int[]>();
+
+  /* Allocate another int[] from the shared pool */
+  int[] getIntBlock() {
+    final int size = freeIntBlocks.size();
+    final int[] b;
+    if (0 == size) {
+      b = new int[INT_BLOCK_SIZE];
+      // Always record a block allocated, even if
+      // trackAllocations is false.  This is necessary
+      // because this block will be shared between
+      // things that don't track allocations (term
+      // vectors) and things that do (freq/prox
+      // postings).
+      numBytesUsed += INT_BLOCK_SIZE*INT_NUM_BYTE;
+    } else
+      b = freeIntBlocks.remove(size-1);
+    return b;
+  }
+
+  void bytesUsed(long numBytes) {
+    numBytesUsed += numBytes;
+  }
+
+  /* Return int[]s to the pool */
+  void recycleIntBlocks(int[][] blocks, int start, int end) {
+    for(int i=start;i<end;i++)
+      freeIntBlocks.add(blocks[i]);
+  }
+
+  long getRAMUsed() {
+    return numBytesUsed;
+  }
+
+  long numBytesUsed;
+
+  NumberFormat nf = NumberFormat.getInstance();
+
+  final static int PER_DOC_BLOCK_SIZE = 1024;
+  
+  // Coarse estimates used to measure RAM usage of buffered deletes
+  final static int OBJECT_HEADER_BYTES = 8;
+  final static int POINTER_NUM_BYTE = Constants.JRE_IS_64BIT ? 8 : 4;
+  final static int INT_NUM_BYTE = 4;
+  final static int CHAR_NUM_BYTE = 2;
+
+  /* Rough logic: HashMap has an array[Entry] w/ varying
+     load factor (say 2 * POINTER).  Entry is object w/ Term
+     key, BufferedDeletes.Num val, int hash, Entry next
+     (OBJ_HEADER + 3*POINTER + INT).  Term is object w/
+     String field and String text (OBJ_HEADER + 2*POINTER).
+     We don't count Term's field since it's interned.
+     Term's text is String (OBJ_HEADER + 4*INT + POINTER +
+     OBJ_HEADER + string.length*CHAR).  BufferedDeletes.num is
+     OBJ_HEADER + INT. */
+ 
+  final static int BYTES_PER_DEL_TERM = 8*POINTER_NUM_BYTE + 5*OBJECT_HEADER_BYTES + 6*INT_NUM_BYTE;
+
+  /* Rough logic: del docIDs are List<Integer>.  Say list
+     allocates ~2X size (2*POINTER).  Integer is OBJ_HEADER
+     + int */
+  final static int BYTES_PER_DEL_DOCID = 2*POINTER_NUM_BYTE + OBJECT_HEADER_BYTES + INT_NUM_BYTE;
+
+  /* Rough logic: HashMap has an array[Entry] w/ varying
+     load factor (say 2 * POINTER).  Entry is object w/
+     Query key, Integer val, int hash, Entry next
+     (OBJ_HEADER + 3*POINTER + INT).  Query we often
+     undercount (say 24 bytes).  Integer is OBJ_HEADER + INT. */
+  final static int BYTES_PER_DEL_QUERY = 5*POINTER_NUM_BYTE + 2*OBJECT_HEADER_BYTES + 2*INT_NUM_BYTE + 24;
+
+  /* Initial chunks size of the shared byte[] blocks used to
+     store postings data */
+  final static int BYTE_BLOCK_SHIFT = 15;
+  final static int BYTE_BLOCK_SIZE = 1 << BYTE_BLOCK_SHIFT;
+  final static int BYTE_BLOCK_MASK = BYTE_BLOCK_SIZE - 1;
+  final static int BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK;
+
+  final static int MAX_TERM_LENGTH_UTF8 = BYTE_BLOCK_SIZE-2;
+
+  /* Initial chunks size of the shared int[] blocks used to
+     store postings data */
+  final static int INT_BLOCK_SHIFT = 13;
+  final static int INT_BLOCK_SIZE = 1 << INT_BLOCK_SHIFT;
+  final static int INT_BLOCK_MASK = INT_BLOCK_SIZE - 1;
+
+  String toMB(long v) {
+    return nf.format(v/1024./1024.);
+  }
+
+}
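
For illustration only: the BYTE_BLOCK_SHIFT/SIZE/MASK constants above encode a global byte address as blockIndex * BYTE_BLOCK_SIZE + offset; ByteBlockPool, ByteSliceReader and ByteSliceWriter (modified further down in this mail) recover the block index with a shift (or a division) and the offset with a mask.  A tiny stand-alone demonstration of that arithmetic in plain Java, using the values defined above:

    public class BlockAddressDemo {
      static final int BYTE_BLOCK_SHIFT = 15;
      static final int BYTE_BLOCK_SIZE = 1 << BYTE_BLOCK_SHIFT;   // 32768
      static final int BYTE_BLOCK_MASK = BYTE_BLOCK_SIZE - 1;     // 0x7FFF

      public static void main(String[] args) {
        int blockIndex = 3;
        int offset = 1234;

        // How the pool composes a global address from (block, offset):
        int address = blockIndex * BYTE_BLOCK_SIZE + offset;

        // How ByteSliceWriter/ByteSliceReader decompose it again:
        int decodedBlock  = address >> BYTE_BLOCK_SHIFT;   // 3
        int decodedOffset = address &  BYTE_BLOCK_MASK;    // 1234

        System.out.println(decodedBlock + " / " + decodedOffset);
      }
    }

The "rough logic" comments above are plain arithmetic of the same kind: on a 64-bit JRE, BYTES_PER_DEL_TERM = 8*8 + 5*8 + 6*4 = 128 bytes charged per buffered delete-by-Term.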

Added: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java?rev=966168&view=auto
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java (added)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java Wed Jul 21 10:27:20 2010
@@ -0,0 +1,255 @@
+package org.apache.lucene.index;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.util.ThreadInterruptedException;
+
+abstract class DocumentsWriterThreadPool {
+  public static abstract class Task<T> {
+    private boolean clearThreadBindings = false;
+    
+    protected void clearThreadBindings() {
+      this.clearThreadBindings = true;
+    }
+    
+    boolean doClearThreadBindings() {
+      return clearThreadBindings;
+    }
+  }
+
+  public static abstract class PerThreadTask<T> extends Task<T> {
+    abstract T process(final DocumentsWriterPerThread perThread) throws IOException;
+  }
+  
+  public static abstract class AllThreadsTask<T> extends Task<T> {
+    abstract T process(final Iterator<DocumentsWriterPerThread> threadsIterator) throws IOException;
+  }
+
+  protected abstract static class ThreadState {
+    private DocumentsWriterPerThread perThread;
+    private boolean isIdle = true;
+    
+    void start() {/* extension hook */}
+    void finish() {/* extension hook */}
+  }
+  
+  private int pauseThreads = 0;
+  
+  protected final int maxNumThreadStates;
+  protected ThreadState[] allThreadStates = new ThreadState[0];
+  
+  private final Lock lock = new ReentrantLock();
+  private final Condition threadStateAvailable = lock.newCondition();
+  private boolean globalLock;
+  private boolean aborting;
+
+  DocumentsWriterThreadPool(int maxNumThreadStates) {
+    this.maxNumThreadStates = (maxNumThreadStates < 1) ? IndexWriterConfig.DEFAULT_MAX_THREAD_STATES : maxNumThreadStates;
+  }
+  
+  public final int getMaxThreadStates() {
+    return this.maxNumThreadStates;
+  }
+  
+  void pauseAllThreads() {
+    lock.lock();
+    try {
+      pauseThreads++;
+      while(!allThreadsIdle()) {
+        try {
+          threadStateAvailable.await();
+        } catch (InterruptedException ie) {
+          throw new ThreadInterruptedException(ie);
+        }
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  void resumeAllThreads() {
+    lock.lock();
+    try {
+      pauseThreads--;
+      assert pauseThreads >= 0;
+      if (0 == pauseThreads) {
+        threadStateAvailable.signalAll();
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  private boolean allThreadsIdle() {
+    for (ThreadState state : allThreadStates) {
+      if (!state.isIdle) {
+        return false;
+      }
+    }
+    
+    return true;
+  }
+  
+  void abort() throws IOException {
+    pauseAllThreads();
+    aborting = true;
+    for (ThreadState state : allThreadStates) {
+      state.perThread.abort();
+    }
+  }
+  
+  void finishAbort() {
+    aborting = false;
+    resumeAllThreads();
+  }
+
+  public <T> T executeAllThreads(AllThreadsTask<T> task) throws IOException {
+    T result = null;
+    
+    lock.lock();
+    try {
+      try {
+        while (globalLock) {
+          threadStateAvailable.await();
+        }
+      } catch (InterruptedException ie) {
+        throw new ThreadInterruptedException(ie);
+      }
+      
+      globalLock = true;
+      pauseAllThreads();
+    } finally {
+      lock.unlock();
+    }
+
+    
+    // all threads are idle now
+    
+    try {
+      final ThreadState[] localAllThreads = allThreadStates;
+      
+      result = task.process(new Iterator<DocumentsWriterPerThread>() {
+        int i = 0;
+  
+        @Override
+        public boolean hasNext() {
+          return i < localAllThreads.length;
+        }
+  
+        @Override
+        public DocumentsWriterPerThread next() {
+          return localAllThreads[i++].perThread;
+        }
+  
+        @Override
+        public void remove() {
+          throw new UnsupportedOperationException("remove() not supported.");
+        }
+      });
+      return result;
+    } finally {
+      lock.lock();
+      try {
+        try {
+          if (task.doClearThreadBindings()) {
+            clearAllThreadBindings();
+          }
+        } finally {
+          globalLock = false;
+          resumeAllThreads();
+          threadStateAvailable.signalAll();
+        }
+      } finally {
+        lock.unlock();
+      }
+      
+    }
+  }
+
+  
+  public final <T> T executePerThread(DocumentsWriter documentsWriter, Document doc, PerThreadTask<T> task) throws IOException {
+    ThreadState state = acquireThreadState(documentsWriter, doc);
+    boolean success = false;
+    try {
+      T result = task.process(state.perThread);
+      success = true;
+      return result;
+    } finally {
+      boolean abort = false;
+      if (!success && state.perThread.aborting) {
+        state.perThread.aborting = false;
+        abort = true;
+      }
+
+      returnDocumentsWriterPerThread(state, task.doClearThreadBindings());
+      
+      if (abort) {
+        documentsWriter.abort();
+      }
+    }
+  }
+  
+  protected final <T extends ThreadState> T addNewThreadState(DocumentsWriter documentsWriter, T threadState) {
+    // Just create a new "private" thread state
+    ThreadState[] newArray = new ThreadState[1+allThreadStates.length];
+    if (allThreadStates.length > 0)
+      System.arraycopy(allThreadStates, 0, newArray, 0, allThreadStates.length);
+    threadState.perThread = documentsWriter.newDocumentsWriterPerThread(); 
+    newArray[allThreadStates.length] = threadState;
+
+    allThreadStates = newArray;
+    return threadState;
+  }
+  
+  protected abstract ThreadState selectThreadState(Thread requestingThread, DocumentsWriter documentsWriter, Document doc);
+  protected void clearThreadBindings(ThreadState flushedThread) {
+    // subclasses can optionally override this to cleanup after a thread flushed
+  }
+
+  protected void clearAllThreadBindings() {
+    // subclasses can optionally override this to cleanup after a thread flushed
+  }
+  
+  
+  private final ThreadState acquireThreadState(DocumentsWriter documentsWriter, Document doc) {
+    lock.lock();
+    try {
+      ThreadState threadState = selectThreadState(Thread.currentThread(), documentsWriter, doc);
+      
+      try {
+        while (!threadState.isIdle || globalLock || aborting) {
+          threadStateAvailable.await();
+        }
+      } catch (InterruptedException ie) {
+        throw new ThreadInterruptedException(ie);
+      }
+      
+      threadState.isIdle = false;
+      threadState.start();
+      
+      return threadState;
+      
+    } finally {
+      lock.unlock();
+    }
+  }
+  
+  private final void returnDocumentsWriterPerThread(ThreadState state, boolean clearThreadBindings) {
+    lock.lock();
+    try {
+      state.finish();
+      if (clearThreadBindings) {
+        clearThreadBindings(state);
+      }
+      state.isIdle = true;
+      threadStateAvailable.signalAll();
+    } finally {
+      lock.unlock();
+    }
+  }
+}
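
For illustration only: the pool above guards its per-thread states with a single ReentrantLock and one Condition.  A caller waits while its chosen state is busy (or while a global lock or abort is in progress), marks the state busy, runs the task outside the lock, then re-acquires the lock to mark the state idle and signalAll().  A stripped-down, stand-alone sketch of that acquire/run/release pattern in plain Java -- the state and the "work" here are placeholders, not the classes from the patch:

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    class TinyStatePool {
      static final class State {
        boolean idle = true;
        int docsProcessed;             // stand-in for the real per-thread writer
      }

      private final Lock lock = new ReentrantLock();
      private final Condition stateAvailable = lock.newCondition();
      private final State state = new State();   // a real pool keeps an array of these

      int runTask() throws InterruptedException {
        // acquire: wait until the state is idle, then mark it busy
        lock.lock();
        try {
          while (!state.idle) {
            stateAvailable.await();
          }
          state.idle = false;
        } finally {
          lock.unlock();
        }

        try {
          // the actual work runs outside the lock, like task.process(perThread)
          return ++state.docsProcessed;
        } finally {
          // release: mark the state idle again and wake any waiters
          lock.lock();
          try {
            state.idle = true;
            stateAvailable.signalAll();
          } finally {
            lock.unlock();
          }
        }
      }

      public static void main(String[] args) throws InterruptedException {
        System.out.println(new TinyStatePool().runTask());   // prints 1
      }
    }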



Re: svn commit: r966168 [1/4] - in /lucene/dev: branches/realtime_search/lucene/src/java/org/apache/lucene/index/ branches/realtime_search/lucene/src/java/org/apache/lucene/util/ branches/realtime_search/lucene/src/test/org/apache/lucene/index/ trunk/lucen...

Posted by Michael Busch <bu...@gmail.com>.
  Oops, I messed something up.  Two files ended up in trunk (something
went wrong with my svn switch) :(

Will try to fix it now...

  Michael

On 7/21/10 3:27 AM, buschmi@apache.org wrote:
> Author: buschmi
> Date: Wed Jul 21 10:27:20 2010
> New Revision: 966168
>
> URL: http://svn.apache.org/viewvc?rev=966168&view=rev
> Log:
> LUCENE-2324: Committing second version of the patch to the real-time branch.  It's not done yet, but easier to track progress using the branch.
>
> Added:
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletesInRAM.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterRAMAllocator.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/util/ThreadSafeCloneableSortedMap.java
> Removed:
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumerPerThread.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerThread.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumersPerThread.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerThread.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverterPerThread.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadState.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FreqProxFieldMergeState.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerThread.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/InvertedDocConsumerPerThread.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/InvertedDocEndConsumerPerThread.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/NormsWriterPerThread.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/StoredFieldsWriterPerThread.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriterPerThread.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHashConsumerPerThread.java
>      lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/TermsHashPerThread.java
> Modified:
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteBlockPool.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceReader.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceWriter.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumer.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerField.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumers.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumersPerField.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverter.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IntBlockPool.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/InvertedDocConsumer.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/InvertedDocEndConsumer.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/NormsWriter.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/NormsWriterPerField.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ParallelPostingsArray.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/SegmentInfo.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/StoredFieldsWriter.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriterPerField.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHash.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHashConsumer.java
>      lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHashPerField.java
>      lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestByteSlices.java
>      lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriter.java
>      lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
>      lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
>      lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestNRTReaderWithThreads.java
>      lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestStressIndexing2.java
>      lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestThreadedOptimize.java
>      lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
>      lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriter.java
>
> Added: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletesInRAM.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletesInRAM.java?rev=966168&view=auto
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletesInRAM.java (added)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletesInRAM.java Wed Jul 21 10:27:20 2010
> @@ -0,0 +1,70 @@
> +package org.apache.lucene.index;
> +
> +import java.util.TreeMap;
> +
> +import org.apache.lucene.search.Query;
> +import org.apache.lucene.util.ThreadSafeCloneableSortedMap;
> +
> +public class BufferedDeletesInRAM {
> +  static class Delete {
> +    int flushCount;
> +
> +    public Delete(int flushCount) {
> +      this.flushCount = flushCount;
> +    }
> +  }
> +
> +  final static class DeleteTerm extends Delete {
> +    final Term term;
> +
> +    public DeleteTerm(Term term, int flushCount) {
> +      super(flushCount);
> +      this.term = term;
> +    }
> +  }
> +
> +  final static class DeleteTerms extends Delete {
> +    final Term[] terms;
> +
> +    public DeleteTerms(Term[] terms, int flushCount) {
> +      super(flushCount);
> +      this.terms = terms;
> +    }
> +  }
> +
> +  final static class DeleteQuery extends Delete {
> +    final Query query;
> +
> +    public DeleteQuery(Query query, int flushCount) {
> +      super(flushCount);
> +      this.query = query;
> +    }
> +  }
> +
> +  final ThreadSafeCloneableSortedMap<Long, Delete>  deletes = ThreadSafeCloneableSortedMap
> +      .getThreadSafeSortedMap(new TreeMap<Long, Delete>());
> +
> +  final void addDeleteTerm(Term term, long sequenceID, int numThreadStates) {
> +    deletes.put(sequenceID, new DeleteTerm(term, numThreadStates));
> +  }
> +
> +  final void addDeleteTerms(Term[] terms, long sequenceID, int numThreadStates) {
> +    deletes.put(sequenceID, new DeleteTerms(terms, numThreadStates));
> +  }
> +
> +  final void addDeleteQuery(Query query, long sequenceID, int numThreadStates) {
> +    deletes.put(sequenceID, new DeleteQuery(query, numThreadStates));
> +  }
> +
> +  boolean hasDeletes() {
> +    return !deletes.isEmpty();
> +  }
> +
> +  void clear() {
> +    deletes.clear();
> +  }
> +
> +  int getNumDeletes() {
> +    return this.deletes.size();
> +  }
> +}
>
> Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteBlockPool.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteBlockPool.java?rev=966168&r1=966167&r2=966168&view=diff
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteBlockPool.java (original)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteBlockPool.java Wed Jul 21 10:27:20 2010
> @@ -50,10 +50,10 @@ final class ByteBlockPool {
>     public byte[][] buffers = new byte[10][];
>
>     int bufferUpto = -1;                        // Which buffer we are upto
> -  public int byteUpto = DocumentsWriter.BYTE_BLOCK_SIZE;             // Where we are in head buffer
> +  public int byteUpto = DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE;             // Where we are in head buffer
>
>     public byte[] buffer;                              // Current head buffer
> -  public int byteOffset = -DocumentsWriter.BYTE_BLOCK_SIZE;          // Current head offset
> +  public int byteOffset = -DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE;          // Current head offset
>
>     private final Allocator allocator;
>
> @@ -95,11 +95,11 @@ final class ByteBlockPool {
>       bufferUpto++;
>
>       byteUpto = 0;
> -    byteOffset += DocumentsWriter.BYTE_BLOCK_SIZE;
> +    byteOffset += DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE;
>     }
>
>     public int newSlice(final int size) {
> -    if (byteUpto>  DocumentsWriter.BYTE_BLOCK_SIZE-size)
> +    if (byteUpto>  DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE-size)
>         nextBuffer();
>       final int upto = byteUpto;
>       byteUpto += size;
> @@ -123,7 +123,7 @@ final class ByteBlockPool {
>       final int newSize = levelSizeArray[newLevel];
>
>       // Maybe allocate another block
> -    if (byteUpto>  DocumentsWriter.BYTE_BLOCK_SIZE-newSize)
> +    if (byteUpto>  DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE-newSize)
>         nextBuffer();
>
>       final int newUpto = byteUpto;
> @@ -151,8 +151,8 @@ final class ByteBlockPool {
>     // Fill in a BytesRef from term's length&  bytes encoded in
>     // byte block
>     final BytesRef setBytesRef(BytesRef term, int textStart) {
> -    final byte[] bytes = term.bytes = buffers[textStart>>  DocumentsWriter.BYTE_BLOCK_SHIFT];
> -    int pos = textStart&  DocumentsWriter.BYTE_BLOCK_MASK;
> +    final byte[] bytes = term.bytes = buffers[textStart>>  DocumentsWriterRAMAllocator.BYTE_BLOCK_SHIFT];
> +    int pos = textStart&  DocumentsWriterRAMAllocator.BYTE_BLOCK_MASK;
>       if ((bytes[pos]&  0x80) == 0) {
>         // length is 1 byte
>         term.length = bytes[pos];
>
> Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceReader.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceReader.java?rev=966168&r1=966167&r2=966168&view=diff
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceReader.java (original)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceReader.java Wed Jul 21 10:27:20 2010
> @@ -48,16 +48,16 @@ final class ByteSliceReader extends Data
>       this.endIndex = endIndex;
>
>       level = 0;
> -    bufferUpto = startIndex / DocumentsWriter.BYTE_BLOCK_SIZE;
> -    bufferOffset = bufferUpto * DocumentsWriter.BYTE_BLOCK_SIZE;
> +    bufferUpto = startIndex / DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE;
> +    bufferOffset = bufferUpto * DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE;
>       buffer = pool.buffers[bufferUpto];
> -    upto = startIndex&  DocumentsWriter.BYTE_BLOCK_MASK;
> +    upto = startIndex&  DocumentsWriterRAMAllocator.BYTE_BLOCK_MASK;
>
>       final int firstSize = ByteBlockPool.levelSizeArray[0];
>
>       if (startIndex+firstSize>= endIndex) {
>         // There is only this one slice to read
> -      limit = endIndex&  DocumentsWriter.BYTE_BLOCK_MASK;
> +      limit = endIndex&  DocumentsWriterRAMAllocator.BYTE_BLOCK_MASK;
>       } else
>         limit = upto+firstSize-4;
>     }
> @@ -102,11 +102,11 @@ final class ByteSliceReader extends Data
>       level = ByteBlockPool.nextLevelArray[level];
>       final int newSize = ByteBlockPool.levelSizeArray[level];
>
> -    bufferUpto = nextIndex / DocumentsWriter.BYTE_BLOCK_SIZE;
> -    bufferOffset = bufferUpto * DocumentsWriter.BYTE_BLOCK_SIZE;
> +    bufferUpto = nextIndex / DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE;
> +    bufferOffset = bufferUpto * DocumentsWriterRAMAllocator.BYTE_BLOCK_SIZE;
>
>       buffer = pool.buffers[bufferUpto];
> -    upto = nextIndex&  DocumentsWriter.BYTE_BLOCK_MASK;
> +    upto = nextIndex&  DocumentsWriterRAMAllocator.BYTE_BLOCK_MASK;
>
>       if (nextIndex + newSize>= endIndex) {
>         // We are advancing to the final slice
>
> Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceWriter.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceWriter.java?rev=966168&r1=966167&r2=966168&view=diff
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceWriter.java (original)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ByteSliceWriter.java Wed Jul 21 10:27:20 2010
> @@ -42,9 +42,9 @@ final class ByteSliceWriter extends Data
>      * Set up the writer to write at address.
>      */
>     public void init(int address) {
> -    slice = pool.buffers[address>>  DocumentsWriter.BYTE_BLOCK_SHIFT];
> +    slice = pool.buffers[address>>  DocumentsWriterRAMAllocator.BYTE_BLOCK_SHIFT];
>       assert slice != null;
> -    upto = address&  DocumentsWriter.BYTE_BLOCK_MASK;
> +    upto = address&  DocumentsWriterRAMAllocator.BYTE_BLOCK_MASK;
>       offset0 = address;
>       assert upto<  slice.length;
>     }
> @@ -80,6 +80,6 @@ final class ByteSliceWriter extends Data
>     }
>
>     public int getAddress() {
> -    return upto + (offset0&  DocumentsWriter.BYTE_BLOCK_NOT_MASK);
> +    return upto + (offset0&  DocumentsWriterRAMAllocator.BYTE_BLOCK_NOT_MASK);
>     }
>   }
> \ No newline at end of file
>
> Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumer.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumer.java?rev=966168&r1=966167&r2=966168&view=diff
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumer.java (original)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumer.java Wed Jul 21 10:27:20 2010
> @@ -18,11 +18,10 @@ package org.apache.lucene.index;
>    */
>
>   import java.io.IOException;
> -import java.util.Collection;
>
>   abstract class DocConsumer {
> -  abstract DocConsumerPerThread addThread(DocumentsWriterThreadState perThread) throws IOException;
> -  abstract void flush(final Collection<DocConsumerPerThread>  threads, final SegmentWriteState state) throws IOException;
> +  abstract DocumentsWriterPerThread.DocWriter processDocument() throws IOException;
> +  abstract void flush(final SegmentWriteState state) throws IOException;
>     abstract void closeDocStore(final SegmentWriteState state) throws IOException;
>     abstract void abort();
>     abstract boolean freeRAM();
>
> Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java?rev=966168&r1=966167&r2=966168&view=diff
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java (original)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java Wed Jul 21 10:27:20 2010
> @@ -18,7 +18,6 @@ package org.apache.lucene.index;
>    */
>
>   import java.io.IOException;
> -import java.util.Collection;
>   import java.util.Map;
>
>   abstract class DocFieldConsumer {
> @@ -27,7 +26,7 @@ abstract class DocFieldConsumer {
>
>     /** Called when DocumentsWriter decides to create a new
>      *  segment */
> -  abstract void flush(Map<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>>  threadsAndFields, SegmentWriteState state) throws IOException;
> +  abstract void flush(Map<FieldInfo, DocFieldConsumerPerField>  fieldsToFlush, SegmentWriteState state) throws IOException;
>
>     /** Called when DocumentsWriter decides to close the doc
>      *  stores */
> @@ -36,14 +35,17 @@ abstract class DocFieldConsumer {
>     /** Called when an aborting exception is hit */
>     abstract void abort();
>
> -  /** Add a new thread */
> -  abstract DocFieldConsumerPerThread addThread(DocFieldProcessorPerThread docFieldProcessorPerThread) throws IOException;
> -
>     /** Called when DocumentsWriter is using too much RAM.
>      *  The consumer should free RAM, if possible, returning
>      *  true if any RAM was in fact freed. */
>     abstract boolean freeRAM();
> +
> +  abstract void startDocument() throws IOException;
>
> +  abstract DocFieldConsumerPerField addField(FieldInfo fi);
> +
> +  abstract DocumentsWriterPerThread.DocWriter finishDocument() throws IOException;
> +
>     void setFieldInfos(FieldInfos fieldInfos) {
>       this.fieldInfos = fieldInfos;
>     }
>
> Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerField.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerField.java?rev=966168&r1=966167&r2=966168&view=diff
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerField.java (original)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumerPerField.java Wed Jul 21 10:27:20 2010
> @@ -24,4 +24,5 @@ abstract class DocFieldConsumerPerField
>     /** Processes all occurrences of a single field */
>     abstract void processFields(Fieldable[] fields, int count) throws IOException;
>     abstract void abort();
> +  abstract FieldInfo getFieldInfo();
>   }
>
> Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumers.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumers.java?rev=966168&r1=966167&r2=966168&view=diff
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumers.java (original)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumers.java Wed Jul 21 10:27:20 2010
> @@ -17,12 +17,9 @@ package org.apache.lucene.index;
>    * limitations under the License.
>    */
>
> +import java.io.IOException;
>   import java.util.HashMap;
> -import java.util.Collection;
> -import java.util.Iterator;
>   import java.util.Map;
> -import java.util.HashSet;
> -import java.io.IOException;
>
>   import org.apache.lucene.util.ArrayUtil;
>   import org.apache.lucene.util.RamUsageEstimator;
> @@ -33,10 +30,12 @@ import org.apache.lucene.util.RamUsageEs
>   final class DocFieldConsumers extends DocFieldConsumer {
>     final DocFieldConsumer one;
>     final DocFieldConsumer two;
> +  final DocumentsWriterPerThread.DocState docState;
>
> -  public DocFieldConsumers(DocFieldConsumer one, DocFieldConsumer two) {
> +  public DocFieldConsumers(DocFieldProcessor processor, DocFieldConsumer one, DocFieldConsumer two) {
>       this.one = one;
>       this.two = two;
> +    this.docState = processor.docState;
>     }
>
>     @Override
> @@ -47,33 +46,19 @@ final class DocFieldConsumers extends Do
>     }
>
>     @Override
> -  public void flush(Map<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>>  threadsAndFields, SegmentWriteState state) throws IOException {
> -
> -    Map<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>>  oneThreadsAndFields = new HashMap<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>>();
> -    Map<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>>  twoThreadsAndFields = new HashMap<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>>();
> -
> -    for (Map.Entry<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>>  entry : threadsAndFields.entrySet()) {
> +  public void flush(Map<FieldInfo, DocFieldConsumerPerField>  fieldsToFlush, SegmentWriteState state) throws IOException {
>
> -      final DocFieldConsumersPerThread perThread = (DocFieldConsumersPerThread) entry.getKey();
> +    Map<FieldInfo, DocFieldConsumerPerField>  oneFieldsToFlush = new HashMap<FieldInfo, DocFieldConsumerPerField>();
> +    Map<FieldInfo, DocFieldConsumerPerField>  twoFieldsToFlush = new HashMap<FieldInfo, DocFieldConsumerPerField>();
>
> -      final Collection<DocFieldConsumerPerField>  fields = entry.getValue();
> -
> -      Iterator<DocFieldConsumerPerField>  fieldsIt = fields.iterator();
> -      Collection<DocFieldConsumerPerField>  oneFields = new HashSet<DocFieldConsumerPerField>();
> -      Collection<DocFieldConsumerPerField>  twoFields = new HashSet<DocFieldConsumerPerField>();
> -      while(fieldsIt.hasNext()) {
> -        DocFieldConsumersPerField perField = (DocFieldConsumersPerField) fieldsIt.next();
> -        oneFields.add(perField.one);
> -        twoFields.add(perField.two);
> -      }
> -
> -      oneThreadsAndFields.put(perThread.one, oneFields);
> -      twoThreadsAndFields.put(perThread.two, twoFields);
> +    for (Map.Entry<FieldInfo, DocFieldConsumerPerField>  fieldToFlush : fieldsToFlush.entrySet()) {
> +      DocFieldConsumersPerField perField = (DocFieldConsumersPerField) fieldToFlush.getValue();
> +      oneFieldsToFlush.put(fieldToFlush.getKey(), perField.one);
> +      twoFieldsToFlush.put(fieldToFlush.getKey(), perField.two);
>       }
> -
>
> -    one.flush(oneThreadsAndFields, state);
> -    two.flush(twoThreadsAndFields, state);
> +    one.flush(oneFieldsToFlush, state);
> +    two.flush(twoFieldsToFlush, state);
>     }
>
>     @Override
> @@ -101,16 +86,11 @@ final class DocFieldConsumers extends Do
>       return any;
>     }
>
> -  @Override
> -  public DocFieldConsumerPerThread addThread(DocFieldProcessorPerThread docFieldProcessorPerThread) throws IOException {
> -    return new DocFieldConsumersPerThread(docFieldProcessorPerThread, this, one.addThread(docFieldProcessorPerThread), two.addThread(docFieldProcessorPerThread));
> -  }
> -
>     PerDoc[] docFreeList = new PerDoc[1];
>     int freeCount;
>     int allocCount;
>
> -  synchronized PerDoc getPerDoc() {
> +  PerDoc getPerDoc() {
>       if (freeCount == 0) {
>         allocCount++;
>         if (allocCount>  docFreeList.length) {
> @@ -125,15 +105,15 @@ final class DocFieldConsumers extends Do
>         return docFreeList[--freeCount];
>     }
>
> -  synchronized void freePerDoc(PerDoc perDoc) {
> +  void freePerDoc(PerDoc perDoc) {
>       assert freeCount<  docFreeList.length;
>       docFreeList[freeCount++] = perDoc;
>     }
>
> -  class PerDoc extends DocumentsWriter.DocWriter {
> +  class PerDoc extends DocumentsWriterPerThread.DocWriter {
>
> -    DocumentsWriter.DocWriter writerOne;
> -    DocumentsWriter.DocWriter writerTwo;
> +    DocumentsWriterPerThread.DocWriter writerOne;
> +    DocumentsWriterPerThread.DocWriter writerTwo;
>
>       @Override
>       public long sizeInBytes() {
> @@ -166,4 +146,35 @@ final class DocFieldConsumers extends Do
>         }
>       }
>     }
> +
> +  @Override
> +  public DocumentsWriterPerThread.DocWriter finishDocument() throws IOException {
> +    final DocumentsWriterPerThread.DocWriter oneDoc = one.finishDocument();
> +    final DocumentsWriterPerThread.DocWriter twoDoc = two.finishDocument();
> +    if (oneDoc == null)
> +      return twoDoc;
> +    else if (twoDoc == null)
> +      return oneDoc;
> +    else {
> +      DocFieldConsumers.PerDoc both = getPerDoc();
> +      both.docID = docState.docID;
> +      assert oneDoc.docID == docState.docID;
> +      assert twoDoc.docID == docState.docID;
> +      both.writerOne = oneDoc;
> +      both.writerTwo = twoDoc;
> +      return both;
> +    }
> +  }
> +
> +  @Override
> +  public void startDocument() throws IOException {
> +    one.startDocument();
> +    two.startDocument();
> +  }
> +
> +  @Override
> +  public DocFieldConsumerPerField addField(FieldInfo fi) {
> +    return new DocFieldConsumersPerField(this, fi, one.addField(fi), two.addField(fi));
> +  }
> +
>   }
>
> Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumersPerField.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumersPerField.java?rev=966168&r1=966167&r2=966168&view=diff
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumersPerField.java (original)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumersPerField.java Wed Jul 21 10:27:20 2010
> @@ -24,12 +24,14 @@ final class DocFieldConsumersPerField ex
>
>     final DocFieldConsumerPerField one;
>     final DocFieldConsumerPerField two;
> -  final DocFieldConsumersPerThread perThread;
> +  final DocFieldConsumers parent;
> +  final FieldInfo fieldInfo;
>
> -  public DocFieldConsumersPerField(DocFieldConsumersPerThread perThread, DocFieldConsumerPerField one, DocFieldConsumerPerField two) {
> -    this.perThread = perThread;
> +  public DocFieldConsumersPerField(DocFieldConsumers parent, FieldInfo fi, DocFieldConsumerPerField one, DocFieldConsumerPerField two) {
> +    this.parent = parent;
>       this.one = one;
>       this.two = two;
> +    this.fieldInfo = fi;
>     }
>
>     @Override
> @@ -46,4 +48,9 @@ final class DocFieldConsumersPerField ex
>         two.abort();
>       }
>     }
> +
> +  @Override
> +  FieldInfo getFieldInfo() {
> +    return fieldInfo;
> +  }
>   }
>
> Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java?rev=966168&r1=966167&r2=966168&view=diff
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java (original)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java Wed Jul 21 10:27:20 2010
> @@ -19,8 +19,15 @@ package org.apache.lucene.index;
>
>   import java.io.IOException;
>   import java.util.Collection;
> -import java.util.Map;
>   import java.util.HashMap;
> +import java.util.HashSet;
> +import java.util.List;
> +import java.util.Map;
> +
> +import org.apache.lucene.document.Document;
> +import org.apache.lucene.document.Fieldable;
> +import org.apache.lucene.util.ArrayUtil;
> +import org.apache.lucene.util.RamUsageEstimator;
>
>
>   /**
> @@ -33,13 +40,27 @@ import java.util.HashMap;
>
>   final class DocFieldProcessor extends DocConsumer {
>
> -  final DocumentsWriter docWriter;
>     final FieldInfos fieldInfos = new FieldInfos();
>     final DocFieldConsumer consumer;
>     final StoredFieldsWriter fieldsWriter;
>
> -  public DocFieldProcessor(DocumentsWriter docWriter, DocFieldConsumer consumer) {
> -    this.docWriter = docWriter;
> +  // Holds all fields seen in current doc
> +  DocFieldProcessorPerField[] fields = new DocFieldProcessorPerField[1];
> +  int fieldCount;
> +
> +  // Hash table for all fields ever seen
> +  DocFieldProcessorPerField[] fieldHash = new DocFieldProcessorPerField[2];
> +  int hashMask = 1;
> +  int totalFieldCount;
> +
> +
> +  float docBoost;
> +  int fieldGen;
> +  final DocumentsWriterPerThread.DocState docState;
> +
> +
> +  public DocFieldProcessor(DocumentsWriterPerThread docWriter, DocFieldConsumer consumer) {
> +    this.docState = docWriter.docState;
>       this.consumer = consumer;
>       consumer.setFieldInfos(fieldInfos);
>       fieldsWriter = new StoredFieldsWriter(docWriter, fieldInfos);
> @@ -52,16 +73,17 @@ final class DocFieldProcessor extends Do
>     }
>
>     @Override
> -  public void flush(Collection<DocConsumerPerThread>  threads, SegmentWriteState state) throws IOException {
> +  public void flush(SegmentWriteState state) throws IOException {
>
> -    Map<DocFieldConsumerPerThread, Collection<DocFieldConsumerPerField>>  childThreadsAndFields = new HashMap<DocFieldConsumerPerThread, Collection<DocFieldConsumerPerField>>();
> -    for ( DocConsumerPerThread thread : threads) {
> -      DocFieldProcessorPerThread perThread = (DocFieldProcessorPerThread) thread;
> -      childThreadsAndFields.put(perThread.consumer, perThread.fields());
> -      perThread.trimFields(state);
> +    Map<FieldInfo, DocFieldConsumerPerField>  childFields = new HashMap<FieldInfo, DocFieldConsumerPerField>();
> +    Collection<DocFieldConsumerPerField>  fields = fields();
> +    for (DocFieldConsumerPerField f : fields) {
> +      childFields.put(f.getFieldInfo(), f);
>       }
> +    trimFields(state);
> +
>       fieldsWriter.flush(state);
> -    consumer.flush(childThreadsAndFields, state);
> +    consumer.flush(childFields, state);
>
>       // Important to save after asking consumer to flush so
>       // consumer can alter the FieldInfo* if necessary.  EG,
> @@ -74,6 +96,15 @@ final class DocFieldProcessor extends Do
>
>     @Override
>     public void abort() {
> +    for(int i=0;i<fieldHash.length;i++) {
> +      DocFieldProcessorPerField field = fieldHash[i];
> +      while(field != null) {
> +        final DocFieldProcessorPerField next = field.next;
> +        field.abort();
> +        field = next;
> +      }
> +    }
> +
>       fieldsWriter.abort();
>       consumer.abort();
>     }
> @@ -82,9 +113,317 @@ final class DocFieldProcessor extends Do
>     public boolean freeRAM() {
>       return consumer.freeRAM();
>     }
> +
> +  public Collection<DocFieldConsumerPerField>  fields() {
> +    Collection<DocFieldConsumerPerField>  fields = new HashSet<DocFieldConsumerPerField>();
> +    for(int i=0;i<fieldHash.length;i++) {
> +      DocFieldProcessorPerField field = fieldHash[i];
> +      while(field != null) {
> +        fields.add(field.consumer);
> +        field = field.next;
> +      }
> +    }
> +    assert fields.size() == totalFieldCount;
> +    return fields;
> +  }
> +
> +  /** If there are fields we've seen but did not see again
> +   *  in the last run, then free them up. */
> +
> +  void trimFields(SegmentWriteState state) {
> +
> +    for(int i=0;i<fieldHash.length;i++) {
> +      DocFieldProcessorPerField perField = fieldHash[i];
> +      DocFieldProcessorPerField lastPerField = null;
> +
> +      while (perField != null) {
> +
> +        if (perField.lastGen == -1) {
> +
> +          // This field was not seen since the previous
> +          // flush, so, free up its resources now
> +
> +          // Unhash
> +          if (lastPerField == null)
> +            fieldHash[i] = perField.next;
> +          else
> +            lastPerField.next = perField.next;
> +
> +          if (state.infoStream != null) {
> +            state.infoStream.println("  purge field=" + perField.fieldInfo.name);
> +          }
> +
> +          totalFieldCount--;
> +
> +        } else {
> +          // Reset
> +          perField.lastGen = -1;
> +          lastPerField = perField;
> +        }
> +
> +        perField = perField.next;
> +      }
> +    }
> +  }
> +
> +  private void rehash() {
> +    final int newHashSize = (fieldHash.length*2);
> +    assert newHashSize > fieldHash.length;
> +
> +    final DocFieldProcessorPerField newHashArray[] = new DocFieldProcessorPerField[newHashSize];
> +
> +    // Rehash
> +    int newHashMask = newHashSize-1;
> +    for(int j=0;j<fieldHash.length;j++) {
> +      DocFieldProcessorPerField fp0 = fieldHash[j];
> +      while(fp0 != null) {
> +        final int hashPos2 = fp0.fieldInfo.name.hashCode() & newHashMask;
> +        DocFieldProcessorPerField nextFP0 = fp0.next;
> +        fp0.next = newHashArray[hashPos2];
> +        newHashArray[hashPos2] = fp0;
> +        fp0 = nextFP0;
> +      }
> +    }
> +
> +    fieldHash = newHashArray;
> +    hashMask = newHashMask;
> +  }
>
>     @Override
> -  public DocConsumerPerThread addThread(DocumentsWriterThreadState threadState) throws IOException {
> -    return new DocFieldProcessorPerThread(threadState, this);
> +  public DocumentsWriterPerThread.DocWriter processDocument() throws IOException {
> +
> +    consumer.startDocument();
> +    fieldsWriter.startDocument();
> +
> +    final Document doc = docState.doc;
> +
> +    fieldCount = 0;
> +
> +    final int thisFieldGen = fieldGen++;
> +
> +    final List<Fieldable>  docFields = doc.getFields();
> +    final int numDocFields = docFields.size();
> +
> +    // Absorb any new fields first seen in this document.
> +    // Also absorb any changes to fields we had already
> +    // seen before (eg suddenly turning on norms or
> +    // vectors, etc.):
> +
> +    for(int i=0;i<numDocFields;i++) {
> +      Fieldable field = docFields.get(i);
> +      final String fieldName = field.name();
> +
> +      // Make sure we have a PerField allocated
> +      final int hashPos = fieldName.hashCode() & hashMask;
> +      DocFieldProcessorPerField fp = fieldHash[hashPos];
> +      while(fp != null && !fp.fieldInfo.name.equals(fieldName)) {
> +        fp = fp.next;
> +      }
> +
> +      if (fp == null) {
> +
> +        // TODO FI: we need to genericize the "flags" that a
> +        // field holds, and, how these flags are merged; it
> +        // needs to be more "pluggable" such that if I want
> +        // to have a new "thing" my Fields can do, I can
> +        // easily add it
> +        FieldInfo fi = fieldInfos.add(fieldName, field.isIndexed(), field.isTermVectorStored(),
> +                                      field.isStorePositionWithTermVector(), field.isStoreOffsetWithTermVector(),
> +                                      field.getOmitNorms(), false, field.getOmitTermFreqAndPositions());
> +
> +        fp = new DocFieldProcessorPerField(this, fi);
> +        fp.next = fieldHash[hashPos];
> +        fieldHash[hashPos] = fp;
> +        totalFieldCount++;
> +
> +        if (totalFieldCount >= fieldHash.length/2)
> +          rehash();
> +      } else {
> +        fp.fieldInfo.update(field.isIndexed(), field.isTermVectorStored(),
> +                            field.isStorePositionWithTermVector(), field.isStoreOffsetWithTermVector(),
> +                            field.getOmitNorms(), false, field.getOmitTermFreqAndPositions());
> +      }
> +
> +      if (thisFieldGen != fp.lastGen) {
> +
> +        // First time we're seeing this field for this doc
> +        fp.fieldCount = 0;
> +
> +        if (fieldCount == fields.length) {
> +          final int newSize = fields.length*2;
> +          DocFieldProcessorPerField newArray[] = new DocFieldProcessorPerField[newSize];
> +          System.arraycopy(fields, 0, newArray, 0, fieldCount);
> +          fields = newArray;
> +        }
> +
> +        fields[fieldCount++] = fp;
> +        fp.lastGen = thisFieldGen;
> +      }
> +
> +      if (fp.fieldCount == fp.fields.length) {
> +        Fieldable[] newArray = new Fieldable[fp.fields.length*2];
> +        System.arraycopy(fp.fields, 0, newArray, 0, fp.fieldCount);
> +        fp.fields = newArray;
> +      }
> +
> +      fp.fields[fp.fieldCount++] = field;
> +      if (field.isStored()) {
> +        fieldsWriter.addField(field, fp.fieldInfo);
> +      }
> +    }
> +
> +    // If we are writing vectors then we must visit
> +    // fields in sorted order so they are written in
> +    // sorted order.  TODO: we actually only need to
> +    // sort the subset of fields that have vectors
> +    // enabled; we could save [small amount of] CPU
> +    // here.
> +    quickSort(fields, 0, fieldCount-1);
> +
> +    for(int i=0;i<fieldCount;i++)
> +      fields[i].consumer.processFields(fields[i].fields, fields[i].fieldCount);
> +
> +    if (docState.maxTermPrefix != null && docState.infoStream != null) {
> +      docState.infoStream.println("WARNING: document contains at least one immense term (whose UTF8 encoding is longer than the max length " + DocumentsWriterRAMAllocator.MAX_TERM_LENGTH_UTF8 + "), all of which were skipped.  Please correct the analyzer to not produce such terms.  The prefix of the first immense term is: '" + docState.maxTermPrefix + "...'");
> +      docState.maxTermPrefix = null;
> +    }
> +
> +    final DocumentsWriterPerThread.DocWriter one = fieldsWriter.finishDocument();
> +    final DocumentsWriterPerThread.DocWriter two = consumer.finishDocument();
> +    if (one == null) {
> +      return two;
> +    } else if (two == null) {
> +      return one;
> +    } else {
> +      PerDoc both = getPerDoc();
> +      both.docID = docState.docID;
> +      assert one.docID == docState.docID;
> +      assert two.docID == docState.docID;
> +      both.one = one;
> +      both.two = two;
> +      return both;
> +    }
> +  }
> +
> +  void quickSort(DocFieldProcessorPerField[] array, int lo, int hi) {
> +    if (lo >= hi)
> +      return;
> +    else if (hi == 1+lo) {
> +      if (array[lo].fieldInfo.name.compareTo(array[hi].fieldInfo.name) > 0) {
> +        final DocFieldProcessorPerField tmp = array[lo];
> +        array[lo] = array[hi];
> +        array[hi] = tmp;
> +      }
> +      return;
> +    }
> +
> +    int mid = (lo + hi) >>> 1;
> +
> +    if (array[lo].fieldInfo.name.compareTo(array[mid].fieldInfo.name) > 0) {
> +      DocFieldProcessorPerField tmp = array[lo];
> +      array[lo] = array[mid];
> +      array[mid] = tmp;
> +    }
> +
> +    if (array[mid].fieldInfo.name.compareTo(array[hi].fieldInfo.name) > 0) {
> +      DocFieldProcessorPerField tmp = array[mid];
> +      array[mid] = array[hi];
> +      array[hi] = tmp;
> +
> +      if (array[lo].fieldInfo.name.compareTo(array[mid].fieldInfo.name) > 0) {
> +        DocFieldProcessorPerField tmp2 = array[lo];
> +        array[lo] = array[mid];
> +        array[mid] = tmp2;
> +      }
> +    }
> +
> +    int left = lo + 1;
> +    int right = hi - 1;
> +
> +    if (left >= right)
> +      return;
> +
> +    DocFieldProcessorPerField partition = array[mid];
> +
> +    for (; ;) {
> +      while (array[right].fieldInfo.name.compareTo(partition.fieldInfo.name) > 0)
> +        --right;
> +
> +      while (left < right && array[left].fieldInfo.name.compareTo(partition.fieldInfo.name) <= 0)
> +        ++left;
> +
> +      if (left < right) {
> +        DocFieldProcessorPerField tmp = array[left];
> +        array[left] = array[right];
> +        array[right] = tmp;
> +        --right;
> +      } else {
> +        break;
> +      }
> +    }
> +
> +    quickSort(array, lo, left);
> +    quickSort(array, left + 1, hi);
> +  }
> +
> +  PerDoc[] docFreeList = new PerDoc[1];
> +  int freeCount;
> +  int allocCount;
> +
> +  PerDoc getPerDoc() {
> +    if (freeCount == 0) {
> +      allocCount++;
> +      if (allocCount > docFreeList.length) {
> +        // Grow our free list up front to make sure we have
> +        // enough space to recycle all outstanding PerDoc
> +        // instances
> +        assert allocCount == 1+docFreeList.length;
> +        docFreeList = new PerDoc[ArrayUtil.oversize(allocCount, RamUsageEstimator.NUM_BYTES_OBJECT_REF)];
> +      }
> +      return new PerDoc();
> +    } else
> +      return docFreeList[--freeCount];
> +  }
> +
> +  void freePerDoc(PerDoc perDoc) {
> +    assert freeCount < docFreeList.length;
> +    docFreeList[freeCount++] = perDoc;
> +  }
> +
> +  class PerDoc extends DocumentsWriterPerThread.DocWriter {
> +
> +    DocumentsWriterPerThread.DocWriter one;
> +    DocumentsWriterPerThread.DocWriter two;
> +
> +    @Override
> +    public long sizeInBytes() {
> +      return one.sizeInBytes() + two.sizeInBytes();
> +    }
> +
> +    @Override
> +    public void finish() throws IOException {
> +      try {
> +        try {
> +          one.finish();
> +        } finally {
> +          two.finish();
> +        }
> +      } finally {
> +        freePerDoc(this);
> +      }
> +    }
> +
> +    @Override
> +    public void abort() {
> +      try {
> +        try {
> +          one.abort();
> +        } finally {
> +          two.abort();
> +        }
> +      } finally {
> +        freePerDoc(this);
> +      }
> +    }
>     }
>   }
>
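The per-field lookup that DocFieldProcessor now does inline (rather than in a per-thread class) is a name-keyed chained hash that is doubled once it is half full. Below is a minimal, self-contained sketch of the same pattern; the class and member names are illustrative, not the Lucene ones:

    // Name-keyed chained hash with doubling rehash, mirroring the
    // fieldHash / hashMask / totalFieldCount bookkeeping above.
    final class FieldTable {
      static final class Entry {
        final String name;
        Entry next;                       // collision chain
        Entry(String name) { this.name = name; }
      }

      private Entry[] buckets = new Entry[2];
      private int mask = 1;
      private int count;

      Entry getOrAdd(String name) {
        int pos = name.hashCode() & mask;
        Entry e = buckets[pos];
        while (e != null && !e.name.equals(name)) {
          e = e.next;
        }
        if (e == null) {
          e = new Entry(name);
          e.next = buckets[pos];
          buckets[pos] = e;
          if (++count >= buckets.length / 2) {
            rehash();                     // keep the load factor below ~0.5
          }
        }
        return e;
      }

      private void rehash() {
        Entry[] newBuckets = new Entry[buckets.length * 2];
        int newMask = newBuckets.length - 1;
        for (Entry head : buckets) {
          Entry e = head;
          while (e != null) {
            Entry next = e.next;
            int pos = e.name.hashCode() & newMask;
            e.next = newBuckets[pos];
            newBuckets[pos] = e;
            e = next;
          }
        }
        buckets = newBuckets;
        mask = newMask;
      }
    }

Keeping the table at most half full keeps the collision chains short, which matters because this lookup runs once per field occurrence of every document.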
> Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java?rev=966168&r1=966167&r2=966168&view=diff
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java (original)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessorPerField.java Wed Jul 21 10:27:20 2010
> @@ -34,8 +34,8 @@ final class DocFieldProcessorPerField {
>     int fieldCount;
>     Fieldable[] fields = new Fieldable[1];
>
> -  public DocFieldProcessorPerField(final DocFieldProcessorPerThread perThread, final FieldInfo fieldInfo) {
> -    this.consumer = perThread.consumer.addField(fieldInfo);
> +  public DocFieldProcessorPerField(final DocFieldProcessor docFieldProcessor, final FieldInfo fieldInfo) {
> +    this.consumer = docFieldProcessor.consumer.addField(fieldInfo);
>       this.fieldInfo = fieldInfo;
>     }
>
>
> Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverter.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverter.java?rev=966168&r1=966167&r2=966168&view=diff
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverter.java (original)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverter.java Wed Jul 21 10:27:20 2010
> @@ -18,12 +18,13 @@ package org.apache.lucene.index;
>    */
>
>   import java.io.IOException;
> -import java.util.Collection;
>   import java.util.HashMap;
> -import java.util.HashSet;
> -
>   import java.util.Map;
>
> +import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
> +import org.apache.lucene.analysis.tokenattributes.OffsetAttribute;
> +import org.apache.lucene.util.AttributeSource;
> +
>
>   /** This is a DocFieldConsumer that inverts each field,
>    *  separately, from a Document, and accepts a
> @@ -34,7 +35,32 @@ final class DocInverter extends DocField
>     final InvertedDocConsumer consumer;
>     final InvertedDocEndConsumer endConsumer;
>
> -  public DocInverter(InvertedDocConsumer consumer, InvertedDocEndConsumer endConsumer) {
> +  final DocumentsWriterPerThread.DocState docState;
> +
> +  final FieldInvertState fieldState = new FieldInvertState();
> +
> +  final SingleTokenAttributeSource singleToken = new SingleTokenAttributeSource();
> +
> +  static class SingleTokenAttributeSource extends AttributeSource {
> +    final CharTermAttribute termAttribute;
> +    final OffsetAttribute offsetAttribute;
> +
> +    private SingleTokenAttributeSource() {
> +      termAttribute = addAttribute(CharTermAttribute.class);
> +      offsetAttribute = addAttribute(OffsetAttribute.class);
> +    }
> +
> +    public void reinit(String stringValue, int startOffset,  int endOffset) {
> +      termAttribute.setEmpty().append(stringValue);
> +      offsetAttribute.setOffset(startOffset, endOffset);
> +    }
> +  }
> +
> +  // Used to read a string value for a field
> +  final ReusableStringReader stringReader = new ReusableStringReader();
> +
> +  public DocInverter(DocumentsWriterPerThread.DocState docState, InvertedDocConsumer consumer, InvertedDocEndConsumer endConsumer) {
> +    this.docState = docState;
>       this.consumer = consumer;
>       this.endConsumer = endConsumer;
>     }
> @@ -47,33 +73,37 @@ final class DocInverter extends DocField
>     }
>
>     @Override
> -  void flush(Map<DocFieldConsumerPerThread, Collection<DocFieldConsumerPerField>>  threadsAndFields, SegmentWriteState state) throws IOException {
> -
> -    Map<InvertedDocConsumerPerThread,Collection<InvertedDocConsumerPerField>>  childThreadsAndFields = new HashMap<InvertedDocConsumerPerThread,Collection<InvertedDocConsumerPerField>>();
> -    Map<InvertedDocEndConsumerPerThread,Collection<InvertedDocEndConsumerPerField>>  endChildThreadsAndFields = new HashMap<InvertedDocEndConsumerPerThread,Collection<InvertedDocEndConsumerPerField>>();
> -
> -    for (Map.Entry<DocFieldConsumerPerThread,Collection<DocFieldConsumerPerField>>  entry : threadsAndFields.entrySet() ) {
> +  void flush(Map<FieldInfo, DocFieldConsumerPerField>  fieldsToFlush, SegmentWriteState state) throws IOException {
>
> +    Map<FieldInfo, InvertedDocConsumerPerField>  childFieldsToFlush = new HashMap<FieldInfo, InvertedDocConsumerPerField>();
> +    Map<FieldInfo, InvertedDocEndConsumerPerField>  endChildFieldsToFlush = new HashMap<FieldInfo, InvertedDocEndConsumerPerField>();
>
> -      DocInverterPerThread perThread = (DocInverterPerThread) entry.getKey();
> -
> -      Collection<InvertedDocConsumerPerField>  childFields = new HashSet<InvertedDocConsumerPerField>();
> -      Collection<InvertedDocEndConsumerPerField>  endChildFields = new HashSet<InvertedDocEndConsumerPerField>();
> -      for (final DocFieldConsumerPerField field: entry.getValue() ) {
> -        DocInverterPerField perField = (DocInverterPerField) field;
> -        childFields.add(perField.consumer);
> -        endChildFields.add(perField.endConsumer);
> -      }
> -
> -      childThreadsAndFields.put(perThread.consumer, childFields);
> -      endChildThreadsAndFields.put(perThread.endConsumer, endChildFields);
> +    for (Map.Entry<FieldInfo, DocFieldConsumerPerField>  fieldToFlush : fieldsToFlush.entrySet()) {
> +      DocInverterPerField perField = (DocInverterPerField) fieldToFlush.getValue();
> +      childFieldsToFlush.put(fieldToFlush.getKey(), perField.consumer);
> +      endChildFieldsToFlush.put(fieldToFlush.getKey(), perField.endConsumer);
>       }
>
> -    consumer.flush(childThreadsAndFields, state);
> -    endConsumer.flush(endChildThreadsAndFields, state);
> +    consumer.flush(childFieldsToFlush, state);
> +    endConsumer.flush(endChildFieldsToFlush, state);
> +  }
> +
> +  @Override
> +  public void startDocument() throws IOException {
> +    consumer.startDocument();
> +    endConsumer.startDocument();
>     }
>
>     @Override
> +  public DocumentsWriterPerThread.DocWriter finishDocument() throws IOException {
> +    // TODO: allow endConsumer.finishDocument to also return
> +    // a DocWriter
> +    endConsumer.finishDocument();
> +    return consumer.finishDocument();
> +  }
> +
> +
> +  @Override
>     public void closeDocStore(SegmentWriteState state) throws IOException {
>       consumer.closeDocStore(state);
>       endConsumer.closeDocStore(state);
> @@ -81,17 +111,21 @@ final class DocInverter extends DocField
>
>     @Override
>     void abort() {
> -    consumer.abort();
> -    endConsumer.abort();
> +    try {
> +      consumer.abort();
> +    } finally {
> +      endConsumer.abort();
> +    }
>     }
>
>     @Override
>     public boolean freeRAM() {
>       return consumer.freeRAM();
>     }
> -
> +
>     @Override
> -  public DocFieldConsumerPerThread addThread(DocFieldProcessorPerThread docFieldProcessorPerThread) {
> -    return new DocInverterPerThread(docFieldProcessorPerThread, this);
> +  public DocFieldConsumerPerField addField(FieldInfo fi) {
> +    return new DocInverterPerField(this, fi);
>     }
> +
>   }
>
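DocInverter now owns a single reusable SingleTokenAttributeSource and ReusableStringReader, so un-tokenized field values are indexed without allocating a fresh token or reader per document. The reuse idea, reduced to plain Java with a hypothetical holder class:

    // Hypothetical holder showing the reuse idea: one mutable token per
    // inverter, re-initialized for each un-tokenized field value instead
    // of allocating a new token (or token stream) per document.
    final class SingleTokenHolder {
      private final StringBuilder term = new StringBuilder();
      private int startOffset;
      private int endOffset;

      void reinit(String value, int startOffset, int endOffset) {
        term.setLength(0);
        term.append(value);
        this.startOffset = startOffset;
        this.endOffset = endOffset;
      }

      String term()     { return term.toString(); }
      int startOffset() { return startOffset; }
      int endOffset()   { return endOffset; }
    }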
> Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java?rev=966168&r1=966167&r2=966168&view=diff
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java (original)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverterPerField.java Wed Jul 21 10:27:20 2010
> @@ -35,20 +35,20 @@ import org.apache.lucene.analysis.tokena
>
>   final class DocInverterPerField extends DocFieldConsumerPerField {
>
> -  final private DocInverterPerThread perThread;
> -  final private FieldInfo fieldInfo;
> +  final private DocInverter parent;
> +  final FieldInfo fieldInfo;
>     final InvertedDocConsumerPerField consumer;
>     final InvertedDocEndConsumerPerField endConsumer;
> -  final DocumentsWriter.DocState docState;
> +  final DocumentsWriterPerThread.DocState docState;
>     final FieldInvertState fieldState;
>
> -  public DocInverterPerField(DocInverterPerThread perThread, FieldInfo fieldInfo) {
> -    this.perThread = perThread;
> +  public DocInverterPerField(DocInverter parent, FieldInfo fieldInfo) {
> +    this.parent = parent;
>       this.fieldInfo = fieldInfo;
> -    docState = perThread.docState;
> -    fieldState = perThread.fieldState;
> -    this.consumer = perThread.consumer.addField(this, fieldInfo);
> -    this.endConsumer = perThread.endConsumer.addField(this, fieldInfo);
> +    docState = parent.docState;
> +    fieldState = parent.fieldState;
> +    this.consumer = parent.consumer.addField(this, fieldInfo);
> +    this.endConsumer = parent.endConsumer.addField(this, fieldInfo);
>     }
>
>     @Override
> @@ -84,8 +84,8 @@ final class DocInverterPerField extends
>           if (!field.isTokenized()) {		  // un-tokenized field
>             String stringValue = field.stringValue();
>             final int valueLength = stringValue.length();
> -          perThread.singleToken.reinit(stringValue, 0, valueLength);
> -          fieldState.attributeSource = perThread.singleToken;
> +          parent.singleToken.reinit(stringValue, 0, valueLength);
> +          fieldState.attributeSource = parent.singleToken;
>             consumer.start(field);
>
>             boolean success = false;
> @@ -93,8 +93,9 @@ final class DocInverterPerField extends
>               consumer.add();
>               success = true;
>             } finally {
> -            if (!success)
> +            if (!success) {
>                 docState.docWriter.setAborting();
> +            }
>             }
>             fieldState.offset += valueLength;
>             fieldState.length++;
> @@ -119,8 +120,8 @@ final class DocInverterPerField extends
>                 if (stringValue == null) {
>                   throw new IllegalArgumentException("field must have either TokenStream, String or Reader value");
>                 }
> -              perThread.stringReader.init(stringValue);
> -              reader = perThread.stringReader;
> +              parent.stringReader.init(stringValue);
> +              reader = parent.stringReader;
>               }
>
>               // Tokenize field and add to postingTable
> @@ -173,8 +174,9 @@ final class DocInverterPerField extends
>                   consumer.add();
>                   success = true;
>                 } finally {
> -                if (!success)
> +                if (!success) {
>                     docState.docWriter.setAborting();
> +                }
>                 }
>                 fieldState.position++;
> +                if (++fieldState.length >= maxFieldLength) {
> @@ -208,4 +210,9 @@ final class DocInverterPerField extends
>       consumer.finish();
>       endConsumer.finish();
>     }
> +
> +  @Override
> +  FieldInfo getFieldInfo() {
> +    return this.fieldInfo;
> +  }
>   }
>
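The repeated success-flag blocks above implement a simple contract: if a downstream consumer throws while a document is being inverted, the per-thread writer is flagged as aborting before the exception propagates, so a half-indexed document can never reach a flush. The idiom in isolation, with stand-in interfaces for the consumer and the writer:

    import java.io.IOException;

    final class AbortOnFailure {
      interface Consumer { void add() throws IOException; }
      interface Aborter  { void setAborting(); }

      // If consumer.add() throws, flag the writer as aborting before the
      // exception escapes, so partially indexed state is discarded rather
      // than flushed.
      static void addOneToken(Consumer consumer, Aborter writer) throws IOException {
        boolean success = false;
        try {
          consumer.add();
          success = true;
        } finally {
          if (!success) {
            writer.setAborting();
          }
        }
      }
    }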
> Added: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java?rev=966168&view=auto
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java (added)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java Wed Jul 21 10:27:20 2010
> @@ -0,0 +1,459 @@
> +package org.apache.lucene.index;
> +
> +import java.io.IOException;
> +import java.io.PrintStream;
> +import java.util.ArrayList;
> +import java.util.List;
> +
> +import org.apache.lucene.analysis.Analyzer;
> +import org.apache.lucene.document.Document;
> +import org.apache.lucene.index.codecs.Codec;
> +import org.apache.lucene.search.Similarity;
> +import org.apache.lucene.store.Directory;
> +import org.apache.lucene.store.RAMFile;
> +import org.apache.lucene.util.ArrayUtil;
> +
> +public class DocumentsWriterPerThread {
> +
> +  /**
> +   * The IndexingChain must define the {@link #getChain(DocumentsWriter)} method
> +   * which returns the DocConsumer that the DocumentsWriter calls to process the
> +   * documents.
> +   */
> +  abstract static class IndexingChain {
> +    abstract DocConsumer getChain(DocumentsWriterPerThread documentsWriterPerThread);
> +  }
> +
> +
> +  static final IndexingChain defaultIndexingChain = new IndexingChain() {
> +
> +    @Override
> +    DocConsumer getChain(DocumentsWriterPerThread documentsWriterPerThread) {
> +      /*
> +      This is the current indexing chain:
> +
> +      DocConsumer / DocConsumerPerThread
> +        -->  code: DocFieldProcessor / DocFieldProcessorPerThread
> +          -->  DocFieldConsumer / DocFieldConsumerPerThread / DocFieldConsumerPerField
> +            -->  code: DocFieldConsumers / DocFieldConsumersPerThread / DocFieldConsumersPerField
> +              -->  code: DocInverter / DocInverterPerThread / DocInverterPerField
> +                -->  InvertedDocConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
> +                  -->  code: TermsHash / TermsHashPerThread / TermsHashPerField
> +                    -->  TermsHashConsumer / TermsHashConsumerPerThread / TermsHashConsumerPerField
> +                      -->  code: FreqProxTermsWriter / FreqProxTermsWriterPerThread / FreqProxTermsWriterPerField
> +                      -->  code: TermVectorsTermsWriter / TermVectorsTermsWriterPerThread / TermVectorsTermsWriterPerField
> +                -->  InvertedDocEndConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
> +                  -->  code: NormsWriter / NormsWriterPerThread / NormsWriterPerField
> +              -->  code: StoredFieldsWriter / StoredFieldsWriterPerThread / StoredFieldsWriterPerField
> +    */
> +
> +    // Build up indexing chain:
> +
> +      final TermsHashConsumer termVectorsWriter = new TermVectorsTermsWriter(documentsWriterPerThread);
> +      final TermsHashConsumer freqProxWriter = new FreqProxTermsWriter();
> +
> +      final InvertedDocConsumer  termsHash = new TermsHash(documentsWriterPerThread, freqProxWriter,
> +                                                           new TermsHash(documentsWriterPerThread, termVectorsWriter, null));
> +      final NormsWriter normsWriter = new NormsWriter();
> +      final DocInverter docInverter = new DocInverter(documentsWriterPerThread.docState, termsHash, normsWriter);
> +      return new DocFieldProcessor(documentsWriterPerThread, docInverter);
> +    }
> +  };
> +
> +  static class DocState {
> +    final DocumentsWriterPerThread docWriter;
> +    Analyzer analyzer;
> +    int maxFieldLength;
> +    PrintStream infoStream;
> +    Similarity similarity;
> +    int docID;
> +    Document doc;
> +    String maxTermPrefix;
> +
> +    DocState(DocumentsWriterPerThread docWriter) {
> +      this.docWriter = docWriter;
> +    }
> +
> +    // Only called by asserts
> +    public boolean testPoint(String name) {
> +      return docWriter.writer.testPoint(name);
> +    }
> +  }
> +
> +  /** Called if we hit an exception at a bad time (when
> +   *  updating the index files) and must discard all
> +   *  currently buffered docs.  This resets our state,
> +   *  discarding any docs added since last flush. */
> +  void abort() throws IOException {
> +    try {
> +      if (infoStream != null) {
> +        message("docWriter: now abort");
> +      }
> +      try {
> +        consumer.abort();
> +      } catch (Throwable t) {
> +      }
> +
> +      docStoreSegment = null;
> +      numDocsInStore = 0;
> +      docStoreOffset = 0;
> +
> +      // Reset all postings data
> +      doAfterFlush();
> +
> +    } finally {
> +      aborting = false;
> +      if (infoStream != null) {
> +        message("docWriter: done abort");
> +      }
> +    }
> +  }
> +
> +
> +  final DocumentsWriterRAMAllocator ramAllocator = new DocumentsWriterRAMAllocator();
> +
> +  final DocumentsWriter parent;
> +  final IndexWriter writer;
> +
> +  final Directory directory;
> +  final DocState docState;
> +  final DocConsumer consumer;
> +  private DocFieldProcessor docFieldProcessor;
> +
> +  String segment;                         // Current segment we are working on
> +  private String docStoreSegment;         // Current doc-store segment we are writing
> +  private int docStoreOffset;                     // Current starting doc-store offset of current segment
> +  boolean aborting;               // True if an abort is pending
> +
> +  private final PrintStream infoStream;
> +  private int numDocsInRAM;
> +  private int numDocsInStore;
> +  private int flushedDocCount;
> +  SegmentWriteState flushState;
> +
> +  long[] sequenceIDs = new long[8];
> +
> +  final List<String>  closedFiles = new ArrayList<String>();
> +
> +  long numBytesUsed;
> +
> +  public DocumentsWriterPerThread(Directory directory, DocumentsWriter parent, IndexingChain indexingChain) {
> +    this.directory = directory;
> +    this.parent = parent;
> +    this.writer = parent.indexWriter;
> +    this.infoStream = parent.indexWriter.getInfoStream();
> +    this.docState = new DocState(this);
> +    this.docState.similarity = parent.config.getSimilarity();
> +    this.docState.maxFieldLength = parent.config.getMaxFieldLength();
> +
> +    consumer = indexingChain.getChain(this);
> +    if (consumer instanceof DocFieldProcessor) {
> +      docFieldProcessor = (DocFieldProcessor) consumer;
> +    }
> +
> +  }
> +
> +  void setAborting() {
> +    aborting = true;
> +  }
> +
> +  public void addDocument(Document doc, Analyzer analyzer) throws IOException {
> +    docState.doc = doc;
> +    docState.analyzer = analyzer;
> +    docState.docID = numDocsInRAM;
> +    initSegmentName(false);
> +
> +    final DocWriter perDoc;
> +
> +    boolean success = false;
> +    try {
> +      perDoc = consumer.processDocument();
> +
> +      success = true;
> +    } finally {
> +      if (!success) {
> +        if (!aborting) {
> +          // mark document as deleted
> +          commitDocument(-1);
> +        }
> +      }
> +    }
> +
> +    success = false;
> +    try {
> +      if (perDoc != null) {
> +        perDoc.finish();
> +      }
> +
> +      success = true;
> +    } finally {
> +      if (!success) {
> +        setAborting();
> +      }
> +    }
> +
> +  }
> +
> +  public void commitDocument(long sequenceID) {
> +    if (numDocsInRAM == sequenceIDs.length) {
> +      sequenceIDs = ArrayUtil.grow(sequenceIDs);
> +    }
> +
> +    sequenceIDs[numDocsInRAM] = sequenceID;
> +    numDocsInRAM++;
> +    numDocsInStore++;
> +  }
> +
> +  int getNumDocsInRAM() {
> +    return numDocsInRAM;
> +  }
> +
> +  long getMinSequenceID() {
> +    if (numDocsInRAM == 0) {
> +      return -1;
> +    }
> +    return sequenceIDs[0];
> +  }
> +
> +  /** Returns true if any of the fields in the current
> +  *  buffered docs have omitTermFreqAndPositions==false */
> +  boolean hasProx() {
> +    return (docFieldProcessor != null) ? docFieldProcessor.fieldInfos.hasProx()
> +                                      : true;
> +  }
> +
> +  Codec getCodec() {
> +    return flushState.codec;
> +  }
> +
> +  void initSegmentName(boolean onlyDocStore) {
> +    if (segment == null && (!onlyDocStore || docStoreSegment == null)) {
> +      // this call is synchronized on IndexWriter.segmentInfos
> +      segment = writer.newSegmentName();
> +      assert numDocsInRAM == 0;
> +    }
> +    if (docStoreSegment == null) {
> +      docStoreSegment = segment;
> +      assert numDocsInStore == 0;
> +    }
> +  }
> +
> +
> +  private void initFlushState(boolean onlyDocStore) {
> +    initSegmentName(onlyDocStore);
> +    flushState = new SegmentWriteState(infoStream, directory, segment, docFieldProcessor.fieldInfos,
> +                                       docStoreSegment, numDocsInRAM, numDocsInStore, writer.getConfig().getTermIndexInterval(),
> +                                       writer.codecs);
> +  }
> +
> +  /** Reset after a flush */
> +  private void doAfterFlush() throws IOException {
> +    segment = null;
> +    numDocsInRAM = 0;
> +  }
> +
> +  /** Flush all pending docs to a new segment */
> +  SegmentInfo flush(boolean closeDocStore) throws IOException {
> +    assert numDocsInRAM > 0;
> +
> +    initFlushState(closeDocStore);
> +
> +    docStoreOffset = numDocsInStore;
> +
> +    if (infoStream != null) {
> +      message("flush postings as segment " + flushState.segmentName + " numDocs=" + numDocsInRAM);
> +    }
> +
> +    boolean success = false;
> +
> +    try {
> +
> +      if (closeDocStore) {
> +        assert flushState.docStoreSegmentName != null;
> +        assert flushState.docStoreSegmentName.equals(flushState.segmentName);
> +        closeDocStore();
> +        flushState.numDocsInStore = 0;
> +      }
> +
> +      consumer.flush(flushState);
> +
> +      if (infoStream != null) {
> +        SegmentInfo si = new SegmentInfo(flushState.segmentName,
> +            flushState.numDocs,
> +            directory, false,
> +            docStoreOffset, flushState.docStoreSegmentName,
> +            false,
> +            hasProx(),
> +            getCodec());
> +
> +        final long newSegmentSize = si.sizeInBytes();
> +        String message = "  ramUsed=" + ramAllocator.nf.format(((double) numBytesUsed)/1024./1024.) + " MB" +
> +          " newFlushedSize=" + newSegmentSize +
> +          " docs/MB=" + ramAllocator.nf.format(numDocsInRAM/(newSegmentSize/1024./1024.)) +
> +          " new/old=" + ramAllocator.nf.format(100.0*newSegmentSize/numBytesUsed) + "%";
> +        message(message);
> +      }
> +
> +      flushedDocCount += flushState.numDocs;
> +
> +      long maxSequenceID = sequenceIDs[numDocsInRAM-1];
> +      doAfterFlush();
> +
> +      // Create new SegmentInfo, but do not add to our
> +      // segmentInfos until deletes are flushed
> +      // successfully.
> +      SegmentInfo newSegment = new SegmentInfo(flushState.segmentName,
> +                                   flushState.numDocs,
> +                                   directory, false,
> +                                   docStoreOffset, flushState.docStoreSegmentName,
> +                                   false,
> +                                   hasProx(),
> +                                   getCodec());
> +
> +
> +      newSegment.setMinSequenceID(sequenceIDs[0]);
> +      newSegment.setMaxSequenceID(maxSequenceID);
> +
> +      IndexWriter.setDiagnostics(newSegment, "flush");
> +      success = true;
> +
> +      return newSegment;
> +    } finally {
> +      if (!success) {
> +        setAborting();
> +      }
> +    }
> +  }
> +
> +  /** Closes the current open doc stores and returns the doc
> +   *  store segment name.  This returns null if there are
> +   *  no buffered documents. */
> +  String closeDocStore() throws IOException {
> +
> +    // nocommit
> +//    if (infoStream != null)
> +//      message("closeDocStore: " + openFiles.size() + " files to flush to segment " + docStoreSegment + " numDocs=" + numDocsInStore);
> +
> +    boolean success = false;
> +
> +    try {
> +      initFlushState(true);
> +      closedFiles.clear();
> +
> +      consumer.closeDocStore(flushState);
> +      // nocommit
> +      //assert 0 == openFiles.size();
> +
> +      String s = docStoreSegment;
> +      docStoreSegment = null;
> +      docStoreOffset = 0;
> +      numDocsInStore = 0;
> +      success = true;
> +      return s;
> +    } finally {
> +      if (!success) {
> +        parent.abort();
> +      }
> +    }
> +  }
> +
> +
> +  /** Get current segment name we are writing. */
> +  String getSegment() {
> +    return segment;
> +  }
> +
> +  /** Returns the current doc store segment we are writing
> +   *  to. */
> +  String getDocStoreSegment() {
> +    return docStoreSegment;
> +  }
> +
> +  /** Returns the doc offset into the shared doc store for
> +   *  the current buffered docs. */
> +  int getDocStoreOffset() {
> +    return docStoreOffset;
> +  }
> +
> +
> +  @SuppressWarnings("unchecked")
> +  List<String>  closedFiles() {
> +    return (List<String>) ((ArrayList<String>) closedFiles).clone();
> +  }
> +
> +  void addOpenFile(String name) {
> +    synchronized(parent.openFiles) {
> +      assert !parent.openFiles.contains(name);
> +      parent.openFiles.add(name);
> +    }
> +  }
> +
> +  void removeOpenFile(String name) {
> +    synchronized(parent.openFiles) {
> +      assert parent.openFiles.contains(name);
> +      parent.openFiles.remove(name);
> +    }
> +    closedFiles.add(name);
> +  }
> +
> +  /** Consumer returns this on each doc.  This holds any
> +   *  state that must be flushed synchronized "in docID
> +   *  order".  We gather these and flush them in order. */
> +  abstract static class DocWriter {
> +    DocWriter next;
> +    int docID;
> +    abstract void finish() throws IOException;
> +    abstract void abort();
> +    abstract long sizeInBytes();
> +
> +    void setNext(DocWriter next) {
> +      this.next = next;
> +    }
> +  }
> +
> +  /**
> +   * Create and return a new PerDocBuffer.
> +   */
> +  PerDocBuffer newPerDocBuffer() {
> +    return new PerDocBuffer();
> +  }
> +
> +  /**
> +   * RAMFile buffer for DocWriters.
> +   */
> +  class PerDocBuffer extends RAMFile {
> +
> +    /**
> +     * Allocate bytes used from shared pool.
> +     */
> +    protected byte[] newBuffer(int size) {
> +      assert size == DocumentsWriterRAMAllocator.PER_DOC_BLOCK_SIZE;
> +      return ramAllocator.perDocAllocator.getByteBlock();
> +    }
> +
> +    /**
> +     * Recycle the bytes used.
> +     */
> +    synchronized void recycle() {
> +      if (buffers.size() > 0) {
> +        setLength(0);
> +
> +        // Recycle the blocks
> +        ramAllocator.perDocAllocator.recycleByteBlocks(buffers);
> +        buffers.clear();
> +        sizeInBytes = 0;
> +
> +        assert numBuffers() == 0;
> +      }
> +    }
> +  }
> +
> +  void bytesUsed(long numBytes) {
> +    ramAllocator.bytesUsed(numBytes);
> +  }
> +
> +  void message(String message) {
> +    if (infoStream != null)
> +      writer.message("DW: " + message);
> +  }
> +}
>
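Each DocumentsWriterPerThread above stamps one sequence ID per buffered document (sequenceIDs is grown on demand) and hands the minimum and maximum off to the flushed SegmentInfo. A self-contained sketch of that bookkeeping, with ArrayUtil.grow replaced by a plain doubling copy:

    import java.util.Arrays;

    // One long per buffered document; min/max are read off at flush time
    // and the buffer is reset once the docs are in a segment.
    final class SequenceIdBuffer {
      private long[] sequenceIDs = new long[8];
      private int numDocs;

      void commitDocument(long sequenceID) {
        if (numDocs == sequenceIDs.length) {
          sequenceIDs = Arrays.copyOf(sequenceIDs, sequenceIDs.length * 2);
        }
        sequenceIDs[numDocs++] = sequenceID;
      }

      long minSequenceID() { return numDocs == 0 ? -1 : sequenceIDs[0]; }
      long maxSequenceID() { return numDocs == 0 ? -1 : sequenceIDs[numDocs - 1]; }

      void reset() { numDocs = 0; }   // called after a successful flush
    }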
> Added: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterRAMAllocator.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterRAMAllocator.java?rev=966168&view=auto
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterRAMAllocator.java (added)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterRAMAllocator.java Wed Jul 21 10:27:20 2010
> @@ -0,0 +1,148 @@
> +package org.apache.lucene.index;
> +
> +import java.text.NumberFormat;
> +import java.util.ArrayList;
> +import java.util.List;
> +
> +import org.apache.lucene.util.Constants;
> +
> +class DocumentsWriterRAMAllocator {
> +  final ByteBlockAllocator byteBlockAllocator = new ByteBlockAllocator(BYTE_BLOCK_SIZE);
> +  final ByteBlockAllocator perDocAllocator = new ByteBlockAllocator(PER_DOC_BLOCK_SIZE);
> +
> +
> +  class ByteBlockAllocator extends ByteBlockPool.Allocator {
> +    final int blockSize;
> +
> +    ByteBlockAllocator(int blockSize) {
> +      this.blockSize = blockSize;
> +    }
> +
> +    ArrayList<byte[]>  freeByteBlocks = new ArrayList<byte[]>();
> +
> +    /* Allocate another byte[] from the shared pool */
> +    @Override
> +    byte[] getByteBlock() {
> +      final int size = freeByteBlocks.size();
> +      final byte[] b;
> +      if (0 == size) {
> +        b = new byte[blockSize];
> +        // Always record a block allocated, even if
> +        // trackAllocations is false.  This is necessary
> +        // because this block will be shared between
> +        // things that don't track allocations (term
> +        // vectors) and things that do (freq/prox
> +        // postings).
> +        numBytesUsed += blockSize;
> +      } else
> +        b = freeByteBlocks.remove(size-1);
> +      return b;
> +    }
> +
> +    /* Return byte[]'s to the pool */
> +    @Override
> +    void recycleByteBlocks(byte[][] blocks, int start, int end) {
> +      for(int i=start;i<end;i++) {
> +        freeByteBlocks.add(blocks[i]);
> +      }
> +    }
> +
> +    @Override
> +    void recycleByteBlocks(List<byte[]>  blocks) {
> +      final int size = blocks.size();
> +      for(int i=0;i<size;i++) {
> +        freeByteBlocks.add(blocks.get(i));
> +      }
> +    }
> +  }
> +
> +  private ArrayList<int[]>  freeIntBlocks = new ArrayList<int[]>();
> +
> +  /* Allocate another int[] from the shared pool */
> +  int[] getIntBlock() {
> +    final int size = freeIntBlocks.size();
> +    final int[] b;
> +    if (0 == size) {
> +      b = new int[INT_BLOCK_SIZE];
> +      // Always record a block allocated, even if
> +      // trackAllocations is false.  This is necessary
> +      // because this block will be shared between
> +      // things that don't track allocations (term
> +      // vectors) and things that do (freq/prox
> +      // postings).
> +      numBytesUsed += INT_BLOCK_SIZE*INT_NUM_BYTE;
> +    } else
> +      b = freeIntBlocks.remove(size-1);
> +    return b;
> +  }
> +
> +  void bytesUsed(long numBytes) {
> +    numBytesUsed += numBytes;
> +  }
> +
> +  /* Return int[]s to the pool */
> +  void recycleIntBlocks(int[][] blocks, int start, int end) {
> +    for(int i=start;i<end;i++)
> +      freeIntBlocks.add(blocks[i]);
> +  }
> +
> +  long getRAMUsed() {
> +    return numBytesUsed;
> +  }
> +
> +  long numBytesUsed;
> +
> +  NumberFormat nf = NumberFormat.getInstance();
> +
> +  final static int PER_DOC_BLOCK_SIZE = 1024;
> +
> +  // Coarse estimates used to measure RAM usage of buffered deletes
> +  final static int OBJECT_HEADER_BYTES = 8;
> +  final static int POINTER_NUM_BYTE = Constants.JRE_IS_64BIT ? 8 : 4;
> +  final static int INT_NUM_BYTE = 4;
> +  final static int CHAR_NUM_BYTE = 2;
> +
> +  /* Rough logic: HashMap has an array[Entry] w/ varying
> +     load factor (say 2 * POINTER).  Entry is object w/ Term
> +     key, BufferedDeletes.Num val, int hash, Entry next
> +     (OBJ_HEADER + 3*POINTER + INT).  Term is object w/
> +     String field and String text (OBJ_HEADER + 2*POINTER).
> +     We don't count Term's field since it's interned.
> +     Term's text is String (OBJ_HEADER + 4*INT + POINTER +
> +     OBJ_HEADER + string.length*CHAR).  BufferedDeletes.num is
> +     OBJ_HEADER + INT. */
> +
> +  final static int BYTES_PER_DEL_TERM = 8*POINTER_NUM_BYTE + 5*OBJECT_HEADER_BYTES + 6*INT_NUM_BYTE;
> +
> +  /* Rough logic: del docIDs are List<Integer>.  Say list
> +     allocates ~2X size (2*POINTER).  Integer is OBJ_HEADER
> +     + int */
> +  final static int BYTES_PER_DEL_DOCID = 2*POINTER_NUM_BYTE + OBJECT_HEADER_BYTES + INT_NUM_BYTE;
> +
> +  /* Rough logic: HashMap has an array[Entry] w/ varying
> +     load factor (say 2 * POINTER).  Entry is object w/
> +     Query key, Integer val, int hash, Entry next
> +     (OBJ_HEADER + 3*POINTER + INT).  Query we often
> +     undercount (say 24 bytes).  Integer is OBJ_HEADER + INT. */
> +  final static int BYTES_PER_DEL_QUERY = 5*POINTER_NUM_BYTE + 2*OBJECT_HEADER_BYTES + 2*INT_NUM_BYTE + 24;
> +
> +  /* Initial chunks size of the shared byte[] blocks used to
> +     store postings data */
> +  final static int BYTE_BLOCK_SHIFT = 15;
> +  final static int BYTE_BLOCK_SIZE = 1 << BYTE_BLOCK_SHIFT;
> +  final static int BYTE_BLOCK_MASK = BYTE_BLOCK_SIZE - 1;
> +  final static int BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK;
> +
> +  final static int MAX_TERM_LENGTH_UTF8 = BYTE_BLOCK_SIZE-2;
> +
> +  /* Initial chunks size of the shared int[] blocks used to
> +     store postings data */
> +  final static int INT_BLOCK_SHIFT = 13;
> +  final static int INT_BLOCK_SIZE = 1 << INT_BLOCK_SHIFT;
> +  final static int INT_BLOCK_MASK = INT_BLOCK_SIZE - 1;
> +
> +  String toMB(long v) {
> +    return nf.format(v/1024./1024.);
> +  }
> +
> +}
>
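The allocator above hands out fixed-size byte[] blocks from a free list and only adds to numBytesUsed when a genuinely new block is created; recycled blocks are reused without changing the count. The same pooling pattern in a stripped-down form (block size and accounting simplified here):

    import java.util.ArrayList;
    import java.util.List;

    // Fixed-size block pool: new blocks are counted once, recycled blocks
    // are handed out again without touching the byte count.
    final class BlockPool {
      private final int blockSize;
      private final List<byte[]> free = new ArrayList<byte[]>();
      private long bytesAllocated;

      BlockPool(int blockSize) { this.blockSize = blockSize; }

      byte[] getBlock() {
        int size = free.size();
        if (size == 0) {
          bytesAllocated += blockSize;   // only newly allocated blocks count
          return new byte[blockSize];
        }
        return free.remove(size - 1);
      }

      void recycleBlocks(List<byte[]> blocks) {
        free.addAll(blocks);             // blocks stay allocated, just reusable
      }

      long bytesAllocated() { return bytesAllocated; }
    }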
> Added: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java
> URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java?rev=966168&view=auto
> ==============================================================================
> --- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java (added)
> +++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java Wed Jul 21 10:27:20 2010
> @@ -0,0 +1,255 @@
> +package org.apache.lucene.index;
> +
> +import java.io.IOException;
> +import java.util.Iterator;
> +import java.util.concurrent.locks.Condition;
> +import java.util.concurrent.locks.Lock;
> +import java.util.concurrent.locks.ReentrantLock;
> +
> +import org.apache.lucene.document.Document;
> +import org.apache.lucene.util.ThreadInterruptedException;
> +
> +abstract class DocumentsWriterThreadPool {
> +  public static abstract class Task<T>  {
> +    private boolean clearThreadBindings = false;
> +
> +    protected void clearThreadBindings() {
> +      this.clearThreadBindings = true;
> +    }
> +
> +    boolean doClearThreadBindings() {
> +      return clearThreadBindings;
> +    }
> +  }
> +
> +  public static abstract class PerThreadTask<T>  extends Task<T>  {
> +    abstract T process(final DocumentsWriterPerThread perThread) throws IOException;
> +  }
> +
> +  public static abstract class AllThreadsTask<T>  extends Task<T>  {
> +    abstract T process(final Iterator<DocumentsWriterPerThread>  threadsIterator) throws IOException;
> +  }
> +
> +  protected abstract static class ThreadState {
> +    private DocumentsWriterPerThread perThread;
> +    private boolean isIdle = true;
> +
> +    void start() {/* extension hook */}
> +    void finish() {/* extension hook */}
> +  }
> +
> +  private int pauseThreads = 0;
> +
> +  protected final int maxNumThreadStates;
> +  protected ThreadState[] allThreadStates = new ThreadState[0];
> +
> +  private final Lock lock = new ReentrantLock();
> +  private final Condition threadStateAvailable = lock.newCondition();
> +  private boolean globalLock;
> +  private boolean aborting;
> +
> +  DocumentsWriterThreadPool(int maxNumThreadStates) {
> +    this.maxNumThreadStates = (maxNumThreadStates < 1) ? IndexWriterConfig.DEFAULT_MAX_THREAD_STATES : maxNumThreadStates;
> +  }
> +
> +  public final int getMaxThreadStates() {
> +    return this.maxNumThreadStates;
> +  }
> +
> +  void pauseAllThreads() {
> +    lock.lock();
> +    try {
> +      pauseThreads++;
> +      while(!allThreadsIdle()) {
> +        try {
> +          threadStateAvailable.await();
> +        } catch (InterruptedException ie) {
> +          throw new ThreadInterruptedException(ie);
> +        }
> +      }
> +    } finally {
> +      lock.unlock();
> +    }
> +  }
> +
> +  void resumeAllThreads() {
> +    lock.lock();
> +    try {
> +      pauseThreads--;
> +      assert pauseThreads >= 0;
> +      if (0 == pauseThreads) {
> +        threadStateAvailable.signalAll();
> +      }
> +    } finally {
> +      lock.unlock();
> +    }
> +  }
> +
> +  private boolean allThreadsIdle() {
> +    for (ThreadState state : allThreadStates) {
> +      if (!state.isIdle) {
> +        return false;
> +      }
> +    }
> +
> +    return true;
> +  }
> +
> +  void abort() throws IOException {
> +    pauseAllThreads();
> +    aborting = true;
> +    for (ThreadState state : allThreadStates) {
> +      state.perThread.abort();
> +    }
> +  }
> +
> +  void finishAbort() {
> +    aborting = false;
> +    resumeAllThreads();
> +  }
> +
> +  public <T> T executeAllThreads(AllThreadsTask<T> task) throws IOException {
> +    T result = null;
> +
> +    lock.lock();
> +    try {
> +      try {
> +        while (globalLock) {
> +          threadStateAvailable.await();
> +        }
> +      } catch (InterruptedException ie) {
> +        throw new ThreadInterruptedException(ie);
> +      }
> +
> +      globalLock = true;
> +      pauseAllThreads();
> +    } finally {
> +      lock.unlock();
> +    }
> +
> +
> +    // all threads are idle now
> +
> +    try {
> +      final ThreadState[] localAllThreads = allThreadStates;
> +
> +      result = task.process(new Iterator<DocumentsWriterPerThread>() {
> +        int i = 0;
> +
> +        @Override
> +        public boolean hasNext() {
> +          return i < localAllThreads.length;
> +        }
> +
> +        @Override
> +        public DocumentsWriterPerThread next() {
> +          return localAllThreads[i++].perThread;
> +        }
> +
> +        @Override
> +        public void remove() {
> +          throw new UnsupportedOperationException("remove() not supported.");
> +        }
> +      });
> +      return result;
> +    } finally {
> +      lock.lock();
> +      try {
> +        try {
> +          if (task.doClearThreadBindings()) {
> +            clearAllThreadBindings();
> +          }
> +        } finally {
> +          globalLock = false;
> +          resumeAllThreads();
> +          threadStateAvailable.signalAll();
> +        }
> +      } finally {
> +        lock.unlock();
> +      }
> +
> +    }
> +  }
> +
> +
> +  public final <T> T executePerThread(DocumentsWriter documentsWriter, Document doc, PerThreadTask<T> task) throws IOException {
> +    ThreadState state = acquireThreadState(documentsWriter, doc);
> +    boolean success = false;
> +    try {
> +      T result = task.process(state.perThread);
> +      success = true;
> +      return result;
> +    } finally {
> +      boolean abort = false;
> +      if (!success && state.perThread.aborting) {
> +        state.perThread.aborting = false;
> +        abort = true;
> +      }
> +
> +      returnDocumentsWriterPerThread(state, task.doClearThreadBindings());
> +
> +      if (abort) {
> +        documentsWriter.abort();
> +      }
> +    }
> +  }
> +
> +  protected final <T extends ThreadState> T addNewThreadState(DocumentsWriter documentsWriter, T threadState) {
> +    // Just create a new "private" thread state
> +    ThreadState[] newArray = new ThreadState[1+allThreadStates.length];
> +    if (allThreadStates.length > 0)
> +      System.arraycopy(allThreadStates, 0, newArray, 0, allThreadStates.length);
> +    threadState.perThread = documentsWriter.newDocumentsWriterPerThread();
> +    newArray[allThreadStates.length] = threadState;
> +
> +    allThreadStates = newArray;
> +    return threadState;
> +  }
> +
> +  protected abstract ThreadState selectThreadState(Thread requestingThread, DocumentsWriter documentsWriter, Document doc);
> +  protected void clearThreadBindings(ThreadState flushedThread) {
> +    // subclasses can optionally override this to cleanup after a thread flushed
> +  }
> +
> +  protected void clearAllThreadBindings() {
> +    // subclasses can optionally override this to cleanup after a thread flushed
> +  }
> +
> +
> +  private final ThreadState acquireThreadState(DocumentsWriter documentsWriter, Document doc) {
> +    lock.lock();
> +    try {
> +      ThreadState threadState = selectThreadState(Thread.currentThread(), documentsWriter, doc);
> +
> +      try {
> +        while (!threadState.isIdle || globalLock || aborting) {
> +          threadStateAvailable.await();
> +        }
> +      } catch (InterruptedException ie) {
> +        throw new ThreadInterruptedException(ie);
> +      }
> +
> +      threadState.isIdle = false;
> +      threadState.start();
> +
> +      return threadState;
> +
> +    } finally {
> +      lock.unlock();
> +    }
> +  }
> +
> +  private final void returnDocumentsWriterPerThread(ThreadState state, boolean clearThreadBindings) {
> +    lock.lock();
> +    try {
> +      state.finish();
> +      if (clearThreadBindings) {
> +        clearThreadBindings(state);
> +      }
> +      state.isIdle = true;
> +      threadStateAvailable.signalAll();
> +    } finally {
> +      lock.unlock();
> +    }
> +  }
> +}
>
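The pool above serializes access to each ThreadState with a single ReentrantLock and Condition: callers wait while the state is busy, a global lock is held, or an abort is in flight, and every release signals the waiters. A stripped-down sketch of that acquire/release protocol for one shared state:

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    // One shared worker state: acquire() blocks while it is busy or a
    // global pause is active; release() hands it back and wakes waiters.
    final class SimpleStateGate {
      private final ReentrantLock lock = new ReentrantLock();
      private final Condition available = lock.newCondition();
      private boolean idle = true;
      private boolean paused;

      void acquire() throws InterruptedException {
        lock.lock();
        try {
          while (!idle || paused) {
            available.await();
          }
          idle = false;
        } finally {
          lock.unlock();
        }
      }

      void release() {
        lock.lock();
        try {
          idle = true;
          available.signalAll();
        } finally {
          lock.unlock();
        }
      }

      void setPaused(boolean paused) {
        lock.lock();
        try {
          this.paused = paused;
          if (!paused) {
            available.signalAll();
          }
        } finally {
          lock.unlock();
        }
      }
    }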
>
>


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@lucene.apache.org
For additional commands, e-mail: dev-help@lucene.apache.org