Posted to commits@lucene.apache.org by us...@apache.org on 2011/05/02 00:38:36 UTC

svn commit: r1098427 [2/5] - in /lucene/dev/trunk: ./ lucene/ lucene/src/java/org/apache/lucene/index/ lucene/src/test-framework/org/apache/lucene/search/ lucene/src/test-framework/org/apache/lucene/store/ lucene/src/test-framework/org/apache/lucene/ut...

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java Sun May  1 22:38:33 2011
@@ -19,36 +19,27 @@ package org.apache.lucene.index;
 
 import java.io.IOException;
 import java.io.PrintStream;
-import java.text.NumberFormat;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.document.Document;
+import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
+import org.apache.lucene.index.DocumentsWriterPerThread.IndexingChain;
+import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
+import org.apache.lucene.index.FieldInfos.FieldNumberBiMap;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.SimilarityProvider;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.RAMFile;
-import org.apache.lucene.util.ArrayUtil;
-import org.apache.lucene.util.BitVector;
-import org.apache.lucene.util.RamUsageEstimator;
-import org.apache.lucene.util.RecyclingByteBlockAllocator;
-import org.apache.lucene.util.ThreadInterruptedException;
-
-import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_MASK;
-import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_SIZE;
 
 /**
  * This class accepts multiple added documents and directly
- * writes a single segment file.  It does this more
- * efficiently than creating a single segment per document
- * (with DocumentWriter) and doing standard merges on those
- * segments.
+ * writes segment files.
  *
  * Each added document is passed to the {@link DocConsumer},
  * which in turn processes the document and interacts with
@@ -111,266 +102,117 @@ import static org.apache.lucene.util.Byt
  */
 
 final class DocumentsWriter {
-  final AtomicLong bytesUsed = new AtomicLong(0);
-  IndexWriter writer;
   Directory directory;
 
-  String segment;                         // Current segment we are working on
-
-  private int nextDocID;                  // Next docID to be added
-  private int numDocs;                    // # of docs added, but not yet flushed
-
-  // Max # ThreadState instances; if there are more threads
-  // than this they share ThreadStates
-  private DocumentsWriterThreadState[] threadStates = new DocumentsWriterThreadState[0];
-  private final HashMap<Thread,DocumentsWriterThreadState> threadBindings = new HashMap<Thread,DocumentsWriterThreadState>();
-
-  boolean bufferIsFull;                   // True when it's time to write segment
-  private boolean aborting;               // True if an abort is pending
+  private volatile boolean closed;
 
   PrintStream infoStream;
   SimilarityProvider similarityProvider;
 
-  // max # simultaneous threads; if there are more than
-  // this, they wait for others to finish first
-  private final int maxThreadStates;
-
-  // TODO: cutover to BytesRefHash
-  // Deletes for our still-in-RAM (to be flushed next) segment
-  private BufferedDeletes pendingDeletes = new BufferedDeletes(false);
-  
-  static class DocState {
-    DocumentsWriter docWriter;
-    Analyzer analyzer;
-    PrintStream infoStream;
-    SimilarityProvider similarityProvider;
-    int docID;
-    Document doc;
-    String maxTermPrefix;
-
-    // Only called by asserts
-    public boolean testPoint(String name) {
-      return docWriter.writer.testPoint(name);
-    }
-
-    public void clear() {
-      // don't hold onto doc nor analyzer, in case it is
-      // largish:
-      doc = null;
-      analyzer = null;
-    }
-  }
-
-  /** Consumer returns this on each doc.  This holds any
-   *  state that must be flushed synchronized "in docID
-   *  order".  We gather these and flush them in order. */
-  abstract static class DocWriter {
-    DocWriter next;
-    int docID;
-    abstract void finish() throws IOException;
-    abstract void abort();
-    abstract long sizeInBytes();
+  List<String> newFiles;
 
-    void setNext(DocWriter next) {
-      this.next = next;
-    }
-  }
+  final IndexWriter indexWriter;
 
-  /**
-   * Create and return a new DocWriterBuffer.
-   */
-  PerDocBuffer newPerDocBuffer() {
-    return new PerDocBuffer();
-  }
-
-  /**
-   * RAMFile buffer for DocWriters.
-   */
-  class PerDocBuffer extends RAMFile {
-    
-    /**
-     * Allocate bytes used from shared pool.
-     */
-    @Override
-    protected byte[] newBuffer(int size) {
-      assert size == PER_DOC_BLOCK_SIZE;
-      return perDocAllocator.getByteBlock();
-    }
-    
-    /**
-     * Recycle the bytes used.
-     */
-    synchronized void recycle() {
-      if (buffers.size() > 0) {
-        setLength(0);
-        
-        // Recycle the blocks
-        perDocAllocator.recycleByteBlocks(buffers);
-        buffers.clear();
-        sizeInBytes = 0;
-        
-        assert numBuffers() == 0;
-      }
-    }
-  }
-  
-  /**
-   * The IndexingChain must define the {@link #getChain(DocumentsWriter)} method
-   * which returns the DocConsumer that the DocumentsWriter calls to process the
-   * documents. 
-   */
-  abstract static class IndexingChain {
-    abstract DocConsumer getChain(DocumentsWriter documentsWriter);
-  }
-  
-  static final IndexingChain defaultIndexingChain = new IndexingChain() {
+  private AtomicInteger numDocsInRAM = new AtomicInteger(0);
 
-    @Override
-    DocConsumer getChain(DocumentsWriter documentsWriter) {
-      /*
-      This is the current indexing chain:
-
-      DocConsumer / DocConsumerPerThread
-        --> code: DocFieldProcessor / DocFieldProcessorPerThread
-          --> DocFieldConsumer / DocFieldConsumerPerThread / DocFieldConsumerPerField
-            --> code: DocFieldConsumers / DocFieldConsumersPerThread / DocFieldConsumersPerField
-              --> code: DocInverter / DocInverterPerThread / DocInverterPerField
-                --> InvertedDocConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
-                  --> code: TermsHash / TermsHashPerThread / TermsHashPerField
-                    --> TermsHashConsumer / TermsHashConsumerPerThread / TermsHashConsumerPerField
-                      --> code: FreqProxTermsWriter / FreqProxTermsWriterPerThread / FreqProxTermsWriterPerField
-                      --> code: TermVectorsTermsWriter / TermVectorsTermsWriterPerThread / TermVectorsTermsWriterPerField
-                --> InvertedDocEndConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
-                  --> code: NormsWriter / NormsWriterPerThread / NormsWriterPerField
-              --> code: StoredFieldsWriter / StoredFieldsWriterPerThread / StoredFieldsWriterPerField
-    */
-
-    // Build up indexing chain:
-
-      final TermsHashConsumer termVectorsWriter = new TermVectorsTermsWriter(documentsWriter);
-      final TermsHashConsumer freqProxWriter = new FreqProxTermsWriter();
-      /*
-       * nesting TermsHash instances here to allow the secondary (TermVectors) share the interned postings
-       * via a shared ByteBlockPool. See TermsHashPerField for details. 
-       */
-      final TermsHash termVectorsTermHash = new TermsHash(documentsWriter, false, termVectorsWriter, null);
-      final InvertedDocConsumer  termsHash = new TermsHash(documentsWriter, true, freqProxWriter, termVectorsTermHash);
-      final NormsWriter normsWriter = new NormsWriter();
-      final DocInverter docInverter = new DocInverter(termsHash, normsWriter);
-      return new DocFieldProcessor(documentsWriter, docInverter);
-    }
-  };
-
-  final DocConsumer consumer;
-
-  // How much RAM we can use before flushing.  This is 0 if
-  // we are flushing by doc count instead.
-
-  private final IndexWriterConfig config;
+  // TODO: cut over to BytesRefHash in BufferedDeletes
+  volatile DocumentsWriterDeleteQueue deleteQueue = new DocumentsWriterDeleteQueue();
+  private final Queue<FlushTicket> ticketQueue = new LinkedList<DocumentsWriter.FlushTicket>();
 
-  private boolean closed;
-  private FieldInfos fieldInfos;
+  private Collection<String> abortedFiles;               // List of files that were written before last abort()
 
-  private final BufferedDeletesStream bufferedDeletesStream;
-  private final IndexWriter.FlushControl flushControl;
+  final IndexingChain chain;
 
-  DocumentsWriter(IndexWriterConfig config, Directory directory, IndexWriter writer, IndexingChain indexingChain, FieldInfos fieldInfos,
+  final DocumentsWriterPerThreadPool perThreadPool;
+  final FlushPolicy flushPolicy;
+  final DocumentsWriterFlushControl flushControl;
+  final Healthiness healthiness;
+  DocumentsWriter(IndexWriterConfig config, Directory directory, IndexWriter writer, FieldNumberBiMap globalFieldNumbers,
       BufferedDeletesStream bufferedDeletesStream) throws IOException {
     this.directory = directory;
-    this.writer = writer;
+    this.indexWriter = writer;
     this.similarityProvider = config.getSimilarityProvider();
-    this.maxThreadStates = config.getMaxThreadStates();
-    this.fieldInfos = fieldInfos;
-    this.bufferedDeletesStream = bufferedDeletesStream;
-    flushControl = writer.flushControl;
-    consumer = config.getIndexingChain().getChain(this);
-    this.config = config;
-  }
-
-  // Buffer a specific docID for deletion.  Currently only
-  // used when we hit a exception when adding a document
-  synchronized void deleteDocID(int docIDUpto) {
-    pendingDeletes.addDocID(docIDUpto);
-    // NOTE: we do not trigger flush here.  This is
-    // potentially a RAM leak, if you have an app that tries
-    // to add docs but every single doc always hits a
-    // non-aborting exception.  Allowing a flush here gets
-    // very messy because we are only invoked when handling
-    // exceptions so to do this properly, while handling an
-    // exception we'd have to go off and flush new deletes
-    // which is risky (likely would hit some other
-    // confounding exception).
-  }
-  
-  boolean deleteQueries(Query... queries) {
-    final boolean doFlush = flushControl.waitUpdate(0, queries.length);
-    synchronized(this) {
-      for (Query query : queries) {
-        pendingDeletes.addQuery(query, numDocs);
-      }
-    }
-    return doFlush;
-  }
-  
-  boolean deleteQuery(Query query) { 
-    final boolean doFlush = flushControl.waitUpdate(0, 1);
-    synchronized(this) {
-      pendingDeletes.addQuery(query, numDocs);
+    this.perThreadPool = config.getIndexerThreadPool();
+    this.chain = config.getIndexingChain();
+    this.perThreadPool.initialize(this, globalFieldNumbers, config);
+    final FlushPolicy configuredPolicy = config.getFlushPolicy();
+    if (configuredPolicy == null) {
+      flushPolicy = new FlushByRamOrCountsPolicy();
+    } else {
+      flushPolicy = configuredPolicy;
     }
-    return doFlush;
+    flushPolicy.init(this);
+    
+    healthiness = new Healthiness();
+    final long maxRamPerDWPT = config.getRAMPerThreadHardLimitMB() * 1024 * 1024;
+    flushControl = new DocumentsWriterFlushControl(this, healthiness, maxRamPerDWPT);
   }
-  
-  boolean deleteTerms(Term... terms) {
-    final boolean doFlush = flushControl.waitUpdate(0, terms.length);
-    synchronized(this) {
-      for (Term term : terms) {
-        pendingDeletes.addTerm(term, numDocs);
-      }
+
+  synchronized void deleteQueries(final Query... queries) throws IOException {
+    deleteQueue.addDelete(queries);
+    flushControl.doOnDelete();
+    if (flushControl.doApplyAllDeletes()) {
+      applyAllDeletes(deleteQueue);
     }
-    return doFlush;
   }
 
   // TODO: we could check w/ FreqProxTermsWriter: if the
   // term doesn't exist, don't bother buffering into the
   // per-DWPT map (but still must go into the global map)
-  boolean deleteTerm(Term term, boolean skipWait) {
-    final boolean doFlush = flushControl.waitUpdate(0, 1, skipWait);
-    synchronized(this) {
-      pendingDeletes.addTerm(term, numDocs);
+  synchronized void deleteTerms(final Term... terms) throws IOException {
+    final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
+    deleteQueue.addDelete(terms);
+    flushControl.doOnDelete();
+    if (flushControl.doApplyAllDeletes()) {
+      applyAllDeletes(deleteQueue);
     }
-    return doFlush;
   }
 
-  /** If non-null, various details of indexing are printed
-   *  here. */
+  DocumentsWriterDeleteQueue currentDeleteSession() {
+    return deleteQueue;
+  }
+  
+  private void applyAllDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
+    if (deleteQueue != null) {
+      synchronized (ticketQueue) {
+        // Freeze and insert the delete flush ticket in the queue
+        ticketQueue.add(new FlushTicket(deleteQueue.freezeGlobalBuffer(null), false));
+        applyFlushTickets();
+      }
+    }
+    indexWriter.applyAllDeletes();
+    indexWriter.flushCount.incrementAndGet();
+  }
+
   synchronized void setInfoStream(PrintStream infoStream) {
     this.infoStream = infoStream;
-    for(int i=0;i<threadStates.length;i++) {
-      threadStates[i].docState.infoStream = infoStream;
+    final Iterator<ThreadState> it = perThreadPool.getAllPerThreadsIterator();
+    while (it.hasNext()) {
+      it.next().perThread.docState.infoStream = infoStream;
     }
   }
 
-  /** Get current segment name we are writing. */
-  synchronized String getSegment() {
-    return segment;
+  /** Returns how many docs are currently buffered in RAM. */
+  int getNumDocs() {
+    return numDocsInRAM.get();
   }
 
-  /** Returns how many docs are currently buffered in RAM. */
-  synchronized int getNumDocs() {
-    return numDocs;
+  Collection<String> abortedFiles() {
+    return abortedFiles;
   }
 
-  void message(String message) {
+  // returns boolean for asserts
+  boolean message(String message) {
     if (infoStream != null) {
-      writer.message("DW: " + message);
+      indexWriter.message("DW: " + message);
     }
+    return true;
   }
 
-  synchronized void setAborting() {
-    if (infoStream != null) {
-      message("setAborting");
+  private void ensureOpen() throws AlreadyClosedException {
+    if (closed) {
+      throw new AlreadyClosedException("this IndexWriter is closed");
     }
-    aborting = true;
   }
 
   /** Called if we hit an exception at a bad time (when
@@ -378,816 +220,335 @@ final class DocumentsWriter {
    *  currently buffered docs.  This resets our state,
    *  discarding any docs added since last flush. */
   synchronized void abort() throws IOException {
-    if (infoStream != null) {
-      message("docWriter: abort");
-    }
-
     boolean success = false;
 
-    try {
-
-      // Forcefully remove waiting ThreadStates from line
-      waitQueue.abort();
-
-      // Wait for all other threads to finish with
-      // DocumentsWriter:
-      waitIdle();
+    synchronized (this) {
+      deleteQueue.clear();
+    }
 
+    try {
       if (infoStream != null) {
-        message("docWriter: abort waitIdle done");
+        message("docWriter: abort");
       }
 
-      assert 0 == waitQueue.numWaiting: "waitQueue.numWaiting=" + waitQueue.numWaiting;
-
-      waitQueue.waitingBytes = 0;
-
-      pendingDeletes.clear();
+      final Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
 
-      for (DocumentsWriterThreadState threadState : threadStates)
+      while (threadsIterator.hasNext()) {
+        ThreadState perThread = threadsIterator.next();
+        perThread.lock();
         try {
-          threadState.consumer.abort();
-        } catch (Throwable t) {
+          if (perThread.isActive()) { // we might be closed
+            perThread.perThread.abort();
+            perThread.perThread.checkAndResetHasAborted();
+          } else {
+            assert closed;
+          }
+        } finally {
+          perThread.unlock();
         }
-
-      try {
-        consumer.abort();
-      } catch (Throwable t) {
       }
 
-      // Reset all postings data
-      doAfterFlush();
       success = true;
     } finally {
-      aborting = false;
-      notifyAll();
       if (infoStream != null) {
-        message("docWriter: done abort; success=" + success);
+        message("docWriter: done abort; abortedFiles=" + abortedFiles + " success=" + success);
       }
     }
   }
 
-  /** Reset after a flush */
-  private void doAfterFlush() throws IOException {
-    // All ThreadStates should be idle when we are called
-    assert allThreadsIdle();
-    for (DocumentsWriterThreadState threadState : threadStates) {
-      threadState.consumer.doAfterFlush();
-    }
-
-    threadBindings.clear();
-    waitQueue.reset();
-    segment = null;
-    fieldInfos = new FieldInfos(fieldInfos);
-    numDocs = 0;
-    nextDocID = 0;
-    bufferIsFull = false;
-    for(int i=0;i<threadStates.length;i++) {
-      threadStates[i].doAfterFlush();
-    }
+  boolean anyChanges() {
+    return numDocsInRAM.get() != 0 || anyDeletions();
   }
 
-  private synchronized boolean allThreadsIdle() {
-    for(int i=0;i<threadStates.length;i++) {
-      if (!threadStates[i].isIdle) {
-        return false;
-      }
-    }
-    return true;
+  public int getBufferedDeleteTermsSize() {
+    return deleteQueue.getBufferedDeleteTermsSize();
   }
 
-  synchronized boolean anyChanges() {
-    return numDocs != 0 || pendingDeletes.any();
-  }
-
-  // for testing
-  public BufferedDeletes getPendingDeletes() {
-    return pendingDeletes;
-  }
-
-  private void pushDeletes(SegmentInfo newSegment, SegmentInfos segmentInfos) {
-    // Lock order: DW -> BD
-    final long delGen = bufferedDeletesStream.getNextGen();
-    if (pendingDeletes.any()) {
-      if (segmentInfos.size() > 0 || newSegment != null) {
-        final FrozenBufferedDeletes packet = new FrozenBufferedDeletes(pendingDeletes, delGen);
-        if (infoStream != null) {
-          message("flush: push buffered deletes startSize=" + pendingDeletes.bytesUsed.get() + " frozenSize=" + packet.bytesUsed);
-        }
-        bufferedDeletesStream.push(packet);
-        if (infoStream != null) {
-          message("flush: delGen=" + packet.gen);
-        }
-        if (newSegment != null) {
-          newSegment.setBufferedDeletesGen(packet.gen);
-        }
-      } else {
-        if (infoStream != null) {
-          message("flush: drop buffered deletes: no segments");
-        }
-        // We can safely discard these deletes: since
-        // there are no segments, the deletions cannot
-        // affect anything.
-      }
-      pendingDeletes.clear();
-    } else if (newSegment != null) {
-      newSegment.setBufferedDeletesGen(delGen);
-    }
+  // for testing
+  public int getNumBufferedDeleteTerms() {
+    return deleteQueue.numGlobalTermDeletes();
   }
 
   public boolean anyDeletions() {
-    return pendingDeletes.any();
+    return deleteQueue.anyChanges();
   }
 
-  /** Flush all pending docs to a new segment */
-  // Lock order: IW -> DW
-  synchronized SegmentInfo flush(IndexWriter writer, IndexFileDeleter deleter, MergePolicy mergePolicy, SegmentInfos segmentInfos) throws IOException {
-
-    final long startTime = System.currentTimeMillis();
-
-    // We change writer's segmentInfos:
-    assert Thread.holdsLock(writer);
-
-    waitIdle();
+  void close() {
+    closed = true;
+    flushControl.setClosed();
+  }
 
-    if (numDocs == 0) {
-      // nothing to do!
-      if (infoStream != null) {
-        message("flush: no docs; skipping");
-      }
-      // Lock order: IW -> DW -> BD
-      pushDeletes(null, segmentInfos);
-      return null;
-    }
+  boolean updateDocument(final Document doc, final Analyzer analyzer,
+      final Term delTerm) throws CorruptIndexException, IOException {
+    ensureOpen();
+    boolean maybeMerge = false;
+    final boolean isUpdate = delTerm != null;
+    if (healthiness.anyStalledThreads()) {
 
-    if (aborting) {
+      // Help out flushing any pending DWPTs so we can un-stall:
       if (infoStream != null) {
-        message("flush: skip because aborting is set");
+        message("WARNING DocumentsWriter has stalled threads; will hijack this thread to flush pending segment(s)");
       }
-      return null;
-    }
-
-    boolean success = false;
-
-    SegmentInfo newSegment;
-
-    try {
-      assert nextDocID == numDocs;
-      assert waitQueue.numWaiting == 0;
-      assert waitQueue.waitingBytes == 0;
 
-      if (infoStream != null) {
-        message("flush postings as segment " + segment + " numDocs=" + numDocs);
-      }
-      
-      final SegmentWriteState flushState = new SegmentWriteState(infoStream, directory, segment, fieldInfos,
-                                                                 numDocs, writer.getConfig().getTermIndexInterval(),
-                                                                 fieldInfos.buildSegmentCodecs(true),
-                                                                 pendingDeletes);
-      // Apply delete-by-docID now (delete-byDocID only
-      // happens when an exception is hit processing that
-      // doc, eg if analyzer has some problem w/ the text):
-      if (pendingDeletes.docIDs.size() > 0) {
-        flushState.deletedDocs = new BitVector(numDocs);
-        for(int delDocID : pendingDeletes.docIDs) {
-          flushState.deletedDocs.set(delDocID);
+      // Try to pick up pending flushes here, if possible
+      DocumentsWriterPerThread flushingDWPT;
+      while ((flushingDWPT = flushControl.nextPendingFlush()) != null) {
+        // Don't push the delete here since the update could fail!
+        maybeMerge = doFlush(flushingDWPT);
+        if (!healthiness.anyStalledThreads()) {
+          break;
         }
-        pendingDeletes.bytesUsed.addAndGet(-pendingDeletes.docIDs.size() * BufferedDeletes.BYTES_PER_DEL_DOCID);
-        pendingDeletes.docIDs.clear();
       }
 
-      newSegment = new SegmentInfo(segment, numDocs, directory, false, fieldInfos.hasProx(), flushState.segmentCodecs, false, fieldInfos);
-
-      Collection<DocConsumerPerThread> threads = new HashSet<DocConsumerPerThread>();
-      for (DocumentsWriterThreadState threadState : threadStates) {
-        threads.add(threadState.consumer);
+      if (infoStream != null && healthiness.anyStalledThreads()) {
+        message("WARNING DocumentsWriter still has stalled threads; waiting");
       }
 
-      double startMBUsed = bytesUsed()/1024./1024.;
+      healthiness.waitIfStalled(); // block if stalled
 
-      consumer.flush(threads, flushState);
-
-      newSegment.setHasVectors(flushState.hasVectors);
-
-      if (infoStream != null) {
-        message("new segment has " + (flushState.hasVectors ? "vectors" : "no vectors"));
-        if (flushState.deletedDocs != null) {
-          message("new segment has " + flushState.deletedDocs.count() + " deleted docs");
-        }
-        message("flushedFiles=" + newSegment.files());
-        message("flushed codecs=" + newSegment.getSegmentCodecs());
+      if (infoStream != null && healthiness.anyStalledThreads()) {
+        message("WARNING DocumentsWriter done waiting");
       }
+    }
 
-      if (mergePolicy.useCompoundFile(segmentInfos, newSegment)) {
-        final String cfsFileName = IndexFileNames.segmentFileName(segment, "", IndexFileNames.COMPOUND_FILE_EXTENSION);
-
-        if (infoStream != null) {
-          message("flush: create compound file \"" + cfsFileName + "\"");
-        }
+    final ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(),
+        this, doc);
+    final DocumentsWriterPerThread flushingDWPT;
+    
+    try {
 
-        CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, cfsFileName);
-        for(String fileName : newSegment.files()) {
-          cfsWriter.addFile(fileName);
-        }
-        cfsWriter.close();
-        deleter.deleteNewFiles(newSegment.files());
-        newSegment.setUseCompoundFile(true);
+      if (!perThread.isActive()) {
+        ensureOpen();
+        assert false: "perThread is not active but we are still open";
       }
-
-      // Must write deleted docs after the CFS so we don't
-      // slurp the del file into CFS:
-      if (flushState.deletedDocs != null) {
-        final int delCount = flushState.deletedDocs.count();
-        assert delCount > 0;
-        newSegment.setDelCount(delCount);
-        newSegment.advanceDelGen();
-        final String delFileName = newSegment.getDelFileName();
-        if (infoStream != null) {
-          message("flush: write " + delCount + " deletes to " + delFileName);
-        }
-        boolean success2 = false;
-        try {
-          // TODO: in the NRT case it'd be better to hand
-          // this del vector over to the
-          // shortly-to-be-opened SegmentReader and let it
-          // carry the changes; there's no reason to use
-          // filesystem as intermediary here.
-          flushState.deletedDocs.write(directory, delFileName);
-          success2 = true;
-        } finally {
-          if (!success2) {
-            try {
-              directory.deleteFile(delFileName);
-            } catch (Throwable t) {
-              // suppress this so we keep throwing the
-              // original exception
-            }
-          }
+       
+      final DocumentsWriterPerThread dwpt = perThread.perThread;
+      try {
+        dwpt.updateDocument(doc, analyzer, delTerm); 
+        numDocsInRAM.incrementAndGet();
+      } finally {
+        if (dwpt.checkAndResetHasAborted()) {
+          flushControl.doOnAbort(perThread);
         }
       }
-
-      if (infoStream != null) {
-        message("flush: segment=" + newSegment);
-        final double newSegmentSizeNoStore = newSegment.sizeInBytes(false)/1024./1024.;
-        final double newSegmentSize = newSegment.sizeInBytes(true)/1024./1024.;
-        message("  ramUsed=" + nf.format(startMBUsed) + " MB" +
-                " newFlushedSize=" + nf.format(newSegmentSize) + " MB" +
-                " (" + nf.format(newSegmentSizeNoStore) + " MB w/o doc stores)" +
-                " docs/MB=" + nf.format(numDocs / newSegmentSize) +
-                " new/old=" + nf.format(100.0 * newSegmentSizeNoStore / startMBUsed) + "%");
-      }
-
-      success = true;
+      flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate);
     } finally {
-      notifyAll();
-      if (!success) {
-        if (segment != null) {
-          deleter.refresh(segment);
-        }
-        abort();
-      }
+      perThread.unlock();
     }
-
-    doAfterFlush();
-
-    // Lock order: IW -> DW -> BD
-    pushDeletes(newSegment, segmentInfos);
-    if (infoStream != null) {
-      message("flush time " + (System.currentTimeMillis()-startTime) + " msec");
-    }
-
-    return newSegment;
-  }
-
-  synchronized void close() {
-    closed = true;
-    notifyAll();
-  }
-
-  /** Returns a free (idle) ThreadState that may be used for
-   * indexing this one document.  This call also pauses if a
-   * flush is pending.  If delTerm is non-null then we
-   * buffer this deleted term after the thread state has
-   * been acquired. */
-  synchronized DocumentsWriterThreadState getThreadState(Document doc, Term delTerm) throws IOException {
-
-    final Thread currentThread = Thread.currentThread();
-    assert !Thread.holdsLock(writer);
-
-    // First, find a thread state.  If this thread already
-    // has affinity to a specific ThreadState, use that one
-    // again.
-    DocumentsWriterThreadState state = threadBindings.get(currentThread);
-    if (state == null) {
-
-      // First time this thread has called us since last
-      // flush.  Find the least loaded thread state:
-      DocumentsWriterThreadState minThreadState = null;
-      for(int i=0;i<threadStates.length;i++) {
-        DocumentsWriterThreadState ts = threadStates[i];
-        if (minThreadState == null || ts.numThreads < minThreadState.numThreads) {
-          minThreadState = ts;
-        }
-      }
-      if (minThreadState != null && (minThreadState.numThreads == 0 || threadStates.length >= maxThreadStates)) {
-        state = minThreadState;
-        state.numThreads++;
-      } else {
-        // Just create a new "private" thread state
-        DocumentsWriterThreadState[] newArray = new DocumentsWriterThreadState[1+threadStates.length];
-        if (threadStates.length > 0) {
-          System.arraycopy(threadStates, 0, newArray, 0, threadStates.length);
-        }
-        state = newArray[threadStates.length] = new DocumentsWriterThreadState(this);
-        threadStates = newArray;
+    
+    if (flushingDWPT != null) {
+      maybeMerge |= doFlush(flushingDWPT);
+    } else {
+      final DocumentsWriterPerThread nextPendingFlush = flushControl.nextPendingFlush();
+      if (nextPendingFlush != null) {
+        maybeMerge |= doFlush(nextPendingFlush);
       }
-      threadBindings.put(currentThread, state);
     }
-
-    // Next, wait until my thread state is idle (in case
-    // it's shared with other threads), and no flush/abort
-    // pending 
-    waitReady(state);
-
-    // Allocate segment name if this is the first doc since
-    // last flush:
-    if (segment == null) {
-      segment = writer.newSegmentName();
-      assert numDocs == 0;
-    }
-
-    state.docState.docID = nextDocID++;
-
-    if (delTerm != null) {
-      pendingDeletes.addTerm(delTerm, state.docState.docID);
-    }
-
-    numDocs++;
-    state.isIdle = false;
-    return state;
-  }
-  
-  boolean addDocument(Document doc, Analyzer analyzer) throws CorruptIndexException, IOException {
-    return updateDocument(doc, analyzer, null);
+    return maybeMerge;
   }
-  
-  boolean updateDocument(Document doc, Analyzer analyzer, Term delTerm)
-    throws CorruptIndexException, IOException {
 
-    // Possibly trigger a flush, or wait until any running flush completes:
-    boolean doFlush = flushControl.waitUpdate(1, delTerm != null ? 1 : 0);
-
-    // This call is synchronized but fast
-    final DocumentsWriterThreadState state = getThreadState(doc, delTerm);
-
-    final DocState docState = state.docState;
-    docState.doc = doc;
-    docState.analyzer = analyzer;
-
-    boolean success = false;
-    try {
-      // This call is not synchronized and does all the
-      // work
-      final DocWriter perDoc;
+  private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException {
+    boolean maybeMerge = false;
+    while (flushingDWPT != null) {
+      maybeMerge = true;
+      boolean success = false;
+      FlushTicket ticket = null;
+      
       try {
-        perDoc = state.consumer.processDocument(fieldInfos);
-      } finally {
-        docState.clear();
-      }
-
-      // This call is synchronized but fast
-      finishDocument(state, perDoc);
-
-      success = true;
-    } finally {
-      if (!success) {
-
-        // If this thread state had decided to flush, we
-        // must clear it so another thread can flush
-        if (doFlush) {
-          flushControl.clearFlushPending();
-        }
-
-        if (infoStream != null) {
-          message("exception in updateDocument aborting=" + aborting);
-        }
-
-        synchronized(this) {
-
-          state.isIdle = true;
-          notifyAll();
-            
-          if (aborting) {
-            abort();
-          } else {
-            skipDocWriter.docID = docState.docID;
-            boolean success2 = false;
-            try {
-              waitQueue.add(skipDocWriter);
-              success2 = true;
-            } finally {
-              if (!success2) {
-                abort();
-                return false;
-              }
+        assert currentFullFlushDelQueue == null
+            || flushingDWPT.deleteQueue == currentFullFlushDelQueue : "expected: "
+            + currentFullFlushDelQueue + " but was: " + flushingDWPT.deleteQueue
+            + " " + flushControl.isFullFlush();
+        /*
+         * Since the flush process with DWPTs is concurrent and several DWPTs
+         * could flush at the same time, we must maintain the order of the
+         * flushes before we can apply a flushed segment and the frozen global
+         * deletes it is buffering. The reason for this is that the global
+         * deletes mark a certain point in time at which we took a DWPT out of
+         * rotation and froze the global deletes.
+         *
+         * Example: flush 'A' starts and freezes the global deletes, then
+         * flush 'B' starts and freezes all deletes that occurred since 'A'
+         * started. If 'B' finishes before 'A', we need to wait until 'A' is done;
+         * otherwise the deletes frozen by 'B' are not applied to 'A' and we
+         * might fail to delete documents in 'A'.
+         */
+        try {
+          synchronized (ticketQueue) {
+            // Each flush is assigned a ticket in the order in which it acquires the ticketQueue lock
+            ticket =  new FlushTicket(flushingDWPT.prepareFlush(), true);
+            ticketQueue.add(ticket);
+          }
+  
+          // flush concurrently without locking
+          final FlushedSegment newSegment = flushingDWPT.flush();
+          synchronized (ticketQueue) {
+            ticket.segment = newSegment;
+          }
+          // flush was successful once we reached this point - new seg. has been assigned to the ticket!
+          success = true;
+        } finally {
+          if (!success && ticket != null) {
+            synchronized (ticketQueue) {
+              // In the case of a failure, make sure we still make progress and
+              // apply all the deletes, even though the segment flush failed, since
+              // the flush ticket could hold global deletes; see FlushTicket#canPublish()
+              ticket.isSegmentFlush = false;
             }
-
-            // Immediately mark this document as deleted
-            // since likely it was partially added.  This
-            // keeps indexing as "all or none" (atomic) when
-            // adding a document:
-            deleteDocID(state.docState.docID);
           }
         }
+        /*
+         * Now we are done, so try to flush the ticket queue if the head of the
+         * queue has already finished its flush.
+         */
+        applyFlushTickets();
+      } finally {
+        flushControl.doAfterFlush(flushingDWPT);
+        flushingDWPT.checkAndResetHasAborted();
+        indexWriter.flushCount.incrementAndGet();
       }
+     
+      flushingDWPT = flushControl.nextPendingFlush();
     }
-
-    doFlush |= flushControl.flushByRAMUsage("new document");
-
-    return doFlush;
-  }
-
-  public synchronized void waitIdle() {
-    while (!allThreadsIdle()) {
-      try {
-        wait();
-      } catch (InterruptedException ie) {
-        throw new ThreadInterruptedException(ie);
-      }
-    }
+    return maybeMerge;
   }
 
-  synchronized void waitReady(DocumentsWriterThreadState state) {
-    while (!closed && (!state.isIdle || aborting)) {
-      try {
-        wait();
-      } catch (InterruptedException ie) {
-        throw new ThreadInterruptedException(ie);
+  private void applyFlushTickets() throws IOException {
+    synchronized (ticketQueue) {
+      while (true) {
+        // Keep publishing eligible flushed segments:
+        final FlushTicket head = ticketQueue.peek();
+        if (head != null && head.canPublish()) {
+          ticketQueue.poll();
+          finishFlush(head.segment, head.frozenDeletes);
+        } else {
+          break;
+        }
       }
     }
-
-    if (closed) {
-      throw new AlreadyClosedException("this IndexWriter is closed");
-    }
   }
 
-  /** Does the synchronized work to finish/flush the
-   *  inverted document. */
-  private void finishDocument(DocumentsWriterThreadState perThread, DocWriter docWriter) throws IOException {
-
-    // Must call this w/o holding synchronized(this) else
-    // we'll hit deadlock:
-    balanceRAM();
-
-    synchronized(this) {
-
-      assert docWriter == null || docWriter.docID == perThread.docState.docID;
-
-      if (aborting) {
-
-        // We are currently aborting, and another thread is
-        // waiting for me to become idle.  We just forcefully
-        // idle this threadState; it will be fully reset by
-        // abort()
-        if (docWriter != null) {
-          try {
-            docWriter.abort();
-          } catch (Throwable t) {
-          }
+  private void finishFlush(FlushedSegment newSegment, FrozenBufferedDeletes bufferedDeletes)
+      throws IOException {
+    // Finish the flushed segment and publish it to IndexWriter
+    if (newSegment == null) {
+      assert bufferedDeletes != null;
+      if (bufferedDeletes != null && bufferedDeletes.any()) {
+        indexWriter.bufferedDeletesStream.push(bufferedDeletes);
+        if (infoStream != null) {
+          message("flush: push buffered deletes: " + bufferedDeletes);
         }
-
-        perThread.isIdle = true;
-
-        // wakes up any threads waiting on the wait queue
-        notifyAll();
-
-        return;
       }
-
-      final boolean doPause;
-
-      if (docWriter != null) {
-        doPause = waitQueue.add(docWriter);
-      } else {
-        skipDocWriter.docID = perThread.docState.docID;
-        doPause = waitQueue.add(skipDocWriter);
-      }
-
-      if (doPause) {
-        waitForWaitQueue();
-      }
-
-      perThread.isIdle = true;
-
-      // wakes up any threads waiting on the wait queue
-      notifyAll();
+    } else {
+      publishFlushedSegment(newSegment, bufferedDeletes);  
     }
   }
 
-  synchronized void waitForWaitQueue() {
-    do {
-      try {
-        wait();
-      } catch (InterruptedException ie) {
-        throw new ThreadInterruptedException(ie);
-      }
-    } while (!waitQueue.doResume());
-  }
-
-  private static class SkipDocWriter extends DocWriter {
-    @Override
-    void finish() {
-    }
-    @Override
-    void abort() {
-    }
-    @Override
-    long sizeInBytes() {
-      return 0;
+  final void subtractFlushedNumDocs(int numFlushed) {
+    int oldValue = numDocsInRAM.get();
+    while (!numDocsInRAM.compareAndSet(oldValue, oldValue - numFlushed)) {
+      oldValue = numDocsInRAM.get();
     }
   }
-  final SkipDocWriter skipDocWriter = new SkipDocWriter();
-
-  NumberFormat nf = NumberFormat.getInstance();
-
-  /* Initial chunks size of the shared byte[] blocks used to
-     store postings data */
-  final static int BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK;
-
-  /* if you increase this, you must fix field cache impl for
-   * getTerms/getTermsIndex requires <= 32768. */
-  final static int MAX_TERM_LENGTH_UTF8 = BYTE_BLOCK_SIZE-2;
-
-  /* Initial chunks size of the shared int[] blocks used to
-     store postings data */
-  final static int INT_BLOCK_SHIFT = 13;
-  final static int INT_BLOCK_SIZE = 1 << INT_BLOCK_SHIFT;
-  final static int INT_BLOCK_MASK = INT_BLOCK_SIZE - 1;
-
-  private List<int[]> freeIntBlocks = new ArrayList<int[]>();
-
-  /* Allocate another int[] from the shared pool */
-  synchronized int[] getIntBlock() {
-    final int size = freeIntBlocks.size();
-    final int[] b;
-    if (0 == size) {
-      b = new int[INT_BLOCK_SIZE];
-      bytesUsed.addAndGet(INT_BLOCK_SIZE*RamUsageEstimator.NUM_BYTES_INT);
-    } else {
-      b = freeIntBlocks.remove(size-1);
+  
+  /**
+   * Publishes the flushed segment, its segment-private deletes (if any) and its
+   * associated global delete packet (if present) to IndexWriter.  The actual
+   * publishing operation is synced on IW -> BDS so that the {@link SegmentInfo}'s
+   * delete generation is always GlobalPacket_deleteGeneration + 1
+   */
+  private void publishFlushedSegment(FlushedSegment newSegment, FrozenBufferedDeletes globalPacket)
+      throws IOException {
+    assert newSegment != null;
+    final SegmentInfo segInfo = indexWriter.prepareFlushedSegment(newSegment);
+    final BufferedDeletes deletes = newSegment.segmentDeletes;
+    FrozenBufferedDeletes packet = null;
+    if (deletes != null && deletes.any()) {
+      // Segment private delete
+      packet = new FrozenBufferedDeletes(deletes, true);
+      if (infoStream != null) {
+        message("flush: push buffered seg private deletes: " + packet);
+      }
     }
-    return b;
-  }
-
-  long bytesUsed() {
-    return bytesUsed.get() + pendingDeletes.bytesUsed.get();
-  }
 
-  /* Return int[]s to the pool */
-  synchronized void recycleIntBlocks(int[][] blocks, int start, int end) {
-    for(int i=start;i<end;i++) {
-      freeIntBlocks.add(blocks[i]);
-      blocks[i] = null;
-    }
+    // now publish!
+    indexWriter.publishFlushedSegment(segInfo, packet, globalPacket);
   }
-
-  final RecyclingByteBlockAllocator byteBlockAllocator = new RecyclingByteBlockAllocator(BYTE_BLOCK_SIZE, Integer.MAX_VALUE, bytesUsed);
-
-  final static int PER_DOC_BLOCK_SIZE = 1024;
-
-  final RecyclingByteBlockAllocator perDocAllocator = new RecyclingByteBlockAllocator(PER_DOC_BLOCK_SIZE, Integer.MAX_VALUE, bytesUsed);
-
-  String toMB(long v) {
-    return nf.format(v/1024./1024.);
+  
+  // for asserts
+  private volatile DocumentsWriterDeleteQueue currentFullFlushDelQueue = null;
+  // for asserts
+  private synchronized boolean setFlushingDeleteQueue(DocumentsWriterDeleteQueue session) {
+    currentFullFlushDelQueue = session;
+    return true;
   }
-
-  /* We have three pools of RAM: Postings, byte blocks
-   * (holds freq/prox posting data) and per-doc buffers
-   * (stored fields/term vectors).  Different docs require
-   * varying amount of storage from these classes.  For
-   * example, docs with many unique single-occurrence short
-   * terms will use up the Postings RAM and hardly any of
-   * the other two.  Whereas docs with very large terms will
-   * use alot of byte blocks RAM.  This method just frees
-   * allocations from the pools once we are over-budget,
-   * which balances the pools to match the current docs. */
-  void balanceRAM() {
-
-    final boolean doBalance;
-    final long deletesRAMUsed;
-
-    deletesRAMUsed = bufferedDeletesStream.bytesUsed();
-
-    final long ramBufferSize;
-    final double mb = config.getRAMBufferSizeMB();
-    if (mb == IndexWriterConfig.DISABLE_AUTO_FLUSH) {
-      ramBufferSize = IndexWriterConfig.DISABLE_AUTO_FLUSH;
-    } else {
-      ramBufferSize = (long) (mb*1024*1024);
+  
+  /*
+   * flushAllThreads is synced by IW's fullFlushLock. Flushing all threads is a
+   * two-stage operation; the caller must ensure (in try/finally) that finishFullFlush
+   * is called after this method, to release the flush lock in DWFlushControl.
+   */
+  final boolean flushAllThreads()
+    throws IOException {
+    final DocumentsWriterDeleteQueue flushingDeleteQueue;
+
+    synchronized (this) {
+      flushingDeleteQueue = deleteQueue;
+      /* Cut over to a new delete queue.  This must be synced on the flush control;
+       * otherwise a new DWPT could sneak into the loop with an already flushing
+       * delete queue */
+      flushControl.markForFullFlush(); // swaps the delQueue synced on FlushControl
+      assert setFlushingDeleteQueue(flushingDeleteQueue);
     }
-
-    synchronized(this) {
-      if (ramBufferSize == IndexWriterConfig.DISABLE_AUTO_FLUSH || bufferIsFull) {
-        return;
-      }
+    assert currentFullFlushDelQueue != null;
+    assert currentFullFlushDelQueue != deleteQueue;
     
-      doBalance = bytesUsed() + deletesRAMUsed >= ramBufferSize;
-    }
-
-    if (doBalance) {
-
-      if (infoStream != null) {
-        message("  RAM: balance allocations: usedMB=" + toMB(bytesUsed()) +
-                " vs trigger=" + toMB(ramBufferSize) +
-                " deletesMB=" + toMB(deletesRAMUsed) +
-                " byteBlockFree=" + toMB(byteBlockAllocator.bytesUsed()) +
-                " perDocFree=" + toMB(perDocAllocator.bytesUsed()));
-      }
-
-      final long startBytesUsed = bytesUsed() + deletesRAMUsed;
-
-      int iter = 0;
-
-      // We free equally from each pool in 32 KB
-      // chunks until we are below our threshold
-      // (freeLevel)
-
-      boolean any = true;
-
-      final long freeLevel = (long) (0.95 * ramBufferSize);
-
-      while(bytesUsed()+deletesRAMUsed > freeLevel) {
-      
-        synchronized(this) {
-          if (0 == perDocAllocator.numBufferedBlocks() &&
-              0 == byteBlockAllocator.numBufferedBlocks() &&
-              0 == freeIntBlocks.size() && !any) {
-            // Nothing else to free -- must flush now.
-            bufferIsFull = bytesUsed()+deletesRAMUsed > ramBufferSize;
-            if (infoStream != null) {
-              if (bytesUsed()+deletesRAMUsed > ramBufferSize) {
-                message("    nothing to free; set bufferIsFull");
-              } else {
-                message("    nothing to free");
-              }
-            }
-            break;
-          }
-
-          if ((0 == iter % 4) && byteBlockAllocator.numBufferedBlocks() > 0) {
-            byteBlockAllocator.freeBlocks(1);
-          }
-          if ((1 == iter % 4) && freeIntBlocks.size() > 0) {
-            freeIntBlocks.remove(freeIntBlocks.size()-1);
-            bytesUsed.addAndGet(-INT_BLOCK_SIZE * RamUsageEstimator.NUM_BYTES_INT);
-          }
-          if ((2 == iter % 4) && perDocAllocator.numBufferedBlocks() > 0) {
-            perDocAllocator.freeBlocks(32); // Remove upwards of 32 blocks (each block is 1K)
-          }
-        }
-
-        if ((3 == iter % 4) && any) {
-          // Ask consumer to free any recycled state
-          any = consumer.freeRAM();
+    boolean anythingFlushed = false;
+    try {
+      DocumentsWriterPerThread flushingDWPT;
+      // Help out with flushing:
+      while ((flushingDWPT = flushControl.nextPendingFlush()) != null) {
+        anythingFlushed |= doFlush(flushingDWPT);
+      }
+      // If a concurrent flush is still in flight wait for it
+      while (flushControl.anyFlushing()) {
+        flushControl.waitForFlush();  
+      }
+      if (!anythingFlushed) { // apply deletes if we did not flush any document
+        synchronized (ticketQueue) {
+          ticketQueue.add(new FlushTicket(flushingDeleteQueue.freezeGlobalBuffer(null), false));
         }
-
-        iter++;
-      }
-
-      if (infoStream != null) {
-        message("    after free: freedMB=" + nf.format((startBytesUsed-bytesUsed()-deletesRAMUsed)/1024./1024.) + " usedMB=" + nf.format((bytesUsed()+deletesRAMUsed)/1024./1024.));
+        applyFlushTickets();
       }
+    } finally {
+      assert flushingDeleteQueue == currentFullFlushDelQueue;
     }
+    return anythingFlushed;
   }
-
-  final WaitQueue waitQueue = new WaitQueue();
-
-  private class WaitQueue {
-    DocWriter[] waiting;
-    int nextWriteDocID;
-    int nextWriteLoc;
-    int numWaiting;
-    long waitingBytes;
-
-    public WaitQueue() {
-      waiting = new DocWriter[10];
-    }
-
-    synchronized void reset() {
-      // NOTE: nextWriteLoc doesn't need to be reset
-      assert numWaiting == 0;
-      assert waitingBytes == 0;
-      nextWriteDocID = 0;
-    }
-
-    synchronized boolean doResume() {
-      final double mb = config.getRAMBufferSizeMB();
-      final long waitQueueResumeBytes;
-      if (mb == IndexWriterConfig.DISABLE_AUTO_FLUSH) {
-        waitQueueResumeBytes = 2*1024*1024;
-      } else {
-        waitQueueResumeBytes = (long) (mb*1024*1024*0.05);
-      }
-      return waitingBytes <= waitQueueResumeBytes;
-    }
-
-    synchronized boolean doPause() {
-      final double mb = config.getRAMBufferSizeMB();
-      final long waitQueuePauseBytes;
-      if (mb == IndexWriterConfig.DISABLE_AUTO_FLUSH) {
-        waitQueuePauseBytes = 4*1024*1024;
-      } else {
-        waitQueuePauseBytes = (long) (mb*1024*1024*0.1);
-      }
-      return waitingBytes > waitQueuePauseBytes;
-    }
-
-    synchronized void abort() {
-      int count = 0;
-      for(int i=0;i<waiting.length;i++) {
-        final DocWriter doc = waiting[i];
-        if (doc != null) {
-          doc.abort();
-          waiting[i] = null;
-          count++;
-        }
-      }
-      waitingBytes = 0;
-      assert count == numWaiting;
-      numWaiting = 0;
+  
+  final void finishFullFlush(boolean success) {
+    assert setFlushingDeleteQueue(null);
+    if (success) {
+      // Release the flush lock
+      flushControl.finishFullFlush();
+    } else {
+      flushControl.abortFullFlushes();
     }
+  }
 
-    private void writeDocument(DocWriter doc) throws IOException {
-      assert doc == skipDocWriter || nextWriteDocID == doc.docID;
-      boolean success = false;
-      try {
-        doc.finish();
-        nextWriteDocID++;
-        nextWriteLoc++;
-        assert nextWriteLoc <= waiting.length;
-        if (nextWriteLoc == waiting.length) {
-          nextWriteLoc = 0;
-        }
-        success = true;
-      } finally {
-        if (!success) {
-          setAborting();
-        }
-      }
+  static final class FlushTicket {
+    final FrozenBufferedDeletes frozenDeletes;
+    /* access to non-final members must be synchronized on DW#ticketQueue */
+    FlushedSegment segment;
+    boolean isSegmentFlush;
+    
+    FlushTicket(FrozenBufferedDeletes frozenDeletes, boolean isSegmentFlush) {
+      this.frozenDeletes = frozenDeletes;
+      this.isSegmentFlush = isSegmentFlush;
     }
-
-    synchronized public boolean add(DocWriter doc) throws IOException {
-
-      assert doc.docID >= nextWriteDocID;
-
-      if (doc.docID == nextWriteDocID) {
-        writeDocument(doc);
-        while(true) {
-          doc = waiting[nextWriteLoc];
-          if (doc != null) {
-            numWaiting--;
-            waiting[nextWriteLoc] = null;
-            waitingBytes -= doc.sizeInBytes();
-            writeDocument(doc);
-          } else {
-            break;
-          }
-        }
-      } else {
-
-        // I finished before documents that were added
-        // before me.  This can easily happen when I am a
-        // small doc and the docs before me were large, or,
-        // just due to luck in the thread scheduling.  Just
-        // add myself to the queue and when that large doc
-        // finishes, it will flush me:
-        int gap = doc.docID - nextWriteDocID;
-        if (gap >= waiting.length) {
-          // Grow queue
-          DocWriter[] newArray = new DocWriter[ArrayUtil.oversize(gap, RamUsageEstimator.NUM_BYTES_OBJECT_REF)];
-          assert nextWriteLoc >= 0;
-          System.arraycopy(waiting, nextWriteLoc, newArray, 0, waiting.length-nextWriteLoc);
-          System.arraycopy(waiting, 0, newArray, waiting.length-nextWriteLoc, nextWriteLoc);
-          nextWriteLoc = 0;
-          waiting = newArray;
-          gap = doc.docID - nextWriteDocID;
-        }
-
-        int loc = nextWriteLoc + gap;
-        if (loc >= waiting.length) {
-          loc -= waiting.length;
-        }
-
-        // We should only wrap one time
-        assert loc < waiting.length;
-
-        // Nobody should be in my spot!
-        assert waiting[loc] == null;
-        waiting[loc] = doc;
-        numWaiting++;
-        waitingBytes += doc.sizeInBytes();
-      }
-      
-      return doPause();
+    
+    boolean canPublish() {
+      return (!isSegmentFlush || segment != null);  
     }
   }
 }
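
The comment inside DocumentsWriter.doFlush above explains why flushed segments must be published in the order their flushes were started, even though the flushes themselves run concurrently. The following standalone sketch models that ordering invariant with a simplified ticket queue; it is an illustration only, and TicketQueueSketch and all of its members are invented names, not classes from this patch.

import java.util.ArrayDeque;
import java.util.Queue;

// Minimal model of the ticket-queue ordering used by doFlush/applyFlushTickets:
// tickets are enqueued in the order flushes start, and results are only ever
// published from the head, so a flush that finishes early waits for earlier ones.
final class TicketQueueSketch {
  static final class Ticket {
    final String name;        // stands in for the frozen global deletes
    volatile String segment;  // stands in for the FlushedSegment, set once the flush completes
    Ticket(String name) { this.name = name; }
    boolean canPublish() { return segment != null; }
  }

  private final Queue<Ticket> tickets = new ArrayDeque<Ticket>();

  // Called when a flush starts; the order of ticket creation defines publish order.
  synchronized Ticket startFlush(String name) {
    Ticket t = new Ticket(name);
    tickets.add(t);
    return t;
  }

  // Called when a concurrent flush finishes; only completed head tickets are published.
  synchronized void finishFlush(Ticket t, String segment) {
    t.segment = segment;
    while (!tickets.isEmpty() && tickets.peek().canPublish()) {
      Ticket head = tickets.poll();
      System.out.println("publish " + head.name + " -> " + head.segment);
    }
  }

  public static void main(String[] args) {
    TicketQueueSketch q = new TicketQueueSketch();
    Ticket a = q.startFlush("A");
    Ticket b = q.startFlush("B");
    q.finishFlush(b, "segB");   // 'B' finished first: nothing is published yet
    q.finishFlush(a, "segA");   // now 'A' and then 'B' are published, in start order
  }
}

In the patch itself the ticket payload is a FlushedSegment plus FrozenBufferedDeletes, and the publishing step runs under the ticketQueue lock in applyFlushTickets.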

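The flushAllThreads comment above also states that a full flush is a two-stage operation whose caller must call finishFullFlush from a finally block. A small compile-only sketch of that caller-side pattern follows; DocumentsWriterLike and fullFlush are stand-in names for illustration, not types or methods from this patch.

import java.io.IOException;

final class FullFlushPatternSketch {
  // Stand-in for the two DocumentsWriter entry points the comment refers to.
  interface DocumentsWriterLike {
    boolean flushAllThreads() throws IOException;
    void finishFullFlush(boolean success);
  }

  static boolean fullFlush(DocumentsWriterLike dw) throws IOException {
    boolean success = false;
    try {
      boolean anythingFlushed = dw.flushAllThreads();
      success = true;                // only reached if no exception escaped
      return anythingFlushed;
    } finally {
      dw.finishFullFlush(success);   // releases the full-flush state in DWFlushControl either way
    }
  }
}
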
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FieldsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FieldsWriter.java?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FieldsWriter.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FieldsWriter.java Sun May  1 22:38:33 2011
@@ -2,13 +2,13 @@ package org.apache.lucene.index;
 
 /**
  * Copyright 2004 The Apache Software Foundation
- * 
+ *
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
  * use this file except in compliance with the License. You may obtain a copy of
  * the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -22,15 +22,14 @@ import java.util.List;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Fieldable;
 import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.RAMOutputStream;
-import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.util.IOUtils;
 
 final class FieldsWriter {
   static final byte FIELD_IS_TOKENIZED = 0x1;
   static final byte FIELD_IS_BINARY = 0x2;
-  
+
   // Lucene 3.0: Removal of compressed fields
   static final int FORMAT_LUCENE_3_0_NO_COMPRESSED_FIELDS = 2;
 
@@ -38,7 +37,7 @@ final class FieldsWriter {
   // than the current one, and always change this if you
   // switch to a new format!
   static final int FORMAT_CURRENT = FORMAT_LUCENE_3_0_NO_COMPRESSED_FIELDS;
-  
+
   // when removing support for old versions, leave the last supported version here
   static final int FORMAT_MINIMUM = FORMAT_LUCENE_3_0_NO_COMPRESSED_FIELDS;
 
@@ -83,10 +82,9 @@ final class FieldsWriter {
   // and adds a new entry for this document into the index
   // stream.  This assumes the buffer was already written
   // in the correct fields format.
-  void flushDocument(int numStoredFields, RAMOutputStream buffer) throws IOException {
+  void startDocument(int numStoredFields) throws IOException {
     indexStream.writeLong(fieldsStream.getFilePointer());
     fieldsStream.writeVInt(numStoredFields);
-    buffer.writeTo(fieldsStream);
   }
 
   void skipDocument() throws IOException {
@@ -121,8 +119,8 @@ final class FieldsWriter {
     }
   }
 
-  final void writeField(FieldInfo fi, Fieldable field) throws IOException {
-    fieldsStream.writeVInt(fi.number);
+  final void writeField(int fieldNumber, Fieldable field) throws IOException {
+    fieldsStream.writeVInt(fieldNumber);
     byte bits = 0;
     if (field.isTokenized())
       bits |= FieldsWriter.FIELD_IS_TOKENIZED;
@@ -175,10 +173,9 @@ final class FieldsWriter {
     fieldsStream.writeVInt(storedCount);
 
 
-
     for (Fieldable field : fields) {
       if (field.isStored())
-        writeField(fieldInfos.fieldInfo(field.name()), field);
+        writeField(fieldInfos.fieldNumber(field.name()), field);
     }
   }
 }

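In the FieldsWriter change above, writeField now receives the field number directly and still encodes per-field properties as a single flags byte built from FIELD_IS_TOKENIZED (0x1) and FIELD_IS_BINARY (0x2). The following self-contained sketch shows how such a byte is composed and read back; FieldFlagsSketch and encode are invented names used only for illustration.

final class FieldFlagsSketch {
  static final byte FIELD_IS_TOKENIZED = 0x1;  // same constants as FieldsWriter above
  static final byte FIELD_IS_BINARY = 0x2;

  static byte encode(boolean tokenized, boolean binary) {
    byte bits = 0;
    if (tokenized) bits |= FIELD_IS_TOKENIZED;
    if (binary)    bits |= FIELD_IS_BINARY;
    return bits;
  }

  public static void main(String[] args) {
    byte bits = encode(true, false);
    System.out.println("tokenized=" + ((bits & FIELD_IS_TOKENIZED) != 0)
        + " binary=" + ((bits & FIELD_IS_BINARY) != 0));
  }
}

In writeField itself, the field number is written first as a vInt, followed by this flags byte and then the field's value.
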
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java Sun May  1 22:38:33 2011
@@ -19,55 +19,35 @@ package org.apache.lucene.index;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.lucene.index.codecs.FieldsConsumer;
-import org.apache.lucene.index.codecs.PostingsConsumer;
-import org.apache.lucene.index.codecs.TermStats;
-import org.apache.lucene.index.codecs.TermsConsumer;
-import org.apache.lucene.util.BitVector;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.CollectionUtil;
 
 final class FreqProxTermsWriter extends TermsHashConsumer {
 
   @Override
-  public TermsHashConsumerPerThread addThread(TermsHashPerThread perThread) {
-    return new FreqProxTermsWriterPerThread(perThread);
-  }
-
-  @Override
   void abort() {}
 
-  private int flushedDocCount;
-
   // TODO: would be nice to factor out more of this, eg the
   // FreqProxFieldMergeState, and code to visit all Fields
   // under the same FieldInfo together, up into TermsHash*.
  // Other writers would presumably share a lot of this...
 
   @Override
-  public void flush(Map<TermsHashConsumerPerThread,Collection<TermsHashConsumerPerField>> threadsAndFields, final SegmentWriteState state) throws IOException {
+  public void flush(Map<FieldInfo, TermsHashConsumerPerField> fieldsToFlush, final SegmentWriteState state) throws IOException {
 
     // Gather all FieldData's that have postings, across all
     // ThreadStates
     List<FreqProxTermsWriterPerField> allFields = new ArrayList<FreqProxTermsWriterPerField>();
-    
-    flushedDocCount = state.numDocs;
-
-    for (Map.Entry<TermsHashConsumerPerThread,Collection<TermsHashConsumerPerField>> entry : threadsAndFields.entrySet()) {
-
-      Collection<TermsHashConsumerPerField> fields = entry.getValue();
-
 
-      for (final TermsHashConsumerPerField i : fields) {
-        final FreqProxTermsWriterPerField perField = (FreqProxTermsWriterPerField) i;
-        if (perField.termsHashPerField.bytesHash.size() > 0)
+    for (TermsHashConsumerPerField f : fieldsToFlush.values()) {
+        final FreqProxTermsWriterPerField perField = (FreqProxTermsWriterPerField) f;
+        if (perField.termsHashPerField.bytesHash.size() > 0) {
           allFields.add(perField);
-      }
+        }
     }
 
     final int numAllFields = allFields.size();
@@ -77,6 +57,8 @@ final class FreqProxTermsWriter extends 
 
     final FieldsConsumer consumer = state.segmentCodecs.codec().fieldsConsumer(state);
 
+    TermsHash termsHash = null;
+
     /*
     Current writer chain:
       FieldsConsumer
@@ -89,257 +71,48 @@ final class FreqProxTermsWriter extends 
                     -> IMPL: FormatPostingsPositionsWriter
     */
 
-    int start = 0;
-    while(start < numAllFields) {
-      final FieldInfo fieldInfo = allFields.get(start).fieldInfo;
-      final String fieldName = fieldInfo.name;
-
-      int end = start+1;
-      while(end < numAllFields && allFields.get(end).fieldInfo.name.equals(fieldName))
-        end++;
-      
-      FreqProxTermsWriterPerField[] fields = new FreqProxTermsWriterPerField[end-start];
-      for(int i=start;i<end;i++) {
-        fields[i-start] = allFields.get(i);
-
-        // Aggregate the storePayload as seen by the same
-        // field across multiple threads
-        if (!fieldInfo.omitTermFreqAndPositions) {
-          fieldInfo.storePayloads |= fields[i-start].hasPayloads;
-        }
+    for (int fieldNumber = 0; fieldNumber < numAllFields; fieldNumber++) {
+      final FieldInfo fieldInfo = allFields.get(fieldNumber).fieldInfo;
+
+      final FreqProxTermsWriterPerField fieldWriter = allFields.get(fieldNumber);
+
+      // Aggregate the storePayload as seen by the same
+      // field across multiple threads
+      if (!fieldInfo.omitTermFreqAndPositions) {
+        fieldInfo.storePayloads |= fieldWriter.hasPayloads;
       }
 
       // If this field has postings then add them to the
       // segment
-      appendPostings(fieldName, state, fields, consumer);
-
-      for(int i=0;i<fields.length;i++) {
-        TermsHashPerField perField = fields[i].termsHashPerField;
-        int numPostings = perField.bytesHash.size();
-        perField.reset();
-        perField.shrinkHash(numPostings);
-        fields[i].reset();
-      }
+      fieldWriter.flush(fieldInfo.name, consumer, state);
 
-      start = end;
+      TermsHashPerField perField = fieldWriter.termsHashPerField;
+      assert termsHash == null || termsHash == perField.termsHash;
+      termsHash = perField.termsHash;
+      int numPostings = perField.bytesHash.size();
+      perField.reset();
+      perField.shrinkHash(numPostings);
+      fieldWriter.reset();
     }
 
-    for (Map.Entry<TermsHashConsumerPerThread,Collection<TermsHashConsumerPerField>> entry : threadsAndFields.entrySet()) {
-      FreqProxTermsWriterPerThread perThread = (FreqProxTermsWriterPerThread) entry.getKey();
-      perThread.termsHashPerThread.reset(true);
+    if (termsHash != null) {
+      termsHash.reset();
     }
     consumer.close();
   }
 
   BytesRef payload;
 
-  /* Walk through all unique text tokens (Posting
-   * instances) found in this field and serialize them
-   * into a single RAM segment. */
-  void appendPostings(String fieldName, SegmentWriteState state,
-                      FreqProxTermsWriterPerField[] fields,
-                      FieldsConsumer consumer)
-    throws CorruptIndexException, IOException {
-
-    int numFields = fields.length;
-
-    final BytesRef text = new BytesRef();
-
-    final FreqProxFieldMergeState[] mergeStates = new FreqProxFieldMergeState[numFields];
-
-    final TermsConsumer termsConsumer = consumer.addField(fields[0].fieldInfo);
-    final Comparator<BytesRef> termComp = termsConsumer.getComparator();
-
-    for(int i=0;i<numFields;i++) {
-      FreqProxFieldMergeState fms = mergeStates[i] = new FreqProxFieldMergeState(fields[i], termComp);
-
-      assert fms.field.fieldInfo == fields[0].fieldInfo;
-
-      // Should always be true
-      boolean result = fms.nextTerm();
-      assert result;
-    }
-
-    final Term protoTerm = new Term(fieldName);
-
-    FreqProxFieldMergeState[] termStates = new FreqProxFieldMergeState[numFields];
-
-    final boolean currentFieldOmitTermFreqAndPositions = fields[0].fieldInfo.omitTermFreqAndPositions;
-    //System.out.println("flush terms field=" + fields[0].fieldInfo.name);
-
-    final Map<Term,Integer> segDeletes;
-    if (state.segDeletes != null && state.segDeletes.terms.size() > 0) {
-      segDeletes = state.segDeletes.terms;
-    } else {
-      segDeletes = null;
-    }
-
-    // TODO: really TermsHashPerField should take over most
-    // of this loop, including merge sort of terms from
-    // multiple threads and interacting with the
-    // TermsConsumer, only calling out to us (passing us the
-    // DocsConsumer) to handle delivery of docs/positions
-    long sumTotalTermFreq = 0;
-    while(numFields > 0) {
-
-      // Get the next term to merge
-      termStates[0] = mergeStates[0];
-      int numToMerge = 1;
-
-      // TODO: pqueue
-      for(int i=1;i<numFields;i++) {
-        final int cmp = termComp.compare(mergeStates[i].text, termStates[0].text);
-        if (cmp < 0) {
-          termStates[0] = mergeStates[i];
-          numToMerge = 1;
-        } else if (cmp == 0) {
-          termStates[numToMerge++] = mergeStates[i];
-        }
-      }
-
-      // Need shallow copy here because termStates[0].text
-      // changes by the time we call finishTerm
-      text.bytes = termStates[0].text.bytes;
-      text.offset = termStates[0].text.offset;
-      text.length = termStates[0].text.length;  
-
-      //System.out.println("  term=" + text.toUnicodeString());
-      //System.out.println("  term=" + text.toString());
-
-      final PostingsConsumer postingsConsumer = termsConsumer.startTerm(text);
-
-      final int delDocLimit;
-      if (segDeletes != null) {
-        final Integer docIDUpto = segDeletes.get(protoTerm.createTerm(text));
-        if (docIDUpto != null) {
-          delDocLimit = docIDUpto;
-        } else {
-          delDocLimit = 0;
-        }
-      } else {
-        delDocLimit = 0;
-      }
-
-      // Now termStates has numToMerge FieldMergeStates
-      // which all share the same term.  Now we must
-      // interleave the docID streams.
-      int numDocs = 0;
-      long totTF = 0;
-      while(numToMerge > 0) {
-        
-        FreqProxFieldMergeState minState = termStates[0];
-        for(int i=1;i<numToMerge;i++) {
-          if (termStates[i].docID < minState.docID) {
-            minState = termStates[i];
-          }
-        }
-
-        final int termDocFreq = minState.termFreq;
-        numDocs++;
-
-        assert minState.docID < flushedDocCount: "doc=" + minState.docID + " maxDoc=" + flushedDocCount;
-
-        // NOTE: we could check here if the docID was
-        // deleted, and skip it.  However, this is somewhat
-        // dangerous because it can yield non-deterministic
-        // behavior since we may see the docID before we see
-        // the term that caused it to be deleted.  This
-        // would mean some (but not all) of its postings may
-        // make it into the index, which'd alter the docFreq
-        // for those terms.  We could fix this by doing two
-        // passes, ie first sweep marks all del docs, and
-        // 2nd sweep does the real flush, but I suspect
-        // that'd add too much time to flush.
-
-        postingsConsumer.startDoc(minState.docID, termDocFreq);
-        if (minState.docID < delDocLimit) {
-          // Mark it deleted.  TODO: we could also skip
-          // writing its postings; this would be
-          // deterministic (just for this Term's docs).
-          if (state.deletedDocs == null) {
-            state.deletedDocs = new BitVector(state.numDocs);
-          }
-          state.deletedDocs.set(minState.docID);
-        }
-
-        final ByteSliceReader prox = minState.prox;
-
-        // Carefully copy over the prox + payload info,
-        // changing the format to match Lucene's segment
-        // format.
-        if (!currentFieldOmitTermFreqAndPositions) {
-          // omitTermFreqAndPositions == false so we do write positions &
-          // payload          
-          int position = 0;
-          totTF += termDocFreq;
-          for(int j=0;j<termDocFreq;j++) {
-            final int code = prox.readVInt();
-            position += code >> 1;
-            //System.out.println("    pos=" + position);
-
-            final int payloadLength;
-            final BytesRef thisPayload;
-
-            if ((code & 1) != 0) {
-              // This position has a payload
-              payloadLength = prox.readVInt();  
-              
-              if (payload == null) {
-                payload = new BytesRef();
-                payload.bytes = new byte[payloadLength];
-              } else if (payload.bytes.length < payloadLength) {
-                payload.grow(payloadLength);
-              }
-
-              prox.readBytes(payload.bytes, 0, payloadLength);
-              payload.length = payloadLength;
-              thisPayload = payload;
-
-            } else {
-              payloadLength = 0;
-              thisPayload = null;
-            }
-
-            postingsConsumer.addPosition(position, thisPayload);
-          } //End for
-
-          postingsConsumer.finishDoc();
-        }
-
-        if (!minState.nextDoc()) {
-
-          // Remove from termStates
-          int upto = 0;
-          // TODO: inefficient O(N) where N = number of
-          // threads that had seen this term:
-          for(int i=0;i<numToMerge;i++) {
-            if (termStates[i] != minState) {
-              termStates[upto++] = termStates[i];
-            }
-          }
-          numToMerge--;
-          assert upto == numToMerge;
-
-          // Advance this state to the next term
-
-          if (!minState.nextTerm()) {
-            // OK, no more terms, so remove from mergeStates
-            // as well
-            upto = 0;
-            for(int i=0;i<numFields;i++)
-              if (mergeStates[i] != minState)
-                mergeStates[upto++] = mergeStates[i];
-            numFields--;
-            assert upto == numFields;
-          }
-        }
-      }
+  @Override
+  public TermsHashConsumerPerField addField(TermsHashPerField termsHashPerField, FieldInfo fieldInfo) {
+    return new FreqProxTermsWriterPerField(termsHashPerField, this, fieldInfo);
+  }
 
-      assert numDocs > 0;
-      termsConsumer.finishTerm(text, new TermStats(numDocs, totTF));
-      sumTotalTermFreq += totTF;
-    }
+  @Override
+  void finishDocument(TermsHash termsHash) throws IOException {
+  }
 
-    termsConsumer.finish(sumTotalTermFreq);
+  @Override
+  void startDocument() throws IOException {
   }
 }
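
[Editorial sketch] With a single DocumentsWriterPerThread behind each flush, the old cross-thread merge in appendPostings is no longer needed: flush() now receives a Map<FieldInfo, TermsHashConsumerPerField> and delegates serialization to each FreqProxTermsWriterPerField. The writer chain named in the comment above (FieldsConsumer -> TermsConsumer -> PostingsConsumer) is driven once per field. A condensed, hedged sketch of the calling order for a single term, using only calls that appear in this commit; argument values are placeholders, and the real flush only calls finishDoc() when positions are actually written:

    // assumes: java.io.IOException, org.apache.lucene.util.BytesRef, and
    //          org.apache.lucene.index.codecs.FieldsConsumer / TermsConsumer /
    //          PostingsConsumer / TermStats
    static void writeSingleTerm(FieldsConsumer consumer, FieldInfo fieldInfo,
                                BytesRef term, int docID, int termFreq)
        throws IOException {
      final TermsConsumer termsConsumer = consumer.addField(fieldInfo);
      final PostingsConsumer postings = termsConsumer.startTerm(term);
      postings.startDoc(docID, termFreq);
      // positions and payloads would be delivered here via addPosition(...)
      postings.finishDoc();
      termsConsumer.finishTerm(term, new TermStats(1, termFreq));
      termsConsumer.finish(termFreq);   // sumTotalTermFreq for this field
    }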

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriterPerField.java Sun May  1 22:38:33 2011
@@ -18,9 +18,17 @@ package org.apache.lucene.index;
  */
 
 import java.io.IOException;
+import java.util.Comparator;
+import java.util.Map;
 
 import org.apache.lucene.analysis.tokenattributes.PayloadAttribute;
 import org.apache.lucene.document.Fieldable;
+import org.apache.lucene.index.codecs.FieldsConsumer;
+import org.apache.lucene.index.codecs.PostingsConsumer;
+import org.apache.lucene.index.codecs.TermStats;
+import org.apache.lucene.index.codecs.TermsConsumer;
+import org.apache.lucene.util.BitVector;
+import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.RamUsageEstimator;
 
 // TODO: break into separate freq and prox writers as
@@ -28,17 +36,17 @@ import org.apache.lucene.util.RamUsageEs
 // be configured as any number of files 1..N
 final class FreqProxTermsWriterPerField extends TermsHashConsumerPerField implements Comparable<FreqProxTermsWriterPerField> {
 
-  final FreqProxTermsWriterPerThread perThread;
+  final FreqProxTermsWriter parent;
   final TermsHashPerField termsHashPerField;
   final FieldInfo fieldInfo;
-  final DocumentsWriter.DocState docState;
+  final DocumentsWriterPerThread.DocState docState;
   final FieldInvertState fieldState;
   boolean omitTermFreqAndPositions;
   PayloadAttribute payloadAttribute;
 
-  public FreqProxTermsWriterPerField(TermsHashPerField termsHashPerField, FreqProxTermsWriterPerThread perThread, FieldInfo fieldInfo) {
+  public FreqProxTermsWriterPerField(TermsHashPerField termsHashPerField, FreqProxTermsWriter parent, FieldInfo fieldInfo) {
     this.termsHashPerField = termsHashPerField;
-    this.perThread = perThread;
+    this.parent = parent;
     this.fieldInfo = fieldInfo;
     docState = termsHashPerField.docState;
     fieldState = termsHashPerField.fieldState;
@@ -78,8 +86,8 @@ final class FreqProxTermsWriterPerField 
       if (fields[i].isIndexed())
         return true;
     return false;
-  }     
-  
+  }
+
   @Override
   void start(Fieldable f) {
     if (fieldState.attributeSource.hasAttribute(PayloadAttribute.class)) {
@@ -96,18 +104,18 @@ final class FreqProxTermsWriterPerField 
     } else {
       payload = payloadAttribute.getPayload();
     }
-    
+
     if (payload != null && payload.length > 0) {
       termsHashPerField.writeVInt(1, (proxCode<<1)|1);
       termsHashPerField.writeVInt(1, payload.length);
       termsHashPerField.writeBytes(1, payload.data, payload.offset, payload.length);
-      hasPayloads = true;      
+      hasPayloads = true;
     } else
       termsHashPerField.writeVInt(1, proxCode<<1);
-    
+
     FreqProxPostingsArray postings = (FreqProxPostingsArray) termsHashPerField.postingsArray;
     postings.lastPositions[termID] = fieldState.position;
-    
+
   }
 
   @Override
@@ -115,7 +123,7 @@ final class FreqProxTermsWriterPerField 
     // First time we're seeing this term since the last
     // flush
     assert docState.testPoint("FreqProxTermsWriterPerField.newTerm start");
-    
+
     FreqProxPostingsArray postings = (FreqProxPostingsArray) termsHashPerField.postingsArray;
     postings.lastDocIDs[termID] = docState.docID;
     if (omitTermFreqAndPositions) {
@@ -132,9 +140,9 @@ final class FreqProxTermsWriterPerField 
   void addTerm(final int termID) {
 
     assert docState.testPoint("FreqProxTermsWriterPerField.addTerm start");
-    
+
     FreqProxPostingsArray postings = (FreqProxPostingsArray) termsHashPerField.postingsArray;
-    
+
     assert omitTermFreqAndPositions || postings.docFreqs[termID] > 0;
 
     if (omitTermFreqAndPositions) {
@@ -169,7 +177,7 @@ final class FreqProxTermsWriterPerField 
       }
     }
   }
-  
+
   @Override
   ParallelPostingsArray createPostingsArray(int size) {
     return new FreqProxPostingsArray(size);
@@ -212,7 +220,180 @@ final class FreqProxTermsWriterPerField 
       return ParallelPostingsArray.BYTES_PER_POSTING + 4 * RamUsageEstimator.NUM_BYTES_INT;
     }
   }
-  
+
   public void abort() {}
+
+  BytesRef payload;
+
+  /* Walk through all unique text tokens (Posting
+   * instances) found in this field and serialize them
+   * into a single RAM segment. */
+  void flush(String fieldName, FieldsConsumer consumer,  final SegmentWriteState state)
+    throws CorruptIndexException, IOException {
+
+    final TermsConsumer termsConsumer = consumer.addField(fieldInfo);
+    final Comparator<BytesRef> termComp = termsConsumer.getComparator();
+
+    final Term protoTerm = new Term(fieldName);
+
+    final boolean currentFieldOmitTermFreqAndPositions = fieldInfo.omitTermFreqAndPositions;
+
+    final Map<Term,Integer> segDeletes;
+    if (state.segDeletes != null && state.segDeletes.terms.size() > 0) {
+      segDeletes = state.segDeletes.terms;
+    } else {
+      segDeletes = null;
+    }
+
+    final int[] termIDs = termsHashPerField.sortPostings(termComp);
+    final int numTerms = termsHashPerField.bytesHash.size();
+    final BytesRef text = new BytesRef();
+    final FreqProxPostingsArray postings = (FreqProxPostingsArray) termsHashPerField.postingsArray;
+    final ByteSliceReader freq = new ByteSliceReader();
+    final ByteSliceReader prox = new ByteSliceReader();
+
+    long sumTotalTermFreq = 0;
+    for (int i = 0; i < numTerms; i++) {
+      final int termID = termIDs[i];
+      // Get BytesRef
+      final int textStart = postings.textStarts[termID];
+      termsHashPerField.bytePool.setBytesRef(text, textStart);
+
+      termsHashPerField.initReader(freq, termID, 0);
+      if (!fieldInfo.omitTermFreqAndPositions) {
+        termsHashPerField.initReader(prox, termID, 1);
+      }
+
+      // TODO: really TermsHashPerField should take over most
+      // of this loop, including merge sort of terms from
+      // multiple threads and interacting with the
+      // TermsConsumer, only calling out to us (passing us the
+      // DocsConsumer) to handle delivery of docs/positions
+
+      final PostingsConsumer postingsConsumer = termsConsumer.startTerm(text);
+
+      final int delDocLimit;
+      if (segDeletes != null) {
+        final Integer docIDUpto = segDeletes.get(protoTerm.createTerm(text));
+        if (docIDUpto != null) {
+          delDocLimit = docIDUpto;
+        } else {
+          delDocLimit = 0;
+        }
+      } else {
+        delDocLimit = 0;
+      }
+
+      // Now termStates has numToMerge FieldMergeStates
+      // which all share the same term.  Now we must
+      // interleave the docID streams.
+      int numDocs = 0;
+      long totTF = 0;
+      int docID = 0;
+      int termFreq = 0;
+
+      while(true) {
+        if (freq.eof()) {
+          if (postings.lastDocCodes[termID] != -1) {
+            // Return last doc
+            docID = postings.lastDocIDs[termID];
+            if (!omitTermFreqAndPositions) {
+              termFreq = postings.docFreqs[termID];
+            }
+            postings.lastDocCodes[termID] = -1;
+          } else {
+            // EOF
+            break;
+          }
+        } else {
+          final int code = freq.readVInt();
+          if (omitTermFreqAndPositions) {
+            docID += code;
+          } else {
+            docID += code >>> 1;
+            if ((code & 1) != 0) {
+              termFreq = 1;
+            } else {
+              termFreq = freq.readVInt();
+            }
+          }
+
+          assert docID != postings.lastDocIDs[termID];
+        }
+
+        numDocs++;
+        assert docID < state.numDocs: "doc=" + docID + " maxDoc=" + state.numDocs;
+        final int termDocFreq = termFreq;
+
+        // NOTE: we could check here if the docID was
+        // deleted, and skip it.  However, this is somewhat
+        // dangerous because it can yield non-deterministic
+        // behavior since we may see the docID before we see
+        // the term that caused it to be deleted.  This
+        // would mean some (but not all) of its postings may
+        // make it into the index, which'd alter the docFreq
+        // for those terms.  We could fix this by doing two
+        // passes, ie first sweep marks all del docs, and
+        // 2nd sweep does the real flush, but I suspect
+        // that'd add too much time to flush.
+        postingsConsumer.startDoc(docID, termDocFreq);
+        if (docID < delDocLimit) {
+          // Mark it deleted.  TODO: we could also skip
+          // writing its postings; this would be
+          // deterministic (just for this Term's docs).
+          if (state.deletedDocs == null) {
+            state.deletedDocs = new BitVector(state.numDocs);
+          }
+          state.deletedDocs.set(docID);
+        }
+
+        // Carefully copy over the prox + payload info,
+        // changing the format to match Lucene's segment
+        // format.
+        if (!currentFieldOmitTermFreqAndPositions) {
+          // omitTermFreqAndPositions == false so we do write positions &
+          // payload
+          int position = 0;
+          totTF += termDocFreq;
+          for(int j=0;j<termDocFreq;j++) {
+            final int code = prox.readVInt();
+            position += code >> 1;
+
+            final int payloadLength;
+            final BytesRef thisPayload;
+
+            if ((code & 1) != 0) {
+              // This position has a payload
+              payloadLength = prox.readVInt();
+
+              if (payload == null) {
+                payload = new BytesRef();
+                payload.bytes = new byte[payloadLength];
+              } else if (payload.bytes.length < payloadLength) {
+                payload.grow(payloadLength);
+              }
+
+              prox.readBytes(payload.bytes, 0, payloadLength);
+              payload.length = payloadLength;
+              thisPayload = payload;
+
+            } else {
+              payloadLength = 0;
+              thisPayload = null;
+            }
+
+            postingsConsumer.addPosition(position, thisPayload);
+          }
+
+          postingsConsumer.finishDoc();
+        }
+      }
+      termsConsumer.finishTerm(text, new TermStats(numDocs, totTF));
+      sumTotalTermFreq += totTF;
+    }
+
+    termsConsumer.finish(sumTotalTermFreq);
+  }
+
 }
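
[Editorial sketch] The per-field flush added above decodes the in-RAM postings straight from the byte slices: the freq stream holds docID deltas whose low bit flags an implicit termFreq of 1, and the prox stream holds position deltas whose low bit flags a payload. A standalone illustration of that delta-plus-flag scheme for the freq stream follows; it uses plain int lists instead of VInt byte slices and is not Lucene code, but the shift and mask match the reads in the loop above:

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical, self-contained demo of the freq-stream encoding that
    // FreqProxTermsWriterPerField.flush() decodes: (docDelta << 1) | 1 when
    // termFreq == 1, otherwise (docDelta << 1) followed by the frequency.
    public class FreqStreamSketch {

      static List<Integer> encode(int[] docIDs, int[] termFreqs) {
        List<Integer> out = new ArrayList<Integer>();
        int lastDocID = 0;
        for (int i = 0; i < docIDs.length; i++) {
          int delta = docIDs[i] - lastDocID;
          lastDocID = docIDs[i];
          if (termFreqs[i] == 1) {
            out.add((delta << 1) | 1);   // low bit set: freq is implicitly 1
          } else {
            out.add(delta << 1);         // low bit clear: freq follows
            out.add(termFreqs[i]);
          }
        }
        return out;
      }

      static void decode(List<Integer> in) {
        int docID = 0;
        int i = 0;
        while (i < in.size()) {
          int code = in.get(i++);
          docID += code >>> 1;           // same shift the flush loop uses
          int termFreq = ((code & 1) != 0) ? 1 : in.get(i++);
          System.out.println("doc=" + docID + " freq=" + termFreq);
        }
      }

      public static void main(String[] args) {
        decode(encode(new int[] {3, 7, 42}, new int[] {1, 5, 2}));
      }
    }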
 

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java Sun May  1 22:38:33 2011
@@ -21,7 +21,13 @@ import java.io.FileNotFoundException;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.PrintStream;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.lucene.index.codecs.CodecProvider;
 import org.apache.lucene.store.Directory;
@@ -49,12 +55,12 @@ import org.apache.lucene.util.Collection
  * (IndexDeletionPolicy) is consulted on creation (onInit)
  * and once per commit (onCommit), to decide when a commit
  * should be removed.
- * 
+ *
  * It is the business of the IndexDeletionPolicy to choose
  * when to delete commit points.  The actual mechanics of
  * file deletion, retrying, etc, derived from the deletion
  * of commit points is the business of the IndexFileDeleter.
- * 
+ *
  * The current default deletion policy is {@link
  * KeepOnlyLastCommitDeletionPolicy}, which removes all
  * prior commits when a new commit has completed.  This
@@ -72,7 +78,7 @@ final class IndexFileDeleter {
    * so we will retry them again later: */
   private List<String> deletable;
 
-  /* Reference count for all files in the index.  
+  /* Reference count for all files in the index.
    * Counts how many existing commits reference a file.
    **/
   private Map<String, RefCount> refCounts = new HashMap<String, RefCount>();
@@ -88,7 +94,7 @@ final class IndexFileDeleter {
    * non-commit checkpoint: */
   private List<Collection<String>> lastFiles = new ArrayList<Collection<String>>();
 
-  /* Commits that the IndexDeletionPolicy have decided to delete: */ 
+  /* Commits that the IndexDeletionPolicy have decided to delete: */
   private List<CommitPoint> commitsToDelete = new ArrayList<CommitPoint>();
 
   private PrintStream infoStream;
@@ -108,7 +114,7 @@ final class IndexFileDeleter {
       message("setInfoStream deletionPolicy=" + policy);
     }
   }
-  
+
   private void message(String message) {
     infoStream.println("IFD [" + new Date() + "; " + Thread.currentThread().getName() + "]: " + message);
   }
@@ -139,12 +145,12 @@ final class IndexFileDeleter {
     // counts:
     long currentGen = segmentInfos.getGeneration();
     indexFilenameFilter = new IndexFileNameFilter(codecs);
-    
+
     CommitPoint currentCommitPoint = null;
     String[] files = null;
     try {
       files = directory.listAll();
-    } catch (NoSuchDirectoryException e) {  
+    } catch (NoSuchDirectoryException e) {
       // it means the directory is empty, so ignore it.
       files = new String[0];
     }
@@ -152,7 +158,7 @@ final class IndexFileDeleter {
     for (String fileName : files) {
 
       if ((indexFilenameFilter.accept(null, fileName)) && !fileName.endsWith("write.lock") && !fileName.equals(IndexFileNames.SEGMENTS_GEN)) {
-        
+
         // Add this file to refCounts with initial count 0:
         getRefCount(fileName);
 
@@ -233,7 +239,7 @@ final class IndexFileDeleter {
     // Now delete anything with ref count at 0.  These are
     // presumably abandoned files eg due to crash of
     // IndexWriter.
-    for(Map.Entry<String, RefCount> entry : refCounts.entrySet() ) {  
+    for(Map.Entry<String, RefCount> entry : refCounts.entrySet() ) {
       RefCount rc = entry.getValue();
       final String fileName = entry.getKey();
       if (0 == rc.count) {
@@ -253,7 +259,7 @@ final class IndexFileDeleter {
     // Always protect the incoming segmentInfos since
     // sometime it may not be the most recent commit
     checkpoint(segmentInfos, false);
-    
+
     startingCommitDeleted = currentCommitPoint == null ? false : currentCommitPoint.isDeleted();
 
     deleteCommits();
@@ -327,7 +333,7 @@ final class IndexFileDeleter {
       segmentPrefix1 = null;
       segmentPrefix2 = null;
     }
-    
+
     for(int i=0;i<files.length;i++) {
       String fileName = files[i];
       if ((segmentName == null || fileName.startsWith(segmentPrefix1) || fileName.startsWith(segmentPrefix2)) &&
@@ -379,7 +385,7 @@ final class IndexFileDeleter {
       deleteCommits();
     }
   }
-  
+
   public void deletePendingFiles() throws IOException {
     if (deletable != null) {
       List<String> oldDeletable = deletable;
@@ -397,7 +403,7 @@ final class IndexFileDeleter {
   /**
    * For definition of "check point" see IndexWriter comments:
    * "Clarification: Check Points (and commits)".
-   * 
+   *
    * Writer calls this when it has made a "consistent
    * change" to the index, meaning new files are written to
    * the index and the in-memory SegmentInfos have been
@@ -417,7 +423,7 @@ final class IndexFileDeleter {
   public void checkpoint(SegmentInfos segmentInfos, boolean isCommit) throws IOException {
 
     if (infoStream != null) {
-      message("now checkpoint \"" + segmentInfos.getCurrentSegmentFileName() + "\" [" + segmentInfos.size() + " segments " + "; isCommit = " + isCommit + "]");
+      message("now checkpoint \"" + segmentInfos + "\" [" + segmentInfos.size() + " segments " + "; isCommit = " + isCommit + "]");
     }
 
     // Try again now to delete any previously un-deletable