Posted to commits@lucene.apache.org by us...@apache.org on 2011/05/02 00:38:36 UTC

svn commit: r1098427 [3/5] - in /lucene/dev/trunk: ./ lucene/ lucene/src/java/org/apache/lucene/index/ lucene/src/test-framework/org/apache/lucene/search/ lucene/src/test-framework/org/apache/lucene/store/ lucene/src/test-framework/org/apache/lucene/ut...

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriter.java?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriter.java Sun May  1 22:38:33 2011
@@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentHa
 
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.document.Document;
+import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
 import org.apache.lucene.index.FieldInfos.FieldNumberBiMap;
 import org.apache.lucene.index.IndexWriterConfig.OpenMode;
 import org.apache.lucene.index.PayloadProcessorProvider.DirPayloadProcessor;
@@ -46,6 +47,7 @@ import org.apache.lucene.store.BufferedI
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.Lock;
 import org.apache.lucene.store.LockObtainFailedException;
+import org.apache.lucene.util.BitVector;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.Constants;
 import org.apache.lucene.util.ThreadInterruptedException;
@@ -54,17 +56,16 @@ import org.apache.lucene.util.MapBackedS
 /**
   An <code>IndexWriter</code> creates and maintains an index.
 
-  <p>The <code>create</code> argument to the {@link
-  #IndexWriter(Directory, IndexWriterConfig) constructor} determines 
+  <p>The {@link OpenMode} option on 
+  {@link IndexWriterConfig#setOpenMode(OpenMode)} determines 
   whether a new index is created, or whether an existing index is
-  opened.  Note that you can open an index with <code>create=true</code>
-  even while readers are using the index.  The old readers will 
+  opened. Note that you can open an index with {@link OpenMode#CREATE}
+  even while readers are using the index. The old readers will 
   continue to search the "point in time" snapshot they had opened, 
-  and won't see the newly created index until they re-open.  There are
-  also {@link #IndexWriter(Directory, IndexWriterConfig) constructors}
-  with no <code>create</code> argument which will create a new index
-  if there is not already an index at the provided path and otherwise 
-  open the existing index.</p>
+  and won't see the newly created index until they re-open. If 
+  {@link OpenMode#CREATE_OR_APPEND} is used, IndexWriter will create a 
+  new index if there is not already an index at the provided path
+  and otherwise open the existing index.</p>
 
   <p>In either case, documents are added with {@link #addDocument(Document)
   addDocument} and removed with {@link #deleteDocuments(Term)} or {@link
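
  For illustration, the OpenMode-based construction described above might look
  roughly like this (a sketch, not taken from this commit; the directory path,
  the Version constant and the field flags are assumptions made for the example):

    import java.io.File;

    import org.apache.lucene.analysis.Analyzer;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.index.IndexWriterConfig.OpenMode;
    import org.apache.lucene.index.Term;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.FSDirectory;
    import org.apache.lucene.util.Version;

    public class OpenModeSketch {
      public static void indexOne(Analyzer analyzer) throws Exception {
        Directory dir = FSDirectory.open(new File("/tmp/example-index")); // hypothetical path
        // CREATE_OR_APPEND: create the index if it doesn't exist, otherwise append to it
        IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_40, analyzer)
            .setOpenMode(OpenMode.CREATE_OR_APPEND);
        IndexWriter writer = new IndexWriter(dir, conf);

        Document doc = new Document();
        doc.add(new Field("id", "1", Field.Store.YES, Field.Index.NOT_ANALYZED));
        writer.addDocument(doc);                      // buffered in RAM until a flush
        writer.deleteDocuments(new Term("id", "0"));  // buffered delete term
        writer.commit();                              // make the changes visible to new readers
        writer.close();
      }
    }
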
@@ -76,15 +77,19 @@ import org.apache.lucene.util.MapBackedS
   <a name="flush"></a>
   <p>These changes are buffered in memory and periodically
   flushed to the {@link Directory} (during the above method
-  calls).  A flush is triggered when there are enough
-  buffered deletes (see {@link IndexWriterConfig#setMaxBufferedDeleteTerms})
-  or enough added documents since the last flush, whichever
-  is sooner.  For the added documents, flushing is triggered
-  either by RAM usage of the documents (see {@link
-  IndexWriterConfig#setRAMBufferSizeMB}) or the number of added documents.
-  The default is to flush when RAM usage hits 16 MB.  For
+  calls). A flush is triggered when there are enough added documents
+  since the last flush. Flushing is triggered either by RAM usage of the
+  documents (see {@link IndexWriterConfig#setRAMBufferSizeMB}) or the
+  number of added documents (see {@link IndexWriterConfig#setMaxBufferedDocs(int)}).
+  The default is to flush when RAM usage hits
+  {@value IndexWriterConfig#DEFAULT_RAM_BUFFER_SIZE_MB} MB. For
   best indexing speed you should flush by RAM usage with a
-  large RAM buffer.  Note that flushing just moves the
+  large RAM buffer. Additionally, if IndexWriter reaches the configured number of
+  buffered deletes (see {@link IndexWriterConfig#setMaxBufferedDeleteTerms}),
+  the deleted terms and queries are flushed and applied to existing segments.
+  In contrast to the other flush options {@link IndexWriterConfig#setRAMBufferSizeMB} and 
+  {@link IndexWriterConfig#setMaxBufferedDocs(int)}, deleted terms
+  won't trigger a segment flush. Note that flushing just moves the
   internal buffered state in IndexWriter into the index, but
   these changes are not visible to IndexReader until either
   {@link #commit()} or {@link #close} is called.  A flush may
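
  As a rough illustration of the flush triggers described above (a sketch, not
  part of this change; the concrete values are arbitrary, not defaults):

    import org.apache.lucene.analysis.Analyzer;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.util.Version;

    class FlushConfigSketch {
      static IndexWriter openWriter(Directory dir, Analyzer analyzer) throws Exception {
        IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_40, analyzer)
            // flush a new segment once buffered documents use roughly this much RAM
            .setRAMBufferSizeMB(64.0)
            // disable flushing by document count; rely on RAM usage instead
            .setMaxBufferedDocs(IndexWriterConfig.DISABLE_AUTO_FLUSH)
            // after 1000 buffered delete terms, apply deletes to existing segments;
            // as described above, this does not flush a new segment
            .setMaxBufferedDeleteTerms(1000);
        return new IndexWriter(dir, conf);
      }
    }
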
@@ -165,21 +170,21 @@ import org.apache.lucene.util.MapBackedS
 /*
  * Clarification: Check Points (and commits)
  * IndexWriter writes new index files to the directory without writing a new segments_N
- * file which references these new files. It also means that the state of 
+ * file which references these new files. It also means that the state of
  * the in memory SegmentInfos object is different than the most recent
  * segments_N file written to the directory.
- * 
- * Each time the SegmentInfos is changed, and matches the (possibly 
- * modified) directory files, we have a new "check point". 
- * If the modified/new SegmentInfos is written to disk - as a new 
- * (generation of) segments_N file - this check point is also an 
+ *
+ * Each time the SegmentInfos is changed, and matches the (possibly
+ * modified) directory files, we have a new "check point".
+ * If the modified/new SegmentInfos is written to disk - as a new
+ * (generation of) segments_N file - this check point is also an
  * IndexCommit.
- * 
- * A new checkpoint always replaces the previous checkpoint and 
- * becomes the new "front" of the index. This allows the IndexFileDeleter 
+ *
+ * A new checkpoint always replaces the previous checkpoint and
+ * becomes the new "front" of the index. This allows the IndexFileDeleter
  * to delete files that are referenced only by stale checkpoints.
  * (files that were created since the last commit, but are no longer
- * referenced by the "front" of the index). For this, IndexFileDeleter 
+ * referenced by the "front" of the index). For this, IndexFileDeleter
  * keeps track of the last non commit checkpoint.
  */
 public class IndexWriter implements Closeable {
@@ -195,7 +200,7 @@ public class IndexWriter implements Clos
    * printed to infoStream, if set (see {@link
    * #setInfoStream}).
    */
-  public final static int MAX_TERM_LENGTH = DocumentsWriter.MAX_TERM_LENGTH_UTF8;
+  public final static int MAX_TERM_LENGTH = DocumentsWriterPerThread.MAX_TERM_LENGTH_UTF8;
 
   // The normal read buffer size defaults to 1024, but
   // increasing this during merging seems to yield
@@ -225,7 +230,7 @@ public class IndexWriter implements Clos
   final FieldNumberBiMap globalFieldNumberMap;
 
   private DocumentsWriter docWriter;
-  private IndexFileDeleter deleter;
+  final IndexFileDeleter deleter;
 
   private Set<SegmentInfo> segmentsToOptimize = new HashSet<SegmentInfo>();           // used by optimize to note those needing optimization
   private int optimizeMaxNumSegments;
@@ -247,12 +252,12 @@ public class IndexWriter implements Clos
   private long mergeGen;
   private boolean stopMerges;
 
-  private final AtomicInteger flushCount = new AtomicInteger();
-  private final AtomicInteger flushDeletesCount = new AtomicInteger();
+  final AtomicInteger flushCount = new AtomicInteger();
+  final AtomicInteger flushDeletesCount = new AtomicInteger();
 
   final ReaderPool readerPool = new ReaderPool();
   final BufferedDeletesStream bufferedDeletesStream;
-  
+
   // This is a "write once" variable (like the organic dye
   // on a DVD-R that may or may not be heated by a laser and
   // then cooled to permanently record the event): it's
@@ -339,31 +344,58 @@ public class IndexWriter implements Clos
    */
   IndexReader getReader(boolean applyAllDeletes) throws IOException {
     ensureOpen();
-    
+
     final long tStart = System.currentTimeMillis();
 
     if (infoStream != null) {
       message("flush at getReader");
     }
-
     // Do this up front before flushing so that the readers
     // obtained during this flush are pooled, the first time
     // this method is called:
     poolReaders = true;
-
-    // Prevent segmentInfos from changing while opening the
-    // reader; in theory we could do similar retry logic,
-    // just like we do when loading segments_N
-    IndexReader r;
-    synchronized(this) {
-      flush(false, applyAllDeletes);
-      r = new DirectoryReader(this, segmentInfos, config.getReaderTermsIndexDivisor(), codecs, applyAllDeletes);
-      if (infoStream != null) {
-        message("return reader version=" + r.getVersion() + " reader=" + r);
+    final IndexReader r;
+    doBeforeFlush();
+    final boolean anySegmentFlushed;
+    /*
+     * For releasing an NRT reader we must ensure that
+     * DW doesn't add any segments or deletes until we are
+     * done creating the NRT DirectoryReader.
+     * We release the two-stage full flush only after we are
+     * done opening the directory reader.
+     */
+    synchronized (fullFlushLock) {
+      boolean success = false;
+      try {
+        anySegmentFlushed = docWriter.flushAllThreads();
+        if (!anySegmentFlushed) {
+          // prevent double increment since docWriter#doFlush increments the flushcount
+          // if we flushed anything.
+          flushCount.incrementAndGet();
+        }
+        success = true;
+        // Prevent segmentInfos from changing while opening the
+        // reader; in theory we could do similar retry logic,
+        // just like we do when loading segments_N
+        synchronized(this) {
+          maybeApplyDeletes(applyAllDeletes);
+          r = new DirectoryReader(this, segmentInfos, config.getReaderTermsIndexDivisor(), codecs, applyAllDeletes);
+          if (infoStream != null) {
+            message("return reader version=" + r.getVersion() + " reader=" + r);
+          }
+        }
+      } finally {
+        if (!success && infoStream != null) {
+          message("hit exception during while NRT reader");
+        }
+        // Done: finish the full flush!
+        docWriter.finishFullFlush(success);
+        doAfterFlush();
       }
     }
-    maybeMerge();
-
+    if (anySegmentFlushed) {
+      maybeMerge();
+    }
     if (infoStream != null) {
       message("getReader took " + (System.currentTimeMillis() - tStart) + " msec");
     }
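
  For reference, a sketch of how the near-real-time path above is typically
  consumed; IndexReader.open(IndexWriter, boolean) and IndexReader.reopen() are
  assumed to be the public entry points wrapping getReader() in this era:

    import org.apache.lucene.index.IndexReader;
    import org.apache.lucene.index.IndexWriter;

    class NrtSketch {
      // Open a point-in-time NRT snapshot of the writer's current state.
      static IndexReader openSnapshot(IndexWriter writer) throws Exception {
        return IndexReader.open(writer, true);      // flushes, then opens the reader
      }

      // After further adds/deletes, refresh the snapshot cheaply.
      static IndexReader refresh(IndexReader reader) throws Exception {
        IndexReader newReader = reader.reopen();    // returns the same reader if nothing changed
        if (newReader != reader) {
          reader.close();                           // release the stale snapshot
          reader = newReader;
        }
        return reader;                              // hand to a new IndexSearcher
      }
    }
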
@@ -400,10 +432,10 @@ public class IndexWriter implements Clos
           if (r != null) {
             r.hasChanges = false;
           }
-        }     
+        }
       }
     }
-    
+
     // used only by asserts
     public synchronized boolean infoIsLive(SegmentInfo info) {
       int idx = segmentInfos.indexOf(info);
@@ -419,7 +451,7 @@ public class IndexWriter implements Clos
       }
       return info;
     }
-    
+
     /**
      * Release the segment reader (i.e. decRef it and close if there
      * are no more references.
@@ -432,7 +464,7 @@ public class IndexWriter implements Clos
     public synchronized boolean release(SegmentReader sr) throws IOException {
       return release(sr, false);
     }
-    
+
     /**
      * Release the segment reader (i.e. decRef it and close if there
      * are no more references.
@@ -493,7 +525,7 @@ public class IndexWriter implements Clos
         sr.close();
       }
     }
-    
+
     /** Remove all our references to readers, and commits
      *  any pending changes. */
     synchronized void close() throws IOException {
@@ -503,7 +535,7 @@ public class IndexWriter implements Clos
 
       Iterator<Map.Entry<SegmentInfo,SegmentReader>> iter = readerMap.entrySet().iterator();
       while (iter.hasNext()) {
-        
+
         Map.Entry<SegmentInfo,SegmentReader> ent = iter.next();
 
         SegmentReader sr = ent.getValue();
@@ -526,7 +558,7 @@ public class IndexWriter implements Clos
         sr.decRef();
       }
     }
-    
+
     /**
      * Commit all segment reader in the pool.
      * @throws IOException
@@ -550,7 +582,7 @@ public class IndexWriter implements Clos
         }
       }
     }
-    
+
     /**
      * Returns a ref to a clone.  NOTE: this clone is not
      * enrolled in the pool, so you should simply close()
@@ -564,7 +596,7 @@ public class IndexWriter implements Clos
         sr.decRef();
       }
     }
-   
+
     /**
      * Obtain a SegmentReader from the readerPool.  The reader
      * must be returned by calling {@link #release(SegmentReader)}
@@ -580,7 +612,7 @@ public class IndexWriter implements Clos
     /**
      * Obtain a SegmentReader from the readerPool.  The reader
      * must be returned by calling {@link #release(SegmentReader)}
-     * 
+     *
      * @see #release(SegmentReader)
      * @param info
      * @param doOpenStores
@@ -638,7 +670,7 @@ public class IndexWriter implements Clos
       return sr;
     }
   }
-  
+
   /**
    * Obtain the number of deleted docs for a pooled reader.
    * If the reader isn't being pooled, the segmentInfo's 
@@ -658,7 +690,7 @@ public class IndexWriter implements Clos
       }
     }
   }
-  
+
   /**
    * Used internally to throw an {@link
    * AlreadyClosedException} if this IndexWriter has been
@@ -721,7 +753,7 @@ public class IndexWriter implements Clos
     mergePolicy.setIndexWriter(this);
     mergeScheduler = conf.getMergeScheduler();
     codecs = conf.getCodecProvider();
-    
+
     bufferedDeletesStream = new BufferedDeletesStream(messageID);
     bufferedDeletesStream.setInfoStream(infoStream);
     poolReaders = conf.getReaderPooling();
@@ -790,8 +822,7 @@ public class IndexWriter implements Clos
 
       // start with previous field numbers, but new FieldInfos
       globalFieldNumberMap = segmentInfos.getOrLoadGlobalFieldNumberMap(directory);
-      docWriter = new DocumentsWriter(config, directory, this, conf.getIndexingChain(),
-          globalFieldNumberMap.newFieldInfos(SegmentCodecsBuilder.create(codecs)), bufferedDeletesStream);
+      docWriter = new DocumentsWriter(config, directory, this, globalFieldNumberMap, bufferedDeletesStream);
       docWriter.setInfoStream(infoStream);
 
       // Default deleter (for backwards compatibility) is
@@ -849,7 +880,7 @@ public class IndexWriter implements Clos
   public IndexWriterConfig getConfig() {
     return config;
   }
-  
+
   /** If non-null, this will be the default infoStream used
    * by a newly instantiated IndexWriter.
    * @see #setInfoStream
@@ -901,7 +932,7 @@ public class IndexWriter implements Clos
   public boolean verbose() {
     return infoStream != null;
   }
-  
+
   /**
    * Commits all changes to an index and closes all
    * associated files.  Note that this may be a costly
@@ -916,7 +947,7 @@ public class IndexWriter implements Clos
    * even though part of it (flushing buffered documents)
    * may have succeeded, so the write lock will still be
    * held.</p>
-   * 
+   *
    * <p> If you can correct the underlying cause (eg free up
    * some disk space) then you can call close() again.
    * Failing that, if you want to force the write lock to be
@@ -1036,7 +1067,7 @@ public class IndexWriter implements Clos
 
       if (infoStream != null)
         message("now call final commit()");
-      
+
       if (!hitOOM) {
         commitInternal(null);
       }
@@ -1049,7 +1080,7 @@ public class IndexWriter implements Clos
         docWriter = null;
         deleter.close();
       }
-      
+
       if (writeLock != null) {
         writeLock.release();                          // release write lock
         writeLock = null;
@@ -1072,7 +1103,7 @@ public class IndexWriter implements Clos
   }
 
   /** Returns the Directory used by this index. */
-  public Directory getDirectory() {     
+  public Directory getDirectory() {
     // Pass false because the flush during closing calls getDirectory
     ensureOpen(false);
     return directory;
@@ -1196,22 +1227,7 @@ public class IndexWriter implements Clos
    * @throws IOException if there is a low-level IO error
    */
   public void addDocument(Document doc, Analyzer analyzer) throws CorruptIndexException, IOException {
-    ensureOpen();
-    boolean doFlush = false;
-    boolean success = false;
-    try {
-      try {
-        doFlush = docWriter.updateDocument(doc, analyzer, null);
-        success = true;
-      } finally {
-        if (!success && infoStream != null)
-          message("hit exception adding document");
-      }
-      if (doFlush)
-        flush(true, false);
-    } catch (OutOfMemoryError oom) {
-      handleOOM(oom, "addDocument");
-    }
+    updateDocument(null, doc, analyzer);
   }
 
   /**
@@ -1228,9 +1244,7 @@ public class IndexWriter implements Clos
   public void deleteDocuments(Term term) throws CorruptIndexException, IOException {
     ensureOpen();
     try {
-      if (docWriter.deleteTerm(term, false)) {
-        flush(true, false);
-      }
+      docWriter.deleteTerms(term);
     } catch (OutOfMemoryError oom) {
       handleOOM(oom, "deleteDocuments(Term)");
     }
@@ -1238,7 +1252,8 @@ public class IndexWriter implements Clos
 
   /**
    * Deletes the document(s) containing any of the
-   * terms. All deletes are flushed at the same time.
+   * terms. All given deletes are applied and flushed atomically.
    *
    * <p><b>NOTE</b>: if this method hits an OutOfMemoryError
    * you should immediately close the writer.  See <a
@@ -1252,9 +1267,7 @@ public class IndexWriter implements Clos
   public void deleteDocuments(Term... terms) throws CorruptIndexException, IOException {
     ensureOpen();
     try {
-      if (docWriter.deleteTerms(terms)) {
-        flush(true, false);
-      }
+      docWriter.deleteTerms(terms);
     } catch (OutOfMemoryError oom) {
       handleOOM(oom, "deleteDocuments(Term..)");
     }
@@ -1274,9 +1287,7 @@ public class IndexWriter implements Clos
   public void deleteDocuments(Query query) throws CorruptIndexException, IOException {
     ensureOpen();
     try {
-      if (docWriter.deleteQuery(query)) {
-        flush(true, false);
-      }
+      docWriter.deleteQueries(query);
     } catch (OutOfMemoryError oom) {
       handleOOM(oom, "deleteDocuments(Query)");
     }
@@ -1284,7 +1295,7 @@ public class IndexWriter implements Clos
 
   /**
    * Deletes the document(s) matching any of the provided queries.
-   * All deletes are flushed at the same time.
+   * All given deletes are applied and flushed atomically.
    *
    * <p><b>NOTE</b>: if this method hits an OutOfMemoryError
    * you should immediately close the writer.  See <a
@@ -1298,9 +1309,7 @@ public class IndexWriter implements Clos
   public void deleteDocuments(Query... queries) throws CorruptIndexException, IOException {
     ensureOpen();
     try {
-      if (docWriter.deleteQueries(queries)) {
-        flush(true, false);
-      }
+      docWriter.deleteQueries(queries);
     } catch (OutOfMemoryError oom) {
       handleOOM(oom, "deleteDocuments(Query..)");
     }
@@ -1350,17 +1359,18 @@ public class IndexWriter implements Clos
       throws CorruptIndexException, IOException {
     ensureOpen();
     try {
-      boolean doFlush = false;
       boolean success = false;
+      boolean anySegmentFlushed = false;
       try {
-        doFlush = docWriter.updateDocument(doc, analyzer, term);
+        anySegmentFlushed = docWriter.updateDocument(doc, analyzer, term);
         success = true;
       } finally {
         if (!success && infoStream != null)
           message("hit exception updating document");
       }
-      if (doFlush) {
-        flush(true, false);
+
+      if (anySegmentFlushed) {
+        maybeMerge();
       }
     } catch (OutOfMemoryError oom) {
       handleOOM(oom, "updateDocument");
@@ -1546,7 +1556,7 @@ public class IndexWriter implements Clos
       resetMergeExceptions();
       segmentsToOptimize = new HashSet<SegmentInfo>(segmentInfos);
       optimizeMaxNumSegments = maxNumSegments;
-      
+
       // Now mark all pending & running merges as optimize
       // merge:
       for(final MergePolicy.OneMerge merge  : pendingMerges) {
@@ -1612,12 +1622,12 @@ public class IndexWriter implements Clos
       if (merge.optimize)
         return true;
     }
-    
+
     for (final MergePolicy.OneMerge merge : runningMerges) {
       if (merge.optimize)
         return true;
     }
-    
+
     return false;
   }
 
@@ -1914,7 +1924,7 @@ public class IndexWriter implements Clos
   /**
    * Delete all documents in the index.
    *
-   * <p>This method will drop all buffered documents and will 
+   * <p>This method will drop all buffered documents and will
    *    remove all segments from the index. This change will not be
    *    visible until a {@link #commit()} has been called. This method
    *    can be rolled back using {@link #rollback()}.</p>
@@ -1944,7 +1954,7 @@ public class IndexWriter implements Clos
       deleter.refresh();
 
       // Don't bother saving any changes in our segmentInfos
-      readerPool.clear(null);      
+      readerPool.clear(null);
 
       // Mark that the index has changed
       ++changeCount;
@@ -1971,7 +1981,7 @@ public class IndexWriter implements Clos
         mergeFinish(merge);
       }
       pendingMerges.clear();
-      
+
       for (final MergePolicy.OneMerge merge : runningMerges) {
         if (infoStream != null)
           message("now abort running merge " + merge.segString(directory));
@@ -1998,7 +2008,7 @@ public class IndexWriter implements Clos
         message("all running merges have aborted");
 
     } else {
-      // waitForMerges() will ensure any running addIndexes finishes.  
+      // waitForMerges() will ensure any running addIndexes finishes.
       // It's fine if a new one attempts to start because from our
       // caller above the call will see that we are in the
       // process of closing, and will throw an
@@ -2010,7 +2020,7 @@ public class IndexWriter implements Clos
   /**
    * Wait for any currently outstanding merges to finish.
    *
-   * <p>It is guaranteed that any merges started prior to calling this method 
+   * <p>It is guaranteed that any merges started prior to calling this method
    *    will have completed once this method completes.</p>
    */
   public synchronized void waitForMerges() {
@@ -2040,6 +2050,125 @@ public class IndexWriter implements Clos
     deleter.checkpoint(segmentInfos, false);
   }
 
+  /**
+   * Prepares the {@link SegmentInfo} for the new flushed segment and persists
+   * the deleted documents {@link BitVector}. Use
+   * {@link #publishFlushedSegment(SegmentInfo, FrozenBufferedDeletes)} to
+   * publish the returned {@link SegmentInfo} together with its segment private
+   * delete packet.
+   * 
+   * @see #publishFlushedSegment(SegmentInfo, FrozenBufferedDeletes)
+   */
+  SegmentInfo prepareFlushedSegment(FlushedSegment flushedSegment) throws IOException {
+    assert flushedSegment != null;
+
+    SegmentInfo newSegment = flushedSegment.segmentInfo;
+
+    setDiagnostics(newSegment, "flush");
+
+    boolean success = false;
+    try {
+      if (useCompoundFile(newSegment)) {
+        String compoundFileName = IndexFileNames.segmentFileName(newSegment.name, "", IndexFileNames.COMPOUND_FILE_EXTENSION);
+        message("creating compound file " + compoundFileName);
+        // Now build compound file
+        CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, compoundFileName);
+        for(String fileName : newSegment.files()) {
+          cfsWriter.addFile(fileName);
+        }
+
+        // Perform the merge
+        cfsWriter.close();
+        synchronized(this) {
+          deleter.deleteNewFiles(newSegment.files());
+        }
+
+        newSegment.setUseCompoundFile(true);
+      }
+
+      // Must write deleted docs after the CFS so we don't
+      // slurp the del file into CFS:
+      if (flushedSegment.deletedDocuments != null) {
+        final int delCount = flushedSegment.deletedDocuments.count();
+        assert delCount > 0;
+        newSegment.setDelCount(delCount);
+        newSegment.advanceDelGen();
+        final String delFileName = newSegment.getDelFileName();
+        if (infoStream != null) {
+          message("flush: write " + delCount + " deletes to " + delFileName);
+        }
+        boolean success2 = false;
+        try {
+          // TODO: in the NRT case it'd be better to hand
+          // this del vector over to the
+          // shortly-to-be-opened SegmentReader and let it
+          // carry the changes; there's no reason to use
+          // filesystem as intermediary here.
+          flushedSegment.deletedDocuments.write(directory, delFileName);
+          success2 = true;
+        } finally {
+          if (!success2) {
+            try {
+              directory.deleteFile(delFileName);
+            } catch (Throwable t) {
+              // suppress this so we keep throwing the
+              // original exception
+            }
+          }
+        }
+      }
+
+      success = true;
+    } finally {
+      if (!success) {
+        if (infoStream != null) {
+          message("hit exception " +
+              "reating compound file for newly flushed segment " + newSegment.name);
+        }
+
+        synchronized(this) {
+          deleter.refresh(newSegment.name);
+        }
+      }
+    }
+    return newSegment;
+  }
+  
+  /**
+   * Atomically adds the segment private delete packet and publishes the flushed
+   * segments SegmentInfo to the index writer. NOTE: use
+   * {@link #prepareFlushedSegment(FlushedSegment)} to obtain the
+   * {@link SegmentInfo} for the flushed segment.
+   * 
+   * @see #prepareFlushedSegment(FlushedSegment)
+   */
+  synchronized void publishFlushedSegment(SegmentInfo newSegment,
+      FrozenBufferedDeletes packet, FrozenBufferedDeletes globalPacket) throws IOException {
+    // Lock order IW -> BDS
+    synchronized (bufferedDeletesStream) {
+      if (globalPacket != null && globalPacket.any()) {
+        bufferedDeletesStream.push(globalPacket);
+      } 
+      // Publishing the segment must be synchronized on IW -> BDS to make sure
+      // that no merge prunes away the seg. private delete packet
+      final long nextGen;
+      if (packet != null && packet.any()) {
+        nextGen = bufferedDeletesStream.push(packet);
+      } else {
+        // Since we don't have a delete packet to apply we can get a new
+        // generation right away
+        nextGen = bufferedDeletesStream.getNextGen();
+      }
+      newSegment.setBufferedDeletesGen(nextGen);
+      segmentInfos.add(newSegment);
+      checkpoint();
+    }
+  }
+
+  synchronized boolean useCompoundFile(SegmentInfo segmentInfo) throws IOException {
+    return mergePolicy.useCompoundFile(segmentInfos, segmentInfo);
+  }
+
   private synchronized void resetMergeExceptions() {
     mergeExceptions = new ArrayList<MergePolicy.OneMerge>();
     mergeGen++;
@@ -2088,11 +2217,11 @@ public class IndexWriter implements Clos
    * <p>
    * <b>NOTE:</b> this method only copies the segments of the incoming indexes
    * and does not merge them. Therefore deleted documents are not removed and
-   * the new segments are not merged with the existing ones. Also, the segments 
-   * are copied as-is, meaning they are not converted to CFS if they aren't, 
-   * and vice-versa. If you wish to do that, you can call {@link #maybeMerge} 
+   * the new segments are not merged with the existing ones. Also, the segments
+   * are copied as-is, meaning they are not converted to CFS if they aren't,
+   * and vice-versa. If you wish to do that, you can call {@link #maybeMerge}
    * or {@link #optimize} afterwards.
-   * 
+   *
    * <p>This requires this index not be among those to be added.
    *
    * <p>
@@ -2129,7 +2258,7 @@ public class IndexWriter implements Clos
           docCount += info.docCount;
           String newSegName = newSegmentName();
           String dsName = info.getDocStoreSegment();
-          
+
           if (infoStream != null) {
             message("addIndexes: process segment origName=" + info.name + " newName=" + newSegName + " dsName=" + dsName + " info=" + info);
           }
@@ -2176,7 +2305,7 @@ public class IndexWriter implements Clos
 
           infos.add(info);
         }
-      }      
+      }
 
       synchronized (this) {
         ensureOpen();
@@ -2225,11 +2354,12 @@ public class IndexWriter implements Clos
       SegmentMerger merger = new SegmentMerger(directory, config.getTermIndexInterval(),
                                                mergedName, null, codecs, payloadProcessorProvider,
                                                globalFieldNumberMap.newFieldInfos(SegmentCodecsBuilder.create(codecs)));
-      
+
       for (IndexReader reader : readers)      // add new indexes
         merger.add(reader);
-      
+
       int docCount = merger.merge();                // merge 'em
+
       final FieldInfos fieldInfos = merger.fieldInfos();
       SegmentInfo info = new SegmentInfo(mergedName, docCount, directory,
                                          false, fieldInfos.hasProx(), merger.getSegmentCodecs(),
@@ -2241,11 +2371,11 @@ public class IndexWriter implements Clos
       synchronized(this) { // Guard segmentInfos
         useCompoundFile = mergePolicy.useCompoundFile(segmentInfos, info);
       }
-      
+
       // Now create the compound file if needed
       if (useCompoundFile) {
         merger.createCompoundFile(mergedName + ".cfs", info);
-        
+
         // delete new non cfs files directly: they were never
         // registered with IFD
         deleter.deleteNewFiles(info.files());
@@ -2297,7 +2427,7 @@ public class IndexWriter implements Clos
    *  #commit()} to finish the commit, or {@link
    *  #rollback()} to revert the commit and undo all changes
    *  done since the writer was opened.</p>
-   * 
+   *
    *  You can also just call {@link #commit(Map)} directly
    *  without prepareCommit first in which case that method
    *  will internally call prepareCommit.
@@ -2441,6 +2571,10 @@ public class IndexWriter implements Clos
     }
   }
 
+  // Ensures only one flush() is actually flushing segments
+  // at a time:
+  private final Object fullFlushLock = new Object();
+
   /**
    * Flush all in-memory buffered updates (adds and deletes)
    * to the Directory.
@@ -2464,116 +2598,104 @@ public class IndexWriter implements Clos
     }
   }
 
-  // TODO: this method should not have to be entirely
-  // synchronized, ie, merges should be allowed to commit
-  // even while a flush is happening
-  private synchronized boolean doFlush(boolean applyAllDeletes) throws CorruptIndexException, IOException {
-
+  private boolean doFlush(boolean applyAllDeletes) throws CorruptIndexException, IOException {
     if (hitOOM) {
       throw new IllegalStateException("this writer hit an OutOfMemoryError; cannot flush");
     }
 
     doBeforeFlush();
-
     assert testPoint("startDoFlush");
-
-    // We may be flushing because it was triggered by doc
-    // count, del count, ram usage (in which case flush
-    // pending is already set), or we may be flushing
-    // due to external event eg getReader or commit is
-    // called (in which case we now set it, and this will
-    // pause all threads):
-    flushControl.setFlushPendingNoWait("explicit flush");
-
     boolean success = false;
-
     try {
 
       if (infoStream != null) {
         message("  start flush: applyAllDeletes=" + applyAllDeletes);
         message("  index before flush " + segString());
       }
-    
-      final SegmentInfo newSegment = docWriter.flush(this, deleter, mergePolicy, segmentInfos);
-      if (newSegment != null) {
-        setDiagnostics(newSegment, "flush");
-        segmentInfos.add(newSegment);
-        checkpoint();
-      }
-
-      if (!applyAllDeletes) {
-        // If deletes alone are consuming > 1/2 our RAM
-        // buffer, force them all to apply now. This is to
-        // prevent too-frequent flushing of a long tail of
-        // tiny segments:
-        if (flushControl.getFlushDeletes() ||
-            (config.getRAMBufferSizeMB() != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
-             bufferedDeletesStream.bytesUsed() > (1024*1024*config.getRAMBufferSizeMB()/2))) {
-          applyAllDeletes = true;
-          if (infoStream != null) {
-            message("force apply deletes bytesUsed=" + bufferedDeletesStream.bytesUsed() + " vs ramBuffer=" + (1024*1024*config.getRAMBufferSizeMB()));
-          }
+      final boolean anySegmentFlushed;
+      
+      synchronized (fullFlushLock) {
+        try {
+          anySegmentFlushed = docWriter.flushAllThreads();
+          success = true;
+        } finally {
+          docWriter.finishFullFlush(success);
         }
       }
-
-      if (applyAllDeletes) {
-        if (infoStream != null) {
-          message("apply all deletes during flush");
-        }
-        flushDeletesCount.incrementAndGet();
-        final BufferedDeletesStream.ApplyDeletesResult result = bufferedDeletesStream.applyDeletes(readerPool, segmentInfos);
-        if (result.anyDeletes) {
-          checkpoint();
-        }
-        if (!keepFullyDeletedSegments && result.allDeleted != null) {
-          if (infoStream != null) {
-            message("drop 100% deleted segments: " + result.allDeleted);
-          }
-          for(SegmentInfo info : result.allDeleted) {
-            // If a merge has already registered for this
-            // segment, we leave it in the readerPool; the
-            // merge will skip merging it and will then drop
-            // it once it's done:
-            if (!mergingSegments.contains(info)) {
-              segmentInfos.remove(info);
-              if (readerPool != null) {
-                readerPool.drop(info);
-              }
-            }
-          }
-          checkpoint();
+      success = false;
+      synchronized(this) {
+        maybeApplyDeletes(applyAllDeletes);
+        doAfterFlush();
+        if (!anySegmentFlushed) {
+          // flushAllThreads only increments flushCount if it actually
+          // flushed any segments, so count this flush explicitly here:
+          flushCount.incrementAndGet();
         }
-        bufferedDeletesStream.prune(segmentInfos);
-        assert !bufferedDeletesStream.any();
-        flushControl.clearDeletes();
-      } else if (infoStream != null) {
-        message("don't apply deletes now delTermCount=" + bufferedDeletesStream.numTerms() + " bytesUsed=" + bufferedDeletesStream.bytesUsed());
+        success = true;
+        return anySegmentFlushed;
       }
-
-      doAfterFlush();
-      flushCount.incrementAndGet();
-
-      success = true;
-
-      return newSegment != null;
-
     } catch (OutOfMemoryError oom) {
       handleOOM(oom, "doFlush");
       // never hit
       return false;
     } finally {
-      flushControl.clearFlushPending();
       if (!success && infoStream != null)
         message("hit exception during flush");
     }
   }
+  
+  final synchronized void maybeApplyDeletes(boolean applyAllDeletes) throws IOException {
+    if (applyAllDeletes) {
+      if (infoStream != null) {
+        message("apply all deletes during flush");
+      }
+      applyAllDeletes();
+    } else if (infoStream != null) {
+      message("don't apply deletes now delTermCount=" + bufferedDeletesStream.numTerms() + " bytesUsed=" + bufferedDeletesStream.bytesUsed());
+    }
+
+  }
+  
+  final synchronized void applyAllDeletes() throws IOException {
+    flushDeletesCount.incrementAndGet();
+    final BufferedDeletesStream.ApplyDeletesResult result = bufferedDeletesStream
+      .applyDeletes(readerPool, segmentInfos);
+    if (result.anyDeletes) {
+      checkpoint();
+    }
+    if (!keepFullyDeletedSegments && result.allDeleted != null) {
+      if (infoStream != null) {
+        message("drop 100% deleted segments: " + result.allDeleted);
+      }
+      for (SegmentInfo info : result.allDeleted) {
+        // If a merge has already registered for this
+        // segment, we leave it in the readerPool; the
+        // merge will skip merging it and will then drop
+        // it once it's done:
+        if (!mergingSegments.contains(info)) {
+          segmentInfos.remove(info);
+          if (readerPool != null) {
+            readerPool.drop(info);
+          }
+        }
+      }
+      checkpoint();
+    }
+    bufferedDeletesStream.prune(segmentInfos);
+  }
 
   /** Expert:  Return the total size of all index files currently cached in memory.
    * Useful for size management with flushRamDocs()
    */
   public final long ramSizeInBytes() {
     ensureOpen();
-    return docWriter.bytesUsed() + bufferedDeletesStream.bytesUsed();
+    return docWriter.flushControl.netBytes() + bufferedDeletesStream.bytesUsed();
+  }
+  
+  // for testing only
+  DocumentsWriter getDocsWriter() {
+    boolean test = false;
+    assert test = true;
+    return test ? docWriter : null;
   }
 
   /** Expert:  Return the number of documents currently
@@ -2709,7 +2831,7 @@ public class IndexWriter implements Clos
     }
 
     commitMergedDeletes(merge, mergedReader);
-      
+
     // If the doc store we are using has been closed and
     // is in now compound format (but wasn't when we
     // started), then we will switch to the compound
@@ -2723,7 +2845,7 @@ public class IndexWriter implements Clos
       message("merged segment " + merge.info + " is 100% deleted" +  (keepFullyDeletedSegments ? "" : "; skipping insert"));
     }
 
-    final Set mergedAway = new HashSet<SegmentInfo>(merge.segments);
+    final Set<SegmentInfo> mergedAway = new HashSet<SegmentInfo>(merge.segments);
     int segIdx = 0;
     int newSegIdx = 0;
     boolean inserted = false;
@@ -2770,15 +2892,15 @@ public class IndexWriter implements Clos
     // them so that they don't bother writing them to
     // disk, updating SegmentInfo, etc.:
     readerPool.clear(merge.segments);
-    
+
     if (merge.optimize) {
       // cascade the optimize:
       segmentsToOptimize.add(merge.info);
     }
-    
+
     return true;
   }
-  
+
   final private void handleMergeException(Throwable t, MergePolicy.OneMerge merge) throws IOException {
 
     if (infoStream != null) {
@@ -2867,7 +2989,7 @@ public class IndexWriter implements Clos
   /** Hook that's called when the specified merge is complete. */
   void mergeSuccess(MergePolicy.OneMerge merge) {
   }
-  
+
   /** Checks whether this merge involves any segments
    *  already participating in a merge.  If not, this merge
    *  is "registered", meaning we record that its segments
@@ -2998,7 +3120,6 @@ public class IndexWriter implements Clos
 
     // Lock order: IW -> BD
     bufferedDeletesStream.prune(segmentInfos);
-
     Map<String,String> details = new HashMap<String,String>();
     details.put("optimize", Boolean.toString(merge.optimize));
     details.put("mergeFactor", Integer.toString(merge.segments.size()));
@@ -3019,11 +3140,11 @@ public class IndexWriter implements Clos
     mergingSegments.add(merge.info);
   }
 
-  private void setDiagnostics(SegmentInfo info, String source) {
+  static void setDiagnostics(SegmentInfo info, String source) {
     setDiagnostics(info, source, null);
   }
 
-  private void setDiagnostics(SegmentInfo info, String source, Map<String,String> details) {
+  private static void setDiagnostics(SegmentInfo info, String source, Map<String,String> details) {
     Map<String,String> diagnostics = new HashMap<String,String>();
     diagnostics.put("source", source);
     diagnostics.put("lucene.version", Constants.LUCENE_VERSION);
@@ -3041,7 +3162,7 @@ public class IndexWriter implements Clos
   /** Does fininishing for a merge, which is fast but holds
    *  the synchronized lock on IndexWriter instance. */
   final synchronized void mergeFinish(MergePolicy.OneMerge merge) throws IOException {
-    
+
     // Optimize, addIndexes or finishMerges may be waiting
     // on merges to finish.
     notifyAll();
@@ -3113,11 +3234,11 @@ public class IndexWriter implements Clos
    *  instance */
   private int mergeMiddle(MergePolicy.OneMerge merge)
     throws CorruptIndexException, IOException {
-    
+
     merge.checkAborted(directory);
 
     final String mergedName = merge.info.name;
-    
+
     int mergedDocCount = 0;
 
     SegmentInfos sourceSegments = merge.segments;
@@ -3191,7 +3312,7 @@ public class IndexWriter implements Clos
         message("merge store matchedCount=" + merger.getMatchedSubReaderCount() + " vs " + merge.readers.size());
       }
       anyNonBulkMerges |= merger.getAnyNonBulkMerges();
-      
+
       assert mergedDocCount == totDocCount: "mergedDocCount=" + mergedDocCount + " vs " + totDocCount;
 
       // Very important to do this before opening the reader
@@ -3325,12 +3446,12 @@ public class IndexWriter implements Clos
 
   // For test purposes.
   final int getBufferedDeleteTermsSize() {
-    return docWriter.getPendingDeletes().terms.size();
+    return docWriter.getBufferedDeleteTermsSize();
   }
 
   // For test purposes.
   final int getNumBufferedDeleteTerms() {
-    return docWriter.getPendingDeletes().numTermDeletes.get();
+    return docWriter.getNumBufferedDeleteTerms();
   }
 
   // utility routines for tests
@@ -3445,17 +3566,17 @@ public class IndexWriter implements Clos
 
         assert lastCommitChangeCount <= changeCount;
         myChangeCount = changeCount;
-        
+
         if (changeCount == lastCommitChangeCount) {
           if (infoStream != null)
             message("  skip startCommit(): no changes pending");
           return;
         }
-        
+
         // First, we clone & incref the segmentInfos we intend
         // to sync, then, without locking, we sync() all files
         // referenced by toSync, in the background.
-        
+
         if (infoStream != null)
           message("startCommit index=" + segString(segmentInfos) + " changeCount=" + changeCount);
 
@@ -3463,10 +3584,10 @@ public class IndexWriter implements Clos
         toSync = (SegmentInfos) segmentInfos.clone();
 
         assert filesExist(toSync);
-        
+
         if (commitUserData != null)
           toSync.setUserData(commitUserData);
-        
+
         // This protects the segmentInfos we are now going
         // to commit.  This is important in case, eg, while
         // we are trying to sync all referenced files, a
@@ -3598,7 +3719,7 @@ public class IndexWriter implements Clos
 
   /** Expert: remove any index files that are no longer
    *  used.
-   * 
+   *
    *  <p> IndexWriter normally deletes unused files itself,
    *  during indexing.  However, on Windows, which disallows
    *  deletion of open files, if there is a reader open on
@@ -3647,7 +3768,7 @@ public class IndexWriter implements Clos
   public void setPayloadProcessorProvider(PayloadProcessorProvider pcp) {
     payloadProcessorProvider = pcp;
   }
-  
+
   /**
    * Returns the {@link PayloadProcessorProvider} that is used during segment
    * merges to process payloads.
@@ -3655,124 +3776,4 @@ public class IndexWriter implements Clos
   public PayloadProcessorProvider getPayloadProcessorProvider() {
     return payloadProcessorProvider;
   }
-
-  // decides when flushes happen
-  final class FlushControl {
-
-    private boolean flushPending;
-    private boolean flushDeletes;
-    private int delCount;
-    private int docCount;
-    private boolean flushing;
-
-    private synchronized boolean setFlushPending(String reason, boolean doWait) {
-      if (flushPending || flushing) {
-        if (doWait) {
-          while(flushPending || flushing) {
-            try {
-              wait();
-            } catch (InterruptedException ie) {
-              throw new ThreadInterruptedException(ie);
-            }
-          }
-        }
-        return false;
-      } else {
-        if (infoStream != null) {
-          message("now trigger flush reason=" + reason);
-        }
-        flushPending = true;
-        return flushPending;
-      }
-    }
-
-    public synchronized void setFlushPendingNoWait(String reason) {
-      setFlushPending(reason, false);
-    }
-
-    public synchronized boolean getFlushPending() {
-      return flushPending;
-    }
-
-    public synchronized boolean getFlushDeletes() {
-      return flushDeletes;
-    }
-
-    public synchronized void clearFlushPending() {
-      if (infoStream != null) {
-        message("clearFlushPending");
-      }
-      flushPending = false;
-      flushDeletes = false;
-      docCount = 0;
-      notifyAll();
-    }
-
-    public synchronized void clearDeletes() {
-      delCount = 0;
-    }
-
-    public synchronized boolean waitUpdate(int docInc, int delInc) {
-      return waitUpdate(docInc, delInc, false);
-    }
-
-    public synchronized boolean waitUpdate(int docInc, int delInc, boolean skipWait) {
-      while(flushPending) {
-        try {
-          wait();
-        } catch (InterruptedException ie) {
-          throw new ThreadInterruptedException(ie);
-        }
-      }
-
-      // skipWait is only used when a thread is BOTH adding
-      // a doc and buffering a del term, and, the adding of
-      // the doc already triggered a flush
-      if (skipWait) {
-        docCount += docInc;
-        delCount += delInc;
-        return false;
-      }
-
-      final int maxBufferedDocs = config.getMaxBufferedDocs();
-      if (maxBufferedDocs != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
-          (docCount+docInc) >= maxBufferedDocs) {
-        return setFlushPending("maxBufferedDocs", true);
-      }
-      docCount += docInc;
-
-      final int maxBufferedDeleteTerms = config.getMaxBufferedDeleteTerms();
-      if (maxBufferedDeleteTerms != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
-          (delCount+delInc) >= maxBufferedDeleteTerms) {
-        flushDeletes = true;
-        return setFlushPending("maxBufferedDeleteTerms", true);
-      }
-      delCount += delInc;
-
-      return flushByRAMUsage("add delete/doc");
-    }
-
-    public synchronized boolean flushByRAMUsage(String reason) {
-      final double ramBufferSizeMB = config.getRAMBufferSizeMB();
-      if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH) {
-        final long limit = (long) (ramBufferSizeMB*1024*1024);
-        long used = bufferedDeletesStream.bytesUsed() + docWriter.bytesUsed();
-        if (used >= limit) {
-          
-          // DocumentsWriter may be able to free up some
-          // RAM:
-          // Lock order: FC -> DW
-          docWriter.balanceRAM();
-
-          used = bufferedDeletesStream.bytesUsed() + docWriter.bytesUsed();
-          if (used >= limit) {
-            return setFlushPending("ram full: " + reason, false);
-          }
-        }
-      }
-      return false;
-    }
-  }
-
-  final FlushControl flushControl = new FlushControl();
 }

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java Sun May  1 22:38:33 2011
@@ -18,7 +18,7 @@ package org.apache.lucene.index;
  */
 
 import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.index.DocumentsWriter.IndexingChain;
+import org.apache.lucene.index.DocumentsWriterPerThread.IndexingChain;
 import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
 import org.apache.lucene.index.codecs.CodecProvider;
 import org.apache.lucene.search.IndexSearcher;
@@ -41,7 +41,7 @@ import org.apache.lucene.util.Version;
  * IndexWriterConfig conf = new IndexWriterConfig(analyzer);
  * conf.setter1().setter2();
  * </pre>
- * 
+ *
  * @since 3.1
  */
 public final class IndexWriterConfig implements Cloneable {
@@ -56,7 +56,7 @@ public final class IndexWriterConfig imp
    * </ul>
    */
   public static enum OpenMode { CREATE, APPEND, CREATE_OR_APPEND }
-  
+
   /** Default value is 32. Change using {@link #setTermIndexInterval(int)}. */
   public static final int DEFAULT_TERM_INDEX_INTERVAL = 32; // TODO: this should be private to the codec, not settable here
 
@@ -77,23 +77,19 @@ public final class IndexWriterConfig imp
 
   /**
    * Default value for the write lock timeout (1,000 ms).
-   * 
+   *
    * @see #setDefaultWriteLockTimeout(long)
    */
   public static long WRITE_LOCK_TIMEOUT = 1000;
 
-  /** The maximum number of simultaneous threads that may be
-   *  indexing documents at once in IndexWriter; if more
-   *  than this many threads arrive they will wait for
-   *  others to finish. */
-  public final static int DEFAULT_MAX_THREAD_STATES = 8;
-
   /** Default setting for {@link #setReaderPooling}. */
   public final static boolean DEFAULT_READER_POOLING = false;
 
   /** Default value is 1. Change using {@link #setReaderTermsIndexDivisor(int)}. */
   public static final int DEFAULT_READER_TERMS_INDEX_DIVISOR = IndexReader.DEFAULT_TERMS_INDEX_DIVISOR;
 
+  /** Default value is 1945. Change using {@link #setRAMPerThreadHardLimitMB(int)} */
+  public static final int DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB = 1945;
   /**
    * Sets the default (for any instance) maximum time to wait for a write lock
    * (in milliseconds).
@@ -105,7 +101,7 @@ public final class IndexWriterConfig imp
   /**
    * Returns the default write lock timeout for newly instantiated
    * IndexWriterConfigs.
-   * 
+   *
    * @see #setDefaultWriteLockTimeout(long)
    */
   public static long getDefaultWriteLockTimeout() {
@@ -127,10 +123,12 @@ public final class IndexWriterConfig imp
   private volatile IndexReaderWarmer mergedSegmentWarmer;
   private volatile CodecProvider codecProvider;
   private volatile MergePolicy mergePolicy;
-  private volatile int maxThreadStates;
+  private volatile DocumentsWriterPerThreadPool indexerThreadPool;
   private volatile boolean readerPooling;
   private volatile int readerTermsIndexDivisor;
-  
+  private volatile FlushPolicy flushPolicy;
+  private volatile int perThreadHardLimitMB;
+
   private Version matchVersion;
 
   /**
@@ -153,15 +151,16 @@ public final class IndexWriterConfig imp
     maxBufferedDeleteTerms = DEFAULT_MAX_BUFFERED_DELETE_TERMS;
     ramBufferSizeMB = DEFAULT_RAM_BUFFER_SIZE_MB;
     maxBufferedDocs = DEFAULT_MAX_BUFFERED_DOCS;
-    indexingChain = DocumentsWriter.defaultIndexingChain;
+    indexingChain = DocumentsWriterPerThread.defaultIndexingChain;
     mergedSegmentWarmer = null;
     codecProvider = CodecProvider.getDefault();
     mergePolicy = new TieredMergePolicy();
-    maxThreadStates = DEFAULT_MAX_THREAD_STATES;
     readerPooling = DEFAULT_READER_POOLING;
+    indexerThreadPool = new ThreadAffinityDocumentsWriterThreadPool();
     readerTermsIndexDivisor = DEFAULT_READER_TERMS_INDEX_DIVISOR;
+    perThreadHardLimitMB = DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB;
   }
-  
+
   @Override
   public Object clone() {
     // Shallow clone is the only thing that's possible, since parameters like
@@ -186,7 +185,7 @@ public final class IndexWriterConfig imp
     this.openMode = openMode;
     return this;
   }
-  
+
   /** Returns the {@link OpenMode} set by {@link #setOpenMode(OpenMode)}. */
   public OpenMode getOpenMode() {
     return openMode;
@@ -261,7 +260,7 @@ public final class IndexWriterConfig imp
   public SimilarityProvider getSimilarityProvider() {
     return similarityProvider;
   }
-  
+
   /**
    * Expert: set the interval between indexed terms. Large values cause less
    * memory to be used by IndexReader, but slow random-access to terms. Small
@@ -281,7 +280,7 @@ public final class IndexWriterConfig imp
    * In particular, <code>numUniqueTerms/interval</code> terms are read into
    * memory by an IndexReader, and, on average, <code>interval/2</code> terms
    * must be scanned for each random term access.
-   * 
+   *
    * @see #DEFAULT_TERM_INDEX_INTERVAL
    *
    * <p>Takes effect immediately, but only applies to newly
@@ -293,7 +292,7 @@ public final class IndexWriterConfig imp
 
   /**
    * Returns the interval between indexed terms.
-   * 
+   *
    * @see #setTermIndexInterval(int)
    */
   public int getTermIndexInterval() { // TODO: this should be private to the codec, not settable here
@@ -331,10 +330,10 @@ public final class IndexWriterConfig imp
     this.writeLockTimeout = writeLockTimeout;
     return this;
   }
-  
+
   /**
    * Returns allowed timeout when acquiring the write lock.
-   * 
+   *
    * @see #setWriteLockTimeout(long)
    */
   public long getWriteLockTimeout() {
@@ -343,15 +342,16 @@ public final class IndexWriterConfig imp
 
   /**
    * Determines the minimal number of delete terms required before the buffered
-   * in-memory delete terms are applied and flushed. If there are documents
-   * buffered in memory at the time, they are merged and a new segment is
-   * created.
-
-   * <p>Disabled by default (writer flushes by RAM usage).
+   * in-memory delete terms and queries are applied and flushed.
+   * <p>Disabled by default (writer flushes by RAM usage).</p>
+   * <p>
+   * NOTE:  This setting won't trigger a segment flush.
+   * </p>
    * 
    * @throws IllegalArgumentException if maxBufferedDeleteTerms
    * is enabled but smaller than 1
    * @see #setRAMBufferSizeMB
+   * @see #setFlushPolicy(FlushPolicy)
    *
    * <p>Takes effect immediately, but only the next time a
    * document is added, updated or deleted.
@@ -366,9 +366,9 @@ public final class IndexWriterConfig imp
   }
 
   /**
-   * Returns the number of buffered deleted terms that will trigger a flush if
-   * enabled.
-   * 
+   * Returns the number of buffered deleted terms that will trigger a flush of all
+   * buffered deletes if enabled.
+   *
    * @see #setMaxBufferedDeleteTerms(int)
    */
   public int getMaxBufferedDeleteTerms() {
@@ -380,45 +380,50 @@ public final class IndexWriterConfig imp
    * and deletions before they are flushed to the Directory. Generally for
    * faster indexing performance it's best to flush by RAM usage instead of
    * document count and use as large a RAM buffer as you can.
-   * 
    * <p>
    * When this is set, the writer will flush whenever buffered documents and
    * deletions use this much RAM. Pass in {@link #DISABLE_AUTO_FLUSH} to prevent
    * triggering a flush due to RAM usage. Note that if flushing by document
    * count is also enabled, then the flush will be triggered by whichever comes
    * first.
-   * 
+   * <p>
+   * The maximum RAM limit is inherently determined by the JVMs available memory.
+   * Yet, an {@link IndexWriter} session can consume a significantly larger amount
+   * of memory than the given RAM limit since this limit is just an indicator of when
+   * to flush memory resident documents to the Directory. Flushes are likely to happen
+   * concurrently while other threads are adding documents to the writer. For application
+   * stability the available memory in the JVM should be significantly larger than
+   * the RAM buffer used for indexing.
    * <p>
    * <b>NOTE</b>: the account of RAM usage for pending deletions is only
    * approximate. Specifically, if you delete by Query, Lucene currently has no
    * way to measure the RAM usage of individual Queries so the accounting will
    * under-estimate and you should compensate by either calling commit()
    * periodically yourself, or by using {@link #setMaxBufferedDeleteTerms(int)}
-   * to flush by count instead of RAM usage (each buffered delete Query counts 
-   * as one).
-   * 
+   * to flush and apply buffered deletes by count instead of RAM usage
+   * (for each buffered delete Query a constant number of bytes is used to estimate
+   * RAM usage). Note that enabling {@link #setMaxBufferedDeleteTerms(int)} will
+   * not trigger any segment flushes.
+   * <p>
+   * <b>NOTE</b>: It's not guaranteed that all memory resident documents are flushed 
+   * once this limit is exceeded. Depending on the configured {@link FlushPolicy} only a
+   * subset of the buffered documents is flushed and therefore only part of the RAM
+   * buffer is released.
    * <p>
-   * <b>NOTE</b>: because IndexWriter uses <code>int</code>s when managing its
-   * internal storage, the absolute maximum value for this setting is somewhat
-   * less than 2048 MB. The precise limit depends on various factors, such as
-   * how large your documents are, how many fields have norms, etc., so it's
-   * best to set this value comfortably under 2048.
    * 
-   * <p>
    * The default value is {@link #DEFAULT_RAM_BUFFER_SIZE_MB}.
-   * 
+   * @see #setFlushPolicy(FlushPolicy)
+   * @see #setRAMPerThreadHardLimitMB(int)
+   *
    * <p>Takes effect immediately, but only the next time a
    * document is added, updated or deleted.
    *
    * @throws IllegalArgumentException
    *           if ramBufferSize is enabled but non-positive, or it disables
    *           ramBufferSize when maxBufferedDocs is already disabled
+   *           
    */
   public IndexWriterConfig setRAMBufferSizeMB(double ramBufferSizeMB) {
-    if (ramBufferSizeMB > 2048.0) {
-      throw new IllegalArgumentException("ramBufferSize " + ramBufferSizeMB
-          + " is too large; should be comfortably less than 2048");
-    }
     if (ramBufferSizeMB != DISABLE_AUTO_FLUSH && ramBufferSizeMB <= 0.0)
       throw new IllegalArgumentException(
           "ramBufferSize should be > 0.0 MB when enabled");
@@ -438,22 +443,22 @@ public final class IndexWriterConfig imp
    * Determines the minimal number of documents required before the buffered
    * in-memory documents are flushed as a new Segment. Large values generally
    * give faster indexing.
-   * 
+   *
    * <p>
    * When this is set, the writer will flush every maxBufferedDocs added
    * documents. Pass in {@link #DISABLE_AUTO_FLUSH} to prevent triggering a
    * flush due to number of buffered documents. Note that if flushing by RAM
    * usage is also enabled, then the flush will be triggered by whichever comes
    * first.
-   * 
+   *
    * <p>
    * Disabled by default (writer flushes by RAM usage).
-   * 
+   *
    * <p>Takes effect immediately, but only the next time a
    * document is added, updated or deleted.
    *
    * @see #setRAMBufferSizeMB(double)
-   * 
+   * @see #setFlushPolicy(FlushPolicy)
    * @throws IllegalArgumentException
    *           if maxBufferedDocs is enabled but smaller than 2, or it disables
    *           maxBufferedDocs when ramBufferSize is already disabled
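
And the document-count variant (again illustrative, same placeholder imports as the
earlier sketches; note the ordering implied by the @throws clauses: the RAM trigger may
only be disabled once maxBufferedDocs is enabled):

    IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_40,
        new StandardAnalyzer(Version.LUCENE_40));
    conf.setMaxBufferedDocs(10000);                                 // flush every 10,000 docs
    conf.setRAMBufferSizeMB(IndexWriterConfig.DISABLE_AUTO_FLUSH);  // now legal to disable the RAM trigger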
@@ -473,7 +478,7 @@ public final class IndexWriterConfig imp
   /**
    * Returns the number of buffered added documents that will trigger a flush if
    * enabled.
-   * 
+   *
    * @see #setMaxBufferedDocs(int)
    */
   public int getMaxBufferedDocs() {
@@ -519,32 +524,43 @@ public final class IndexWriterConfig imp
     return codecProvider;
   }
 
-  
+
   /**
    * Returns the current MergePolicy in use by this writer.
-   * 
+   *
    * @see #setMergePolicy(MergePolicy)
    */
   public MergePolicy getMergePolicy() {
     return mergePolicy;
   }
 
-  /**
-   * Sets the max number of simultaneous threads that may be indexing documents
-   * at once in IndexWriter. Values &lt; 1 are invalid and if passed
-   * <code>maxThreadStates</code> will be set to
-   * {@link #DEFAULT_MAX_THREAD_STATES}.
-   *
-   * <p>Only takes effect when IndexWriter is first created. */
-  public IndexWriterConfig setMaxThreadStates(int maxThreadStates) {
-    this.maxThreadStates = maxThreadStates < 1 ? DEFAULT_MAX_THREAD_STATES : maxThreadStates;
+  /** Expert: Sets the {@link DocumentsWriterPerThreadPool} instance used by the
+   * IndexWriter to assign thread-states to incoming indexing threads. If no
+   * {@link DocumentsWriterPerThreadPool} is set {@link IndexWriter} will use
+   * {@link ThreadAffinityDocumentsWriterThreadPool} with the maximum number of
+   * thread-states set to {@value DocumentsWriterPerThreadPool#DEFAULT_MAX_THREAD_STATES} (see
+   * {@link DocumentsWriterPerThreadPool#DEFAULT_MAX_THREAD_STATES}).
+   * </p>
+   * <p>
+   * NOTE: The given {@link DocumentsWriterPerThreadPool} instance must not be used with
+   * other {@link IndexWriter} instances once it has been initialized / associated with an
+   * {@link IndexWriter}.
+   * </p>
+   * <p>
+   * NOTE: This only takes effect when IndexWriter is first created.</p>*/
+  public IndexWriterConfig setIndexerThreadPool(DocumentsWriterPerThreadPool threadPool) {
+    if (threadPool == null) {
+      throw new IllegalArgumentException("DocumentsWriterPerThreadPool must not be null");
+    }
+    this.indexerThreadPool = threadPool;
     return this;
   }
 
-  /** Returns the max number of simultaneous threads that
-   *  may be indexing documents at once in IndexWriter. */
-  public int getMaxThreadStates() {
-    return maxThreadStates;
+  /** Returns the configured {@link DocumentsWriterPerThreadPool} instance.
+   * @see #setIndexerThreadPool(DocumentsWriterPerThreadPool)
+   * @return the configured {@link DocumentsWriterPerThreadPool} instance.*/
+  public DocumentsWriterPerThreadPool getIndexerThreadPool() {
+    return this.indexerThreadPool;
   }
 
   /** By default, IndexWriter does not pool the
@@ -572,10 +588,10 @@ public final class IndexWriterConfig imp
    *
    * <p>Only takes effect when IndexWriter is first created. */
   IndexWriterConfig setIndexingChain(IndexingChain indexingChain) {
-    this.indexingChain = indexingChain == null ? DocumentsWriter.defaultIndexingChain : indexingChain;
+    this.indexingChain = indexingChain == null ? DocumentsWriterPerThread.defaultIndexingChain : indexingChain;
     return this;
   }
-  
+
   /** Returns the indexing chain set on {@link #setIndexingChain(IndexingChain)}. */
   IndexingChain getIndexingChain() {
     return indexingChain;
@@ -604,6 +620,53 @@ public final class IndexWriterConfig imp
     return readerTermsIndexDivisor;
   }
   
+  /**
+   * Expert: Controls when segments are flushed to disk during indexing.
+   * The {@link FlushPolicy} is initialized during {@link IndexWriter} instantiation; once initialized,
+   * the given instance is bound to this {@link IndexWriter} and should not be used with another writer.
+   * @see #setMaxBufferedDeleteTerms(int)
+   * @see #setMaxBufferedDocs(int)
+   * @see #setRAMBufferSizeMB(double)
+   */
+  public IndexWriterConfig setFlushPolicy(FlushPolicy flushPolicy) {
+    this.flushPolicy = flushPolicy;
+    return this;
+  }
+
+  /**
+   * Expert: Sets the maximum memory consumption per thread triggering a forced
+   * flush if exceeded. A {@link DocumentsWriterPerThread} is forcefully flushed
+   * once it exceeds this limit even if the {@link #getRAMBufferSizeMB()} has
+   * not been exceeded. This is a safety limit to prevent a
+   * {@link DocumentsWriterPerThread} from address space exhaustion due to its
+   * internal 32 bit signed integer based memory addressing.
+   * The given value must be less than 2GB (2048MB).
+   * 
+   * @see #DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB
+   */
+  public IndexWriterConfig setRAMPerThreadHardLimitMB(int perThreadHardLimitMB) {
+    if (perThreadHardLimitMB <= 0 || perThreadHardLimitMB >= 2048) {
+      throw new IllegalArgumentException("PerThreadHardLimit must be greater than 0 and less than 2048MB");
+    }
+    this.perThreadHardLimitMB = perThreadHardLimitMB;
+    return this;
+  }
+
+  /**
+   * Returns the max amount of memory each {@link DocumentsWriterPerThread} can
+   * consume until forcefully flushed.
+   * @see #setRAMPerThreadHardLimitMB(int) 
+   */
+  public int getRAMPerThreadHardLimitMB() {
+    return perThreadHardLimitMB;
+  }
+
+  /**
+   * @see #setFlushPolicy(FlushPolicy)
+   */
+  public FlushPolicy getFlushPolicy() {
+    return flushPolicy;
+  }
+
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
@@ -623,9 +686,13 @@ public final class IndexWriterConfig imp
     sb.append("mergedSegmentWarmer=").append(mergedSegmentWarmer).append("\n");
     sb.append("codecProvider=").append(codecProvider).append("\n");
     sb.append("mergePolicy=").append(mergePolicy).append("\n");
-    sb.append("maxThreadStates=").append(maxThreadStates).append("\n");
+    sb.append("indexerThreadPool=").append(indexerThreadPool).append("\n");
     sb.append("readerPooling=").append(readerPooling).append("\n");
     sb.append("readerTermsIndexDivisor=").append(readerTermsIndexDivisor).append("\n");
+    sb.append("flushPolicy=").append(flushPolicy).append("\n");
+    sb.append("perThreadHardLimitMB=").append(perThreadHardLimitMB).append("\n");
+
     return sb.toString();
   }
+
 }
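
The new expert knobs added to this file could be combined roughly as follows. This is a
sketch, not part of the commit; it assumes ThreadAffinityDocumentsWriterThreadPool exposes
an int constructor for the maximum thread-state count and that the default FlushPolicy is
kept, and the values 4 and 512 are arbitrary placeholders:

    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.index.ThreadAffinityDocumentsWriterThreadPool;
    import org.apache.lucene.util.Version;

    class ExpertFlushConfigExample {
      static IndexWriterConfig newConfig() {
        return new IndexWriterConfig(Version.LUCENE_40, new StandardAnalyzer(Version.LUCENE_40))
            // at most 4 in-flight DocumentsWriterPerThread states
            .setIndexerThreadPool(new ThreadAffinityDocumentsWriterThreadPool(4))
            // safety valve: force-flush any single DWPT that grows past 512 MB,
            // independently of the global RAM buffer
            .setRAMPerThreadHardLimitMB(512);
        // a custom FlushPolicy could also be installed via setFlushPolicy(...);
        // otherwise the default (RAM / doc count / delete terms) policy is used
      }
    }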

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IntBlockPool.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IntBlockPool.java?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IntBlockPool.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IntBlockPool.java Sun May  1 22:38:33 2011
@@ -1,5 +1,7 @@
 package org.apache.lucene.index;
 
+import java.util.Arrays;
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -22,24 +24,24 @@ final class IntBlockPool {
   public int[][] buffers = new int[10][];
 
   int bufferUpto = -1;                        // Which buffer we are upto
-  public int intUpto = DocumentsWriter.INT_BLOCK_SIZE;             // Where we are in head buffer
+  public int intUpto = DocumentsWriterPerThread.INT_BLOCK_SIZE;             // Where we are in head buffer
 
   public int[] buffer;                              // Current head buffer
-  public int intOffset = -DocumentsWriter.INT_BLOCK_SIZE;          // Current head offset
+  public int intOffset = -DocumentsWriterPerThread.INT_BLOCK_SIZE;          // Current head offset
 
-  final private DocumentsWriter docWriter;
+  final private DocumentsWriterPerThread docWriter;
 
-  public IntBlockPool(DocumentsWriter docWriter) {
+  public IntBlockPool(DocumentsWriterPerThread docWriter) {
     this.docWriter = docWriter;
   }
 
   public void reset() {
     if (bufferUpto != -1) {
-      if (bufferUpto > 0)
-        // Recycle all but the first buffer
-        docWriter.recycleIntBlocks(buffers, 1, 1+bufferUpto);
-
       // Reuse first buffer
+      if (bufferUpto > 0) {
+        docWriter.recycleIntBlocks(buffers, 1, bufferUpto-1);
+        Arrays.fill(buffers, 1, bufferUpto, null);
+      }
       bufferUpto = 0;
       intUpto = 0;
       intOffset = 0;
@@ -57,7 +59,7 @@ final class IntBlockPool {
     bufferUpto++;
 
     intUpto = 0;
-    intOffset += DocumentsWriter.INT_BLOCK_SIZE;
+    intOffset += DocumentsWriterPerThread.INT_BLOCK_SIZE;
   }
 }
 

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/InvertedDocConsumer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/InvertedDocConsumer.java?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/InvertedDocConsumer.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/InvertedDocConsumer.java Sun May  1 22:38:33 2011
@@ -17,20 +17,22 @@ package org.apache.lucene.index;
  * limitations under the License.
  */
 
-import java.util.Collection;
-import java.util.Map;
 import java.io.IOException;
+import java.util.Map;
 
 abstract class InvertedDocConsumer {
 
-  /** Add a new thread */
-  abstract InvertedDocConsumerPerThread addThread(DocInverterPerThread docInverterPerThread);
-
   /** Abort (called after hitting AbortException) */
   abstract void abort();
 
   /** Flush a new segment */
-  abstract void flush(Map<InvertedDocConsumerPerThread,Collection<InvertedDocConsumerPerField>> threadsAndFields, SegmentWriteState state) throws IOException;
+  abstract void flush(Map<FieldInfo, InvertedDocConsumerPerField> fieldsToFlush, SegmentWriteState state) throws IOException;
+
+  abstract InvertedDocConsumerPerField addField(DocInverterPerField docInverterPerField, FieldInfo fieldInfo);
+
+  abstract void startDocument() throws IOException;
+
+  abstract void finishDocument() throws IOException;
 
   /** Attempt to free RAM, returning true if any RAM was
    *  freed */

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/InvertedDocEndConsumer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/InvertedDocEndConsumer.java?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/InvertedDocEndConsumer.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/InvertedDocEndConsumer.java Sun May  1 22:38:33 2011
@@ -17,12 +17,13 @@ package org.apache.lucene.index;
  * limitations under the License.
  */
 
-import java.util.Collection;
-import java.util.Map;
 import java.io.IOException;
+import java.util.Map;
 
 abstract class InvertedDocEndConsumer {
-  abstract InvertedDocEndConsumerPerThread addThread(DocInverterPerThread docInverterPerThread);
-  abstract void flush(Map<InvertedDocEndConsumerPerThread,Collection<InvertedDocEndConsumerPerField>> threadsAndFields, SegmentWriteState state) throws IOException;
+  abstract void flush(Map<FieldInfo, InvertedDocEndConsumerPerField> fieldsToFlush, SegmentWriteState state) throws IOException;
   abstract void abort();
+  abstract InvertedDocEndConsumerPerField addField(DocInverterPerField docInverterPerField, FieldInfo fieldInfo);
+  abstract void startDocument() throws IOException;
+  abstract void finishDocument() throws IOException;
 }

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/NormsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/NormsWriter.java?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/NormsWriter.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/NormsWriter.java Sun May  1 22:38:33 2011
@@ -19,11 +19,7 @@ package org.apache.lucene.index;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.Iterator;
-import java.util.HashMap;
 import java.util.Map;
-import java.util.List;
-import java.util.ArrayList;
 
 import org.apache.lucene.store.IndexOutput;
 
@@ -36,10 +32,6 @@ import org.apache.lucene.store.IndexOutp
 
 final class NormsWriter extends InvertedDocEndConsumer {
 
-  @Override
-  public InvertedDocEndConsumerPerThread addThread(DocInverterPerThread docInverterPerThread) {
-    return new NormsWriterPerThread(docInverterPerThread, this);
-  }
 
   @Override
   public void abort() {}
@@ -50,40 +42,11 @@ final class NormsWriter extends Inverted
   /** Produce _X.nrm if any document had a field with norms
    *  not disabled */
   @Override
-  public void flush(Map<InvertedDocEndConsumerPerThread,Collection<InvertedDocEndConsumerPerField>> threadsAndFields, SegmentWriteState state) throws IOException {
-
-    final Map<FieldInfo,List<NormsWriterPerField>> byField = new HashMap<FieldInfo,List<NormsWriterPerField>>();
-
+  public void flush(Map<FieldInfo,InvertedDocEndConsumerPerField> fieldsToFlush, SegmentWriteState state) throws IOException {
     if (!state.fieldInfos.hasNorms()) {
       return;
     }
 
-    // Typically, each thread will have encountered the same
-    // field.  So first we collate by field, ie, all
-    // per-thread field instances that correspond to the
-    // same FieldInfo
-    for (final Map.Entry<InvertedDocEndConsumerPerThread,Collection<InvertedDocEndConsumerPerField>> entry : threadsAndFields.entrySet()) {
-      final Collection<InvertedDocEndConsumerPerField> fields = entry.getValue();
-      final Iterator<InvertedDocEndConsumerPerField> fieldsIt = fields.iterator();
-
-      while (fieldsIt.hasNext()) {
-        final NormsWriterPerField perField = (NormsWriterPerField) fieldsIt.next();
-
-        if (perField.upto > 0) {
-          // It has some norms
-          List<NormsWriterPerField> l = byField.get(perField.fieldInfo);
-          if (l == null) {
-            l = new ArrayList<NormsWriterPerField>();
-            byField.put(perField.fieldInfo, l);
-          }
-          l.add(perField);
-        } else
-          // Remove this field since we haven't seen it
-          // since the previous flush
-          fieldsIt.remove();
-      }
-    }
-
     final String normsFileName = IndexFileNames.segmentFileName(state.segmentName, "", IndexFileNames.NORMS_EXTENSION);
     IndexOutput normsOut = state.directory.createOutput(normsFileName);
 
@@ -93,60 +56,25 @@ final class NormsWriter extends Inverted
       int normCount = 0;
 
       for (FieldInfo fi : state.fieldInfos) {
-        final List<NormsWriterPerField> toMerge = byField.get(fi);
+        final NormsWriterPerField toWrite = (NormsWriterPerField) fieldsToFlush.get(fi);
         int upto = 0;
-        if (toMerge != null) {
-
-          final int numFields = toMerge.size();
-
+        if (toWrite != null && toWrite.upto > 0) {
           normCount++;
 
-          final NormsWriterPerField[] fields = new NormsWriterPerField[numFields];
-          int[] uptos = new int[numFields];
-
-          for(int j=0;j<numFields;j++)
-            fields[j] = toMerge.get(j);
-
-          int numLeft = numFields;
-              
-          while(numLeft > 0) {
-
-            assert uptos[0] < fields[0].docIDs.length : " uptos[0]=" + uptos[0] + " len=" + (fields[0].docIDs.length);
-
-            int minLoc = 0;
-            int minDocID = fields[0].docIDs[uptos[0]];
-
-            for(int j=1;j<numLeft;j++) {
-              final int docID = fields[j].docIDs[uptos[j]];
-              if (docID < minDocID) {
-                minDocID = docID;
-                minLoc = j;
-              }
-            }
-
-            assert minDocID < state.numDocs;
-
-            // Fill hole
-            for(;upto<minDocID;upto++)
+          int docID = 0;
+          for (; docID < state.numDocs; docID++) {
+            if (upto < toWrite.upto && toWrite.docIDs[upto] == docID) {
+              normsOut.writeByte(toWrite.norms[upto]);
+              upto++;
+            } else {
               normsOut.writeByte((byte) 0);
-
-            normsOut.writeByte(fields[minLoc].norms[uptos[minLoc]]);
-            (uptos[minLoc])++;
-            upto++;
-
-            if (uptos[minLoc] == fields[minLoc].upto) {
-              fields[minLoc].reset();
-              if (minLoc != numLeft-1) {
-                fields[minLoc] = fields[numLeft-1];
-                uptos[minLoc] = uptos[numLeft-1];
-              }
-              numLeft--;
             }
           }
-          
-          // Fill final hole with defaultNorm
-          for(;upto<state.numDocs;upto++)
-            normsOut.writeByte((byte) 0);
+
+          // we should have consumed every norm
+          assert upto == toWrite.upto;
+
+          toWrite.reset();
         } else if (fi.isIndexed && !fi.omitNorms) {
           normCount++;
           // Fill entire field with default norm:
@@ -161,4 +89,16 @@ final class NormsWriter extends Inverted
       normsOut.close();
     }
   }
+
+  @Override
+  void finishDocument() throws IOException {}
+
+  @Override
+  void startDocument() throws IOException {}
+
+  @Override
+  InvertedDocEndConsumerPerField addField(DocInverterPerField docInverterPerField,
+      FieldInfo fieldInfo) {
+    return new NormsWriterPerField(docInverterPerField, fieldInfo);
+  }
 }
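
The rewritten flush above no longer merges per-thread norm arrays; each field now carries
a single docID/norm list and missing docs are filled with the default norm. A standalone
sketch of that hole-filling loop (not the Lucene class itself, just an illustration of the
write order):

    // Given the docIDs that recorded a norm (sorted ascending) and their norm bytes,
    // emit exactly numDocs bytes: the recorded norm where present, 0x00 otherwise.
    static void writeNorms(java.io.DataOutput out, int numDocs, int[] docIDs,
                           byte[] norms, int upto) throws java.io.IOException {
      int next = 0;                                 // next recorded (docID, norm) pair
      for (int docID = 0; docID < numDocs; docID++) {
        if (next < upto && docIDs[next] == docID) {
          out.writeByte(norms[next++]);             // this doc had a norm
        } else {
          out.writeByte((byte) 0);                  // fill the hole with the default norm
        }
      }
      assert next == upto;                          // every recorded norm was consumed
    }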

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/NormsWriterPerField.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/NormsWriterPerField.java?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/NormsWriterPerField.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/NormsWriterPerField.java Sun May  1 22:38:33 2011
@@ -27,9 +27,8 @@ import org.apache.lucene.util.ArrayUtil;
 
 final class NormsWriterPerField extends InvertedDocEndConsumerPerField implements Comparable<NormsWriterPerField> {
 
-  final NormsWriterPerThread perThread;
   final FieldInfo fieldInfo;
-  final DocumentsWriter.DocState docState;
+  final DocumentsWriterPerThread.DocState docState;
   final Similarity similarity;
   
   // Holds all docID/norm pairs we've seen
@@ -46,10 +45,9 @@ final class NormsWriterPerField extends 
     upto = 0;
   }
 
-  public NormsWriterPerField(final DocInverterPerField docInverterPerField, final NormsWriterPerThread perThread, final FieldInfo fieldInfo) {
-    this.perThread = perThread;
+  public NormsWriterPerField(final DocInverterPerField docInverterPerField, final FieldInfo fieldInfo) {
     this.fieldInfo = fieldInfo;
-    docState = perThread.docState;
+    docState = docInverterPerField.docState;
     fieldState = docInverterPerField.fieldState;
     similarity = docState.similarityProvider.get(fieldInfo.name);
   }

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/SegmentInfo.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/SegmentInfo.java?rev=1098427&r1=1098426&r2=1098427&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/SegmentInfo.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/SegmentInfo.java Sun May  1 22:38:33 2011
@@ -37,14 +37,14 @@ import org.apache.lucene.util.Constants;
 /**
 * Information about a segment such as its name, directory, and files related
  * to the segment.
- * 
+ *
  * @lucene.experimental
  */
 public final class SegmentInfo {
 
   static final int NO = -1;          // e.g. no norms; no deletes;
   static final int YES = 1;          // e.g. have norms; have deletes;
-  static final int WITHOUT_GEN = 0;  // a file name that has no GEN in it. 
+  static final int WITHOUT_GEN = 0;  // a file name that has no GEN in it.
 
   public String name;				  // unique name in dir
   public int docCount;				  // number of docs in seg
@@ -56,7 +56,7 @@ public final class SegmentInfo {
    * - YES or higher if there are deletes at generation N
    */
   private long delGen;
-  
+
   /*
    * Current generation of each field's norm file. If this array is null,
    * means no separate norms. If this array is not null, its values mean:
@@ -65,7 +65,7 @@ public final class SegmentInfo {
    */
   private Map<Integer,Long> normGen;
 
-  private boolean isCompoundFile;         
+  private boolean isCompoundFile;
 
   private volatile List<String> files;                     // cached list of files that this segment uses
                                                   // in the Directory
@@ -73,10 +73,13 @@ public final class SegmentInfo {
   private volatile long sizeInBytesNoStore = -1;           // total byte size of all but the store files (computed on demand)
   private volatile long sizeInBytesWithStore = -1;         // total byte size of all of our files (computed on demand)
 
+  //TODO: LUCENE-2555: remove once we don't need to support shared doc stores (pre 4.0)
   private int docStoreOffset;                     // if this segment shares stored fields & vectors, this
                                                   // offset is where in that file this segment's docs begin
+  //TODO: LUCENE-2555: remove once we don't need to support shared doc stores (pre 4.0)
   private String docStoreSegment;                 // name used to derive fields/vectors file we share with
                                                   // other segments
+  //TODO: LUCENE-2555: remove once we don't need to support shared doc stores (pre 4.0)
   private boolean docStoreIsCompoundFile;         // whether doc store files are stored in compound file (*.cfx)
 
   private int delCount;                           // How many deleted docs in this segment
@@ -91,9 +94,9 @@ public final class SegmentInfo {
 
   private Map<String,String> diagnostics;
 
-  // Tracks the Lucene version this segment was created with, since 3.1. Null 
+  // Tracks the Lucene version this segment was created with, since 3.1. Null
   // indicates an older than 3.0 index, and it's used to detect a too old index.
-  // The format expected is "x.y" - "2.x" for pre-3.0 indexes (or null), and 
+  // The format expected is "x.y" - "2.x" for pre-3.0 indexes (or null), and
   // specific versions afterwards ("3.0", "3.1" etc.).
   // see Constants.LUCENE_MAIN_VERSION.
   private String version;
@@ -101,7 +104,7 @@ public final class SegmentInfo {
   // NOTE: only used in-RAM by IW to track buffered deletes;
   // this is never written to/read from the Directory
   private long bufferedDeletesGen;
-  
+
   public SegmentInfo(String name, int docCount, Directory dir, boolean isCompoundFile,
                      boolean hasProx, SegmentCodecs segmentCodecs, boolean hasVectors, FieldInfos fieldInfos) {
     this.name = name;
@@ -182,11 +185,13 @@ public final class SegmentInfo {
       docStoreSegment = name;
       docStoreIsCompoundFile = false;
     }
+
     if (format > DefaultSegmentInfosWriter.FORMAT_4_0) {
       // pre-4.0 indexes write a byte if there is a single norms file
       byte b = input.readByte();
       assert 1 == b;
     }
+
     int numNormGen = input.readInt();
     if (numNormGen == NO) {
       normGen = null;
@@ -207,7 +212,7 @@ public final class SegmentInfo {
     assert delCount <= docCount;
 
     hasProx = input.readByte() == YES;
-    
+
     // System.out.println(Thread.currentThread().getName() + ": si.read hasProx=" + hasProx + " seg=" + name);
     if (format <= DefaultSegmentInfosWriter.FORMAT_4_0) {
       segmentCodecs = new SegmentCodecs(codecs, input);
@@ -217,7 +222,7 @@ public final class SegmentInfo {
       segmentCodecs = new SegmentCodecs(codecs, new Codec[] { codecs.lookup("PreFlex")});
     }
     diagnostics = input.readStringStringMap();
-    
+
     if (format <= DefaultSegmentInfosWriter.FORMAT_HAS_VECTORS) {
       hasVectors = input.readByte() == 1;
     } else {
@@ -366,7 +371,7 @@ public final class SegmentInfo {
       // against this segment
       return null;
     } else {
-      return IndexFileNames.fileNameFromGeneration(name, IndexFileNames.DELETES_EXTENSION, delGen); 
+      return IndexFileNames.fileNameFromGeneration(name, IndexFileNames.DELETES_EXTENSION, delGen);
     }
   }
 
@@ -432,7 +437,7 @@ public final class SegmentInfo {
     if (hasSeparateNorms(number)) {
       return IndexFileNames.fileNameFromGeneration(name, "s" + number, normGen.get(number));
     } else {
-      // single file for all norms 
+      // single file for all norms
       return IndexFileNames.fileNameFromGeneration(name, IndexFileNames.NORMS_EXTENSION, WITHOUT_GEN);
     }
   }
@@ -465,39 +470,74 @@ public final class SegmentInfo {
     assert delCount <= docCount;
   }
 
+  /**
+   * @deprecated shared doc stores are not supported in >= 4.0
+   */
+  @Deprecated
   public int getDocStoreOffset() {
+    // TODO: LUCENE-2555: remove once we don't need to support shared doc stores (pre 4.0)
     return docStoreOffset;
   }
-  
+
+  /**
+   * @deprecated shared doc stores are not supported in >= 4.0
+   */
+  @Deprecated
   public boolean getDocStoreIsCompoundFile() {
+    // TODO: LUCENE-2555: remove once we don't need to support shared doc stores (pre 4.0)
     return docStoreIsCompoundFile;
   }
-  
-  void setDocStoreIsCompoundFile(boolean v) {
-    docStoreIsCompoundFile = v;
+
+  /**
+   * @deprecated shared doc stores are not supported in >= 4.0
+   */
+  @Deprecated
+  public void setDocStoreIsCompoundFile(boolean docStoreIsCompoundFile) {
+    // TODO: LUCENE-2555: remove once we don't need to support shared doc stores (pre 4.0)
+    this.docStoreIsCompoundFile = docStoreIsCompoundFile;
     clearFilesCache();
   }
-  
+
+  /**
+   * @deprecated shared doc stores are not supported in >= 4.0
+   */
+  @Deprecated
+  void setDocStore(int offset, String segment, boolean isCompoundFile) {
+    // TODO: LUCENE-2555: remove once we don't need to support shared doc stores (pre 4.0)
+    docStoreOffset = offset;
+    docStoreSegment = segment;
+    docStoreIsCompoundFile = isCompoundFile;
+    clearFilesCache();
+  }
+
+  /**
+   * @deprecated shared doc stores are not supported in >= 4.0
+   */
+  @Deprecated
   public String getDocStoreSegment() {
+    // TODO: LUCENE-2555: remove once we don't need to support shared doc stores (pre 4.0)
     return docStoreSegment;
   }
-  
-  public void setDocStoreSegment(String segment) {
-    docStoreSegment = segment;
-  }
-  
+
+  /**
+   * @deprecated shared doc stores are not supported in >= 4.0
+   */
+  @Deprecated
   void setDocStoreOffset(int offset) {
+    // TODO: LUCENE-2555: remove once we don't need to support shared doc stores (pre 4.0)
     docStoreOffset = offset;
     clearFilesCache();
   }
 
-  void setDocStore(int offset, String segment, boolean isCompoundFile) {        
-    docStoreOffset = offset;
-    docStoreSegment = segment;
-    docStoreIsCompoundFile = isCompoundFile;
-    clearFilesCache();
+  /**
+   * @deprecated shared doc stores are not supported in >= 4.0
+   */
+  @Deprecated
+  public void setDocStoreSegment(String docStoreSegment) {
+    // TODO: LUCENE-2555: remove once we don't need to support shared doc stores (pre 4.0)
+    this.docStoreSegment = docStoreSegment;
   }
-  
+
   /** Save this segment's info. */
   public void write(IndexOutput output)
     throws IOException {
@@ -507,12 +547,14 @@ public final class SegmentInfo {
     output.writeString(name);
     output.writeInt(docCount);
     output.writeLong(delGen);
+
     output.writeInt(docStoreOffset);
     if (docStoreOffset != -1) {
       output.writeString(docStoreSegment);
       output.writeByte((byte) (docStoreIsCompoundFile ? 1:0));
     }
 
+
     if (normGen == null) {
       output.writeInt(NO);
     } else {
@@ -522,7 +564,7 @@ public final class SegmentInfo {
         output.writeLong(entry.getValue());
       }
     }
-    
+
     output.writeByte((byte) (isCompoundFile ? YES : NO));
     output.writeInt(delCount);
     output.writeByte((byte) (hasProx ? 1:0));
@@ -570,9 +612,9 @@ public final class SegmentInfo {
       // Already cached:
       return files;
     }
-    
+
     Set<String> fileSet = new HashSet<String>();
-    
+
     boolean useCompoundFile = getUseCompoundFile();
 
     if (useCompoundFile) {
@@ -606,7 +648,7 @@ public final class SegmentInfo {
         fileSet.add(IndexFileNames.segmentFileName(name, "", IndexFileNames.VECTORS_INDEX_EXTENSION));
         fileSet.add(IndexFileNames.segmentFileName(name, "", IndexFileNames.VECTORS_DOCUMENTS_EXTENSION));
         fileSet.add(IndexFileNames.segmentFileName(name, "", IndexFileNames.VECTORS_FIELDS_EXTENSION));
-      }      
+      }
     }
 
     String delFileName = IndexFileNames.fileNameFromGeneration(name, IndexFileNames.DELETES_EXTENSION, delGen);
@@ -644,7 +686,7 @@ public final class SegmentInfo {
   }
 
   /** Used for debugging.  Format may suddenly change.
-   * 
+   *
    *  <p>Current format looks like
    *  <code>_a(3.1):c45/4->_1</code>, which means the segment's
    *  name is <code>_a</code>; it was created with Lucene 3.1 (or
@@ -674,7 +716,7 @@ public final class SegmentInfo {
     if (delCount != 0) {
       s.append('/').append(delCount);
     }
-    
+
     if (docStoreOffset != -1) {
       s.append("->").append(docStoreSegment);
       if (docStoreIsCompoundFile) {
@@ -714,13 +756,13 @@ public final class SegmentInfo {
    * <b>NOTE:</b> this method is used for internal purposes only - you should
    * not modify the version of a SegmentInfo, or it may result in unexpected
    * exceptions thrown when you attempt to open the index.
-   * 
+   *
    * @lucene.internal
    */
   public void setVersion(String version) {
     this.version = version;
   }
-  
+
   /** Returns the version of the code which wrote the segment. */
   public String getVersion() {
     return version;