You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@lucene.apache.org by si...@apache.org on 2011/03/30 15:30:07 UTC

svn commit: r1086947 [1/2] - in /lucene/dev/branches/realtime_search/lucene/src: java/org/apache/lucene/index/ test-framework/org/apache/lucene/store/ test-framework/org/apache/lucene/util/ test/org/apache/lucene/index/

Author: simonw
Date: Wed Mar 30 13:30:07 2011
New Revision: 1086947

URL: http://svn.apache.org/viewvc?rev=1086947&view=rev
Log:
LUCENE-2573: Tiered flushing of DWPTs by RAM with low/high water marks

Added:
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushPolicy.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/Healthiness.java
    lucene/dev/branches/realtime_search/lucene/src/test-framework/org/apache/lucene/util/ThrottledIndexOutput.java
    lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java
Modified:
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IntBlockPool.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHash.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHashPerField.java
    lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java
    lucene/dev/branches/realtime_search/lucene/src/test-framework/org/apache/lucene/store/MockDirectoryWrapper.java
    lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriter.java
    lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
    lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriterDelete.java

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java?rev=1086947&r1=1086946&r2=1086947&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java Wed Mar 30 13:30:07 2011
@@ -72,13 +72,18 @@ class BufferedDeletes {
 
   public static final Integer MAX_INT = Integer.valueOf(Integer.MAX_VALUE);
 
-  final AtomicLong bytesUsed = new AtomicLong();
+  final AtomicLong bytesUsed;
 
   private final static boolean VERBOSE_DELETES = false;
 
   long gen;
-
   public BufferedDeletes(boolean sortTerms) {
+    this(sortTerms, new AtomicLong());
+  }
+
+  BufferedDeletes(boolean sortTerms, AtomicLong bytesUsed) {
+    assert bytesUsed != null;
+    this.bytesUsed = bytesUsed;
     if (sortTerms) {
       terms = new TreeMap<Term,Integer>();
     } else {

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=1086947&r1=1086946&r2=1086947&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java Wed Mar 30 13:30:07 2011
@@ -23,7 +23,6 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.document.Document;
@@ -104,10 +103,8 @@ import org.apache.lucene.store.Directory
  */
 
 final class DocumentsWriter {
-  final AtomicLong bytesUsed = new AtomicLong(0);
   Directory directory;
 
-  boolean bufferIsFull;                   // True when it's time to write segment
   private volatile boolean closed;
 
   PrintStream infoStream;
@@ -118,25 +115,36 @@ final class DocumentsWriter {
   final IndexWriter indexWriter;
 
   private AtomicInteger numDocsInRAM = new AtomicInteger(0);
-  private AtomicLong ramUsed = new AtomicLong(0);
 
   final BufferedDeletesStream bufferedDeletesStream;
   // TODO: cutover to BytesRefHash
-  private BufferedDeletes pendingDeletes = new BufferedDeletes(false);
+  private final BufferedDeletes pendingDeletes = new BufferedDeletes(false);
   final IndexingChain chain;
-  private final IndexWriterConfig config;
 
   final DocumentsWriterPerThreadPool perThreadPool;
+  final FlushPolicy flushPolicy;
+  final DocumentsWriterFlushControl flushControl;
+  final Healthiness healthiness;
   DocumentsWriter(IndexWriterConfig config, Directory directory, IndexWriter writer, FieldNumberBiMap globalFieldNumbers,
       BufferedDeletesStream bufferedDeletesStream) throws IOException {
     this.directory = directory;
     this.indexWriter = writer;
-    this.similarityProvider = writer.getConfig().getSimilarityProvider();
+    this.similarityProvider = config.getSimilarityProvider();
     this.bufferedDeletesStream = bufferedDeletesStream;
     this.perThreadPool = config.getIndexerThreadPool();
     this.chain = config.getIndexingChain();
     this.perThreadPool.initialize(this, globalFieldNumbers, config);
-    this.config = config;
+    final FlushPolicy configuredPolicy = config.getFlushPolicy();
+    if (configuredPolicy == null) {
+      flushPolicy = new FlushByRamOrCountsPolicy();
+    } else {
+      flushPolicy = configuredPolicy;
+    }
+    flushPolicy.init(this);
+    
+    healthiness = new Healthiness();
+    final long maxRamPerDWPT = config.getRAMPerThreadHardLimitMB() * 1024 * 1024;
+    flushControl = new DocumentsWriterFlushControl(flushPolicy, perThreadPool, healthiness, pendingDeletes, maxRamPerDWPT);
   }
 
   boolean deleteQueries(final Query... queries) throws IOException {
@@ -146,13 +154,15 @@ final class DocumentsWriter {
       }
     }
 
-    Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
+    final Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
 
     while (threadsIterator.hasNext()) {
       ThreadState state = threadsIterator.next();
       state.lock();
       try {
-        state.perThread.deleteQueries(queries);
+        if (state.isActive()) {
+          state.perThread.deleteQueries(queries); 
+        }
       } finally {
         state.unlock();
       }
@@ -178,12 +188,17 @@ final class DocumentsWriter {
       ThreadState state = threadsIterator.next();
       state.lock();
       try {
-        state.perThread.deleteTerms(terms);
+        if (state.isActive()) {
+          state.perThread.deleteTerms(terms);
+          flushControl.doOnDelete(state);
+        }
       } finally {
         state.unlock();
       }
     }
-
+    if (flushControl.flushDeletes.getAndSet(false)) {
+      flushDeletes();
+    }
     return false;
   }
 
@@ -194,7 +209,7 @@ final class DocumentsWriter {
     return deleteTerms(term);
   }
 
-  void deleteTerm(final Term term, ThreadState exclude) {
+  void deleteTerm(final Term term, ThreadState exclude) throws IOException {
     synchronized(this) {
       pendingDeletes.addTerm(term, BufferedDeletes.MAX_INT);
     }
@@ -207,11 +222,21 @@ final class DocumentsWriter {
         state.lock();
         try {
           state.perThread.deleteTerms(term);
+          flushControl.doOnDelete(state);
         } finally {
           state.unlock();
         }
       }
     }
+    if (flushControl.flushDeletes.getAndSet(false)) {
+      flushDeletes();
+    }
+  }
+
+  private void flushDeletes() throws IOException {
+    maybePushPendingDeletes();
+    indexWriter.applyAllDeletes();
+    indexWriter.flushCount.incrementAndGet();
   }
 
   /** If non-null, various details of indexing are printed
@@ -221,11 +246,6 @@ final class DocumentsWriter {
     pushConfigChange();
   }
 
-  synchronized void setSimilarityProvider(SimilarityProvider similarityProvider) {
-    this.similarityProvider = similarityProvider;
-    pushConfigChange();
-  }
-
   private final void pushConfigChange() {
     Iterator<ThreadState> it = perThreadPool.getAllPerThreadsIterator();
     while (it.hasNext()) {
@@ -245,9 +265,11 @@ final class DocumentsWriter {
     return abortedFiles;
   }
 
-  void message(String message) {
+  // returns boolean for asserts
+  boolean message(String message) {
     if (infoStream != null)
       indexWriter.message("DW: " + message);
+    return true;
   }
 
   private void ensureOpen() throws AlreadyClosedException {
@@ -272,13 +294,18 @@ final class DocumentsWriter {
         message("docWriter: abort");
       }
 
-      Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
+      final Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
 
       while (threadsIterator.hasNext()) {
         ThreadState perThread = threadsIterator.next();
         perThread.lock();
         try {
-          perThread.perThread.abort();
+          if (perThread.isActive()) { // we might be closed
+            perThread.perThread.abort();
+            perThread.perThread.checkAndResetHasAborted();
+          } else {
+            assert closed;
+          }
         } finally {
           perThread.unlock();
         }
@@ -297,26 +324,12 @@ final class DocumentsWriter {
   }
 
   public int getBufferedDeleteTermsSize() {
-    int size = 0;
-    Iterator<ThreadState> it = perThreadPool.getActivePerThreadsIterator();
-    while (it.hasNext()) {
-      DocumentsWriterPerThread dwpt = it.next().perThread;
-      size += dwpt.pendingDeletes.terms.size();
-    }
-    size += pendingDeletes.terms.size();
-    return size;
+    return pendingDeletes.terms.size();
   }
 
   //for testing
   public int getNumBufferedDeleteTerms() {
-    int numDeletes = 0;
-    Iterator<ThreadState> it = perThreadPool.getActivePerThreadsIterator();
-    while (it.hasNext()) {
-      DocumentsWriterPerThread dwpt = it.next().perThread;
-      numDeletes += dwpt.pendingDeletes.numTermDeletes.get();
-    }
-    numDeletes += pendingDeletes.numTermDeletes.get();
-    return numDeletes;
+    return pendingDeletes.numTermDeletes.get();
   }
 
   public boolean anyDeletions() {
@@ -325,67 +338,89 @@ final class DocumentsWriter {
 
   void close() {
     closed = true;
+    flushControl.setClosed();
   }
 
-  boolean updateDocument(final Document doc, final Analyzer analyzer, final Term delTerm)
-      throws CorruptIndexException, IOException {
+  boolean updateDocument(final Document doc, final Analyzer analyzer,
+      final Term delTerm) throws CorruptIndexException, IOException {
     ensureOpen();
-
-    FlushedSegment newSegment = null;
-
-    ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), this, doc);
+    boolean maybeMerge = false;
+    final boolean isUpdate = delTerm != null;
+    if (healthiness.isStalled()) {
+      /*
+       * if we are allowed to hijack threads for flushing we try to flush out 
+       * as many pending DWPT to release memory and get back healthy status.
+       */
+      if (infoStream != null) {
+        message("WARNING DocumentsWriter is stalled try to hijack thread to flush pending segment");
+      }
+      // try pick up pending threads here if possile
+      final DocumentsWriterPerThread flushingDWPT;
+      flushingDWPT = flushControl.getFlushIfPending(null);
+       // don't push the delete here since the update could fail!
+      maybeMerge = doFlush(flushingDWPT);
+      if (infoStream != null && healthiness.isStalled()) {
+        message("WARNING DocumentsWriter is stalled might block thread until DocumentsWriter is not stalled anymore");
+      }
+      healthiness.waitIfStalled(); // block if stalled
+    }
+    ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(),
+        this, doc);
+    DocumentsWriterPerThread flushingDWPT = null;
     try {
-      DocumentsWriterPerThread dwpt = perThread.perThread;
-      long perThreadRAMUsedBeforeAdd = dwpt.bytesUsed();
-      dwpt.updateDocument(doc, analyzer, delTerm);
+      if (!perThread.isActive()) {
+        ensureOpen();
+        assert false: "perThread is not active but we are still open";
+      }
+      final DocumentsWriterPerThread dwpt = perThread.perThread;
+      try {
+        dwpt.updateDocument(doc, analyzer, delTerm);
+      } finally {
+        if(dwpt.checkAndResetHasAborted()) {
+            flushControl.doOnAbort(perThread);
+        }
+      }
+      flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate);
       numDocsInRAM.incrementAndGet();
-
-      newSegment = finishAddDocument(dwpt, perThreadRAMUsedBeforeAdd);
     } finally {
       perThread.unlock();
     }
-
     // delete term from other DWPTs later, so that this thread
     // doesn't have to lock multiple DWPTs at the same time
-    if (delTerm != null) {
+    if (isUpdate) {
       deleteTerm(delTerm, perThread);
     }
+    maybeMerge |= doFlush(flushingDWPT);
+    return maybeMerge;
+  }
+  
+ 
 
-    if (newSegment != null) {
-      finishFlushedSegment(newSegment);
-    }
-
-    if (newSegment != null) {
-      perThreadPool.clearThreadBindings(perThread);
-      return true;
-    }
-
-    return false;
+  private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException {
+    boolean maybeMerge = false;
+    while (flushingDWPT != null) {
+      maybeMerge = true;
+      try {
+        // flush concurrently without locking
+        final FlushedSegment newSegment = flushingDWPT.flush();
+        finishFlushedSegment(newSegment);
+      } finally {
+          flushControl.doAfterFlush(flushingDWPT);
+          flushingDWPT.checkAndResetHasAborted();
+          indexWriter.flushCount.incrementAndGet();
+      }
+        flushingDWPT =  flushControl.nextPendingFlush() ;
     }
+    return maybeMerge;
+  }
+  
 
-  private void finishFlushedSegment(FlushedSegment newSegment) throws IOException {
+  private void finishFlushedSegment(FlushedSegment newSegment)
+      throws IOException {
     pushDeletes(newSegment);
     if (newSegment != null) {
       indexWriter.addFlushedSegment(newSegment);
-  }
-  }
-
-  private final FlushedSegment finishAddDocument(DocumentsWriterPerThread perThread,
-      long perThreadRAMUsedBeforeAdd) throws IOException {
-    FlushedSegment newSegment = null;
-    final int maxBufferedDocs = config.getMaxBufferedDocs();
-    if (maxBufferedDocs != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
-        perThread.getNumDocsInRAM() >= maxBufferedDocs) {
-      newSegment = perThread.flush();
-    }
-
-    long deltaRAM = perThread.bytesUsed() - perThreadRAMUsedBeforeAdd;
-    long oldValue = ramUsed.get();
-    while (!ramUsed.compareAndSet(oldValue, oldValue + deltaRAM)) {
-      oldValue = ramUsed.get();
     }
-
-    return newSegment;
   }
 
   final void subtractFlushedNumDocs(int numFlushed) {
@@ -402,66 +437,79 @@ final class DocumentsWriter {
       final long delGen = bufferedDeletesStream.getNextGen();
       // Lock order: DW -> BD
       if (deletes != null && deletes.any()) {
-        final FrozenBufferedDeletes packet = new FrozenBufferedDeletes(deletes, delGen);
-          if (infoStream != null) {
-            message("flush: push buffered deletes");
-          }
-          bufferedDeletesStream.push(packet);
-          if (infoStream != null) {
-            message("flush: delGen=" + packet.gen);
-          }
-          }
-      flushedSegment.segmentInfo.setBufferedDeletesGen(delGen);
-          }
+        final FrozenBufferedDeletes packet = new FrozenBufferedDeletes(deletes,
+            delGen);
+        if (infoStream != null) {
+          message("flush: push buffered deletes");
+        }
+        bufferedDeletesStream.push(packet);
+        if (infoStream != null) {
+          message("flush: delGen=" + packet.gen);
         }
+      }
+      flushedSegment.segmentInfo.setBufferedDeletesGen(delGen);
+    }
+  }
 
   private synchronized final void maybePushPendingDeletes() {
     final long delGen = bufferedDeletesStream.getNextGen();
     if (pendingDeletes.any()) {
-      bufferedDeletesStream.push(new FrozenBufferedDeletes(pendingDeletes, delGen));
+      indexWriter.bufferedDeletesStream.push(new FrozenBufferedDeletes(
+          pendingDeletes, delGen));
       pendingDeletes.clear();
-      }
     }
+  }
 
   final boolean flushAllThreads(final boolean flushDeletes)
     throws IOException {
 
-    Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
+    final Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
     boolean anythingFlushed = false;
 
     while (threadsIterator.hasNext()) {
-      FlushedSegment newSegment = null;
-
-      ThreadState perThread = threadsIterator.next();
-      perThread.lock();
+      final ThreadState perThread = threadsIterator.next();
+      final DocumentsWriterPerThread flushingDWPT;
+      /*
+       * TODO: maybe we can leverage incoming / indexing threads here if we mark
+       * all active threads pending so that we don't need to block until we got
+       * the handle. Yet, we need to figure out how to identify that a certain
+       * DWPT has been flushed since they are simply replaced once checked out
+       * for flushing. This would give us another level of concurrency during
+       * commit.
+       * 
+       * Maybe we simply iterate them and store the ThreadStates and mark
+       * all as flushPending and at the same time record the DWPT instance as a
+       * key for the pending ThreadState. This way we can easily iterate until
+       * all DWPT have changed.
+       */
+      perThread.lock(); 
       try {
-
-        DocumentsWriterPerThread dwpt = perThread.perThread;
-        final int numDocs = dwpt.getNumDocsInRAM();
-
+        if (!perThread.isActive()) {
+          assert closed;
+          continue; //this perThread is already done maybe by a concurrently indexing thread
+        }
+        final DocumentsWriterPerThread dwpt = perThread.perThread; 
         // Always flush docs if there are any
-        boolean flushDocs = numDocs > 0;
-
-        String segment = dwpt.getSegment();
-
+        final boolean flushDocs =  dwpt.getNumDocsInRAM() > 0;
+        final String segment = dwpt.getSegment();
         // If we are flushing docs, segment must not be null:
         assert segment != null || !flushDocs;
-
         if (flushDocs) {
-          newSegment = dwpt.flush();
-
-          if (newSegment != null) {
-            perThreadPool.clearThreadBindings(perThread);
-            }
+          // check out and set pending if not already set
+          flushingDWPT = flushControl.tryCheckoutForFlush(perThread, true);
+          assert flushingDWPT != null : "DWPT must never be null here since we hold the lock and it holds documents";
+          assert dwpt == flushingDWPT : "flushControl returned different DWPT";
+          try {
+            final FlushedSegment newSegment = dwpt.flush();
+            anythingFlushed = true;
+            finishFlushedSegment(newSegment);
+          } finally {
+            flushControl.doAfterFlush(flushingDWPT);
           }
+        }
       } finally {
         perThread.unlock();
       }
-
-      if (newSegment != null) {
-        anythingFlushed = true;
-        finishFlushedSegment(newSegment);
-      }
     }
 
     if (!anythingFlushed && flushDeletes) {
@@ -471,6 +519,10 @@ final class DocumentsWriter {
 
     return anythingFlushed;
   }
+  
+  
+  
+ 
 
 //  /* We have three pools of RAM: Postings, byte blocks
 //   * (holds freq/prox posting data) and per-doc buffers

Added: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java?rev=1086947&view=auto
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java (added)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java Wed Mar 30 13:30:07 2011
@@ -0,0 +1,263 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
+
+/**
+ * This class controls {@link DocumentsWriterPerThread} flushing during
+ * indexing. It tracks the memory consumption per
+ * {@link DocumentsWriterPerThread} and uses a configured {@link FlushPolicy} to
+ * decide if a {@link DocumentsWriterPerThread} must flush.
+ * <p>
+ * In addition to the {@link FlushPolicy} the flush control might set certain
+ * {@link DocumentsWriterPerThread} as flush pending iff a
+ * {@link DocumentsWriterPerThread} exceeds the
+ * {@link IndexWriterConfig#getRAMPerThreadHardLimitMB()} to prevent address
+ * space exhaustion.
+ */
+public final class DocumentsWriterFlushControl {
+
+  private final long maxBytesPerDWPT; // RAM hard limit per DWPT, in bytes
+  private long activeBytes = 0; // bytes held by DWPTs that are not flush-pending
+  private long flushBytes = 0; // bytes of flush-pending and checked-out (flushing) DWPTs
+  private volatile int numPending = 0; // states marked flush-pending but not yet checked out
+  private volatile int numFlushing = 0; // DWPTs checked out and currently flushing
+  final AtomicBoolean flushDeletes = new AtomicBoolean(false); // set via setFlushDeletes()
+
+  long peakActiveBytes = 0; // only tracked when assertions are enabled
+  long peakFlushBytes = 0; // only tracked when assertions are enabled
+  long peakNetBytes = 0; // only tracked when assertions are enabled
+  private final Healthiness healthiness;
+  private final DocumentsWriterPerThreadPool perThreadPool;
+  private final FlushPolicy flushPolicy;
+  private boolean closed = false;
+  private final HashMap<DocumentsWriterPerThread, Long> flushingWriters = new HashMap<DocumentsWriterPerThread, Long>(); // checked-out DWPT -> bytes recorded at checkout
+  private final BufferedDeletes pendingDeletes;
+
+  DocumentsWriterFlushControl(FlushPolicy flushPolicy,
+      DocumentsWriterPerThreadPool threadPool, Healthiness healthiness,
+      BufferedDeletes pendingDeletes, long maxBytesPerDWPT) {
+    this.healthiness = healthiness;
+    this.perThreadPool = threadPool;
+    this.flushPolicy = flushPolicy;
+    this.maxBytesPerDWPT = maxBytesPerDWPT;
+    this.pendingDeletes = pendingDeletes;
+  }
+
+  public synchronized long activeBytes() {
+    return activeBytes;
+  }
+
+  public synchronized long flushBytes() {
+    return flushBytes;
+  }
+
+  public synchronized long netBytes() {
+    return flushBytes + activeBytes;
+  }
+
+  private void commitPerThreadBytes(ThreadState perThread) { // only called from synchronized doAfterDocument
+    final long delta = perThread.perThread.bytesUsed()
+        - perThread.perThreadBytes;
+    perThread.perThreadBytes += delta;
+    /*
+     * Check whether we are pending: setFlushPending moves the perThread's
+     * memory into flushBytes, and we could have been set pending during a
+     * delete, so the delta must go to whichever pool currently owns it.
+     */
+    if (perThread.flushPending) {
+      flushBytes += delta;
+    } else {
+      activeBytes += delta;
+    }
+    assert updatePeaks(delta); // peak tracking runs only with assertions enabled
+  }
+
+  private boolean updatePeaks(long delta) { // always returns true so it can run inside an assert; delta is unused
+    peakActiveBytes = Math.max(peakActiveBytes, activeBytes);
+    peakFlushBytes = Math.max(peakFlushBytes, flushBytes);
+    peakNetBytes = Math.max(peakNetBytes, netBytes());
+    return true;
+  }
+
+  synchronized DocumentsWriterPerThread doAfterDocument(ThreadState perThread,
+      boolean isUpdate) { // returns a DWPT to flush, or null
+    commitPerThreadBytes(perThread);
+    if (!perThread.flushPending) {
+      if (isUpdate) {
+        flushPolicy.onUpdate(this, perThread);
+      } else {
+        flushPolicy.onInsert(this, perThread);
+      }
+      if (!perThread.flushPending && perThread.perThreadBytes > maxBytesPerDWPT) {
+        // Safety check: mark this DWPT flush-pending once it exceeds its RAM
+        // hard limit (maxBytesPerDWPT). This matters since we cannot address
+        // more than 2048 MB per DWPT.
+        setFlushPending(perThread);
+      }
+    }
+    final DocumentsWriterPerThread flushingDWPT = getFlushIfPending(perThread);
+    healthiness.updateStalled(this);
+    return flushingDWPT;
+  }
+
+  synchronized void doAfterFlush(DocumentsWriterPerThread dwpt) { // releases accounting for a DWPT checked out by tryCheckoutForFlush
+    assert flushingWriters.containsKey(dwpt);
+    numFlushing--;
+    Long bytes = flushingWriters.remove(dwpt);
+    flushBytes -= bytes.longValue();
+    perThreadPool.recycle(dwpt);
+    healthiness.updateStalled(this);
+  }
+
+  /**
+   * Sets flush pending state on the given {@link ThreadState} and moves its
+   * bytes from the active pool to the flush pool. The {@link ThreadState} must
+   * have indexed at least one Document and must not already be pending.
+   */
+  public synchronized void setFlushPending(ThreadState perThread) {
+    assert !perThread.flushPending;
+    assert perThread.perThread.getNumDocsInRAM() > 0;
+    perThread.flushPending = true; // write access synced
+    final long bytes = perThread.perThreadBytes;
+    flushBytes += bytes;
+    activeBytes -= bytes;
+    numPending++; // write access synced
+  }
+
+  synchronized void doOnAbort(ThreadState state) {
+    if (state.flushPending) {
+      flushBytes -= state.perThreadBytes; // NOTE(review): numPending is not decremented here; confirm replaceForFlush accounts for it
+    } else {
+      activeBytes -= state.perThreadBytes;
+    }
+    // take it out of the loop; this DWPT is stale
+    perThreadPool.replaceForFlush(state, closed);
+    healthiness.updateStalled(this);
+  }
+
+  synchronized DocumentsWriterPerThread tryCheckoutForFlush( // returns the checked-out DWPT, or null
+      ThreadState perThread, boolean setPending) {
+    if (setPending && !perThread.flushPending) {
+      setFlushPending(perThread);
+    }
+    if (perThread.flushPending) {
+      // we are pending so all memory is already moved to flushBytes
+      if (perThread.tryLock()) {
+        try {
+          if (perThread.isActive()) {
+            assert perThread.isHeldByCurrentThread();
+            final DocumentsWriterPerThread dwpt;
+            final long bytes = perThread.perThreadBytes; // must be read before
+                                                         // replaceForFlush()!
+            dwpt = perThreadPool.replaceForFlush(perThread, closed);
+            assert !flushingWriters.containsKey(dwpt) : "DWPT is already flushing";
+            // record the flushing DWPT to reduce flushBytes in doAfterFlush
+            flushingWriters.put(dwpt, Long.valueOf(bytes));
+            numPending--; // write access synced
+            numFlushing++;
+            return dwpt;
+          }
+        } finally {
+          perThread.unlock();
+        }
+      }
+    }
+    return null;
+  }
+
+  DocumentsWriterPerThread getFlushIfPending(ThreadState perThread) { // perThread may be null: then only other pending states are tried
+    if (numPending > 0) {
+      final DocumentsWriterPerThread dwpt = perThread == null ? null
+          : tryCheckoutForFlush(perThread, false);
+      if (dwpt == null) {
+        return nextPendingFlush();
+      }
+      return dwpt;
+    }
+    return null;
+  }
+
+  @Override
+  public String toString() {
+    return "DocumentsWriterFlushControl [activeBytes=" + activeBytes
+        + ", flushBytes=" + flushBytes + "]";
+  }
+
+  DocumentsWriterPerThread nextPendingFlush() { // first pending state that can be checked out, or null
+    if (numPending > 0) {
+      final Iterator<ThreadState> allActiveThreads = perThreadPool
+          .getActivePerThreadsIterator();
+      while (allActiveThreads.hasNext() && numPending > 0) {
+        ThreadState next = allActiveThreads.next();
+        if (next.flushPending) { // unsynchronized pre-check; tryCheckoutForFlush re-checks under the monitor
+          DocumentsWriterPerThread dwpt = tryCheckoutForFlush(next, false);
+          if (dwpt != null) {
+            return dwpt;
+          }
+        }
+      }
+    }
+    return null;
+  }
+
+  synchronized void setClosed() {
+    // set by DW to signal that we should not release new DWPT after close
+    this.closed = true;
+  }
+
+  /**
+   * Returns an iterator over all currently active {@link ThreadState}s.
+   */
+  public Iterator<ThreadState> allActiveThreads() {
+    return perThreadPool.getActivePerThreadsIterator();
+  }
+
+  long maxNetBytes() { // delegates to the configured FlushPolicy
+    return flushPolicy.getMaxNetBytes();
+  }
+
+  synchronized void doOnDelete(ThreadState state) { // consults the FlushPolicy unless the state is already pending
+    if (!state.flushPending) {
+      flushPolicy.onDelete(this, state);
+    }
+  }
+
+  /**
+   * Returns the number of buffered term deletes in the global pendingDeletes pool.
+   */
+  public int getNumGlobalTermDeletes() {
+    return pendingDeletes.numTermDeletes.get();
+  }
+
+  int numFlushingDWPT() {
+    return numFlushing;
+  }
+  
+  public void setFlushDeletes() {	
+	  flushDeletes.set(true);
+  }
+  
+  int numActiveDWPT() { // NOTE(review): returns the pool's max thread states, not a live count of active DWPTs; confirm intent
+    return this.perThreadPool.getMaxThreadStates();
+  }
+}
\ No newline at end of file

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java?rev=1086947&r1=1086946&r2=1086947&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java Wed Mar 30 13:30:07 2011
@@ -31,7 +31,7 @@ import org.apache.lucene.search.Query;
 import org.apache.lucene.search.SimilarityProvider;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.BitVector;
-import org.apache.lucene.util.ByteBlockPool.DirectAllocator;
+import org.apache.lucene.util.ByteBlockPool.Allocator;
 import org.apache.lucene.util.RamUsageEstimator;
 
 public class DocumentsWriterPerThread {
@@ -73,17 +73,14 @@ public class DocumentsWriterPerThread {
       final TermsHashConsumer termVectorsWriter = new TermVectorsTermsWriter(documentsWriterPerThread);
       final TermsHashConsumer freqProxWriter = new FreqProxTermsWriter();
 
-      final InvertedDocConsumer  termsHash = new TermsHash(documentsWriterPerThread, freqProxWriter,
-                                                           new TermsHash(documentsWriterPerThread, termVectorsWriter, null));
+      final InvertedDocConsumer  termsHash = new TermsHash(documentsWriterPerThread, freqProxWriter, true,
+                                                           new TermsHash(documentsWriterPerThread, termVectorsWriter, false, null));
       final NormsWriter normsWriter = new NormsWriter();
       final DocInverter docInverter = new DocInverter(documentsWriterPerThread.docState, termsHash, normsWriter);
       return new DocFieldProcessor(documentsWriterPerThread, docInverter);
     }
   };
 
-  // Deletes for our still-in-RAM (to be flushed next) segment
-  BufferedDeletes pendingDeletes = new BufferedDeletes(false);
-
   static class DocState {
     final DocumentsWriterPerThread docWriter;
     Analyzer analyzer;
@@ -128,7 +125,7 @@ public class DocumentsWriterPerThread {
    *  currently buffered docs.  This resets our state,
    *  discarding any docs added since last flush. */
   void abort() throws IOException {
-    aborting = true;
+    hasAborted = aborting = true;
     try {
       if (infoStream != null) {
         message("docWriter: now abort");
@@ -152,38 +149,53 @@ public class DocumentsWriterPerThread {
 
   final DocumentsWriter parent;
   final IndexWriter writer;
-
   final Directory directory;
   final DocState docState;
   final DocConsumer consumer;
+  final AtomicLong bytesUsed;
+  
+  SegmentWriteState flushState;
+  //Deletes for our still-in-RAM (to be flushed next) segment
+  BufferedDeletes pendingDeletes;  
+  String segment;     // Current segment we are working on
+  boolean aborting = false;   // True if an abort is pending
+  boolean hasAborted = false; // True if the last exception throws by #updateDocument was aborting
 
-  String segment;                         // Current segment we are working on
-  boolean aborting;               // True if an abort is pending
-
+  private FieldInfos fieldInfos;
   private final PrintStream infoStream;
   private int numDocsInRAM;
   private int flushedDocCount;
-  SegmentWriteState flushState;
-
-  final AtomicLong bytesUsed = new AtomicLong(0);
-
-  private FieldInfos fieldInfos;
-
-  public DocumentsWriterPerThread(Directory directory, DocumentsWriter parent, FieldInfos fieldInfos, IndexingChain indexingChain) {
+  
+  /**
+   * Creates a per-thread writer over the given directory. The IndexWriter,
+   * info stream and similarity provider are taken from {@code parent}, and the
+   * consumer is built from {@code indexingChain}. RAM accounting starts at 0
+   * and this writer gets its own empty pending-deletes buffer.
+   */
+  public DocumentsWriterPerThread(Directory directory, DocumentsWriter parent,
+      FieldInfos fieldInfos, IndexingChain indexingChain) {
     this.directory = directory;
     this.parent = parent;
     this.fieldInfos = fieldInfos;
     this.writer = parent.indexWriter;
     this.infoStream = parent.indexWriter.getInfoStream();
     this.docState = new DocState(this);
-    this.docState.similarityProvider = parent.indexWriter.getConfig().getSimilarityProvider();
+    this.docState.similarityProvider = parent.indexWriter.getConfig()
+        .getSimilarityProvider();
 
     consumer = indexingChain.getChain(this);
-    }
+    // per-DWPT accounting: allocator RAM and this DWPT's own buffered deletes
+    bytesUsed = new AtomicLong(0);
+    pendingDeletes = new BufferedDeletes(false);
+  }
+  
+  /**
+   * Creates a fresh per-thread writer that shares the directory and parent of
+   * {@code other} and uses the parent's indexing chain. It starts with the
+   * given field infos, zeroed RAM accounting and an empty pending-deletes buffer.
+   */
+  public DocumentsWriterPerThread(DocumentsWriterPerThread other, FieldInfos fieldInfos) {
+    this(other.directory, other.parent, fieldInfos, other.parent.chain);
+  }
 
   void setAborting() {
+    // only flags an abort as pending; abort() sets this same flag (plus hasAborted)
     aborting = true;
   }
+  
+  /**
+   * Returns whether {@link #abort()} has run since the previous call, and
+   * clears that flag so each abort is reported only once.
+   */
+  boolean checkAndResetHasAborted() {
+    if (!hasAborted) {
+      return false;
+    }
+    hasAborted = false;
+    return true;
+  }
 
   public void updateDocument(Document doc, Analyzer analyzer, Term delTerm) throws IOException {
     assert writer.testPoint("DocumentsWriterPerThread addDocument start");
@@ -203,7 +215,7 @@ public class DocumentsWriterPerThread {
     boolean success = false;
     try {
       try {
-      consumer.processDocument(fieldInfos);
+        consumer.processDocument(fieldInfos);
       } finally {
         docState.clear();
       }
@@ -251,21 +263,33 @@ public class DocumentsWriterPerThread {
 
   void deleteQueries(Query... queries) {
+    // Buffers each query together with the current numDocsInRAM (presumably the
+    // doc-ID bound the delete applies to); a no-op while no docs are buffered.
     if (numDocsInRAM > 0) {
-    for (Query query : queries) {
-      pendingDeletes.addQuery(query, numDocsInRAM);
+      for (Query query : queries) {
+        pendingDeletes.addQuery(query, numDocsInRAM);
+      }
     }
   }
-  }
 
   void deleteTerms(Term... terms) {
+    // Buffers each term together with the current numDocsInRAM (presumably the
+    // doc-ID bound the delete applies to); a no-op while no docs are buffered.
     if (numDocsInRAM > 0) {
-    for (Term term : terms) {
-      pendingDeletes.addTerm(term, numDocsInRAM);
+      for (Term term : terms) {
+        pendingDeletes.addTerm(term, numDocsInRAM);
+      }
     }
   }
+  
+  /**
+   * Returns the number of delete terms buffered in this
+   * {@link DocumentsWriterPerThread}'s pending deletes.
+   */
+  public int numDeleteTerms() {
+    // public so flush policies can inspect it
+    return pendingDeletes.numTermDeletes.get();
   }
 
-  int getNumDocsInRAM() {
+  /**
+   * Returns the number of documents currently buffered in RAM by this
+   * {@link DocumentsWriterPerThread}.
+   */
+  public int getNumDocsInRAM() {
+    // public so flush policies can compare it against maxBufferedDocs
     return numDocsInRAM;
   }
 
@@ -285,7 +309,6 @@ public class DocumentsWriterPerThread {
   /** Flush all pending docs to a new segment */
   FlushedSegment flush() throws IOException {
     assert numDocsInRAM > 0;
-
     flushState = new SegmentWriteState(infoStream, directory, segment, fieldInfos,
         numDocsInRAM, writer.getConfig().getTermIndexInterval(),
         fieldInfos.buildSegmentCodecs(true), pendingDeletes);
@@ -323,16 +346,17 @@ public class DocumentsWriterPerThread {
       newSegment.setHasVectors(flushState.hasVectors);
 
       if (infoStream != null) {
-        message("new segment has " + flushState.deletedDocs.count() + " deleted docs");
+        message("new segment has " + (flushState.deletedDocs == null ? 0 : flushState.deletedDocs.count()) + " deleted docs");
         message("new segment has " + (flushState.hasVectors ? "vectors" : "no vectors"));
         message("flushedFiles=" + newSegment.files());
         message("flushed codecs=" + newSegment.getSegmentCodecs());
       }
       flushedDocCount += flushState.numDocs;
 
-      BufferedDeletes segmentDeletes = null;
+      final BufferedDeletes segmentDeletes;
       if (pendingDeletes.queries.isEmpty()) {
         pendingDeletes.clear();
+        segmentDeletes = null;
       } else {
         segmentDeletes = pendingDeletes;
         pendingDeletes = new BufferedDeletes(false);
@@ -350,7 +374,6 @@ public class DocumentsWriterPerThread {
             parent.indexWriter.deleter.refresh(segment);
           }
         }
-
         abort();
       }
     }
@@ -362,7 +385,7 @@ public class DocumentsWriterPerThread {
   }
 
   long bytesUsed() {
-    return bytesUsed.get();
+    // RAM tracked by this DWPT's block allocators plus the bytes used by its pending deletes
+    return bytesUsed.get() + pendingDeletes.bytesUsed.get();
   }
 
   FieldInfos getFieldInfos() {
@@ -395,11 +418,38 @@ public class DocumentsWriterPerThread {
     bytesUsed.addAndGet(INT_BLOCK_SIZE*RamUsageEstimator.NUM_BYTES_INT);
     return b;
   }
+  
+  /**
+   * Releases the RAM accounting for {@code length} int blocks, each counted as
+   * INT_BLOCK_SIZE ints, mirroring the increment done when a block is handed out.
+   * The block array itself is not modified here.
+   *
+   * @param blocks the recycled blocks (not used here)
+   * @param offset index of the first recycled block (not used here)
+   * @param length number of recycled blocks
+   */
+  void recycleIntBlocks(int[][] blocks, int offset, int length) {
+    // widen to long before multiplying: the int product overflows for very large pools
+    bytesUsed.addAndGet(-((long) length * INT_BLOCK_SIZE * RamUsageEstimator.NUM_BYTES_INT));
+  }
 
-  final DirectAllocator byteBlockAllocator = new DirectAllocator();
+  final Allocator byteBlockAllocator = new DirectTrackingAllocator();
+    
+    
+  /**
+   * {@link Allocator} that allocates fresh byte blocks and records their size
+   * in this DWPT's {@code bytesUsed} counter. Recycling subtracts the size of
+   * the recycled blocks and nulls out their array slots so the blocks are no
+   * longer referenced.
+   */
+  private final class DirectTrackingAllocator extends Allocator {
+    public DirectTrackingAllocator() {
+      this(BYTE_BLOCK_SIZE);
+    }
+
+    public DirectTrackingAllocator(int blockSize) {
+      super(blockSize);
+    }
+
+    @Override
+    public byte[] getByteBlock() {
+      bytesUsed.addAndGet(blockSize);
+      return new byte[blockSize];
+    }
+
+    @Override
+    public void recycleByteBlocks(byte[][] blocks, int start, int end) {
+      // widen to long before multiplying: the int product overflows for very large pools
+      bytesUsed.addAndGet(-((long) (end - start) * blockSize));
+      for (int i = start; i < end; i++) {
+        blocks[i] = null;
+      }
+    }
+  }
 
   String toMB(long v) {
+    // formats a byte count as megabytes using this class's nf number format
     return nf.format(v/1024./1024.);
   }
-
 }

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java?rev=1086947&r1=1086946&r2=1086947&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java Wed Mar 30 13:30:07 2011
@@ -9,16 +9,92 @@ import org.apache.lucene.index.SegmentCo
 import org.apache.lucene.index.codecs.CodecProvider;
 
 public abstract class DocumentsWriterPerThreadPool {
-  final static class ThreadState extends ReentrantLock {
-    final DocumentsWriterPerThread perThread;
+  
+  /**
+   * {@link ThreadState} references and guards a
+   * {@link DocumentsWriterPerThread} instance that is used during indexing to
+   * build an in-memory index segment. {@link ThreadState} also holds all
+   * flush-related per-thread data controlled by
+   * {@link DocumentsWriterFlushControl}.
+   * <p>
+   * A {@link ThreadState}, its methods and members should only be accessed by
+   * one thread at a time. Users must acquire the lock via
+   * {@link ThreadState#lock()} before accessing the state and release it in a
+   * finally block via {@link ThreadState#unlock()}.
+   */
+  @SuppressWarnings("serial")
+  public final static class ThreadState extends ReentrantLock {
+    // package-private: FlushPolicy reads it directly; public read access is via
+    // getDocumentsWriterPerThread(). Swapped out by resetWriter().
+    DocumentsWriterPerThread perThread;
+    // write access guarded by DocumentsWriterFlushControl
+    volatile boolean flushPending = false;
+    // write access guarded by DocumentsWriterFlushControl
+    long perThreadBytes = 0;
+    
+    // guarded by the ReentrantLock; set to false by resetWriter(null)
+    private boolean isActive = true;
 
     ThreadState(DocumentsWriterPerThread perThread) {
       this.perThread = perThread;
     }
+    
+    /**
+     * Replaces the internal {@link DocumentsWriterPerThread} with the given one
+     * and resets {@code perThreadBytes} to 0 and {@code flushPending} to false.
+     * If the given DWPT is <code>null</code> this ThreadState is marked as
+     * inactive and should not be used for indexing anymore. The caller must
+     * hold this state's lock.
+     * @see #isActive()  
+     */
+    void resetWriter(DocumentsWriterPerThread perThread) {
+      assert this.isHeldByCurrentThread();
+      if (perThread == null) {
+        isActive = false;
+      }
+      this.perThread = perThread;
+      this.perThreadBytes = 0;
+      this.flushPending = false;
+    }
+    
+    /**
+     * Returns <code>true</code> if this ThreadState is still open. It returns
+     * <code>false</code> only after {@link #resetWriter} was called with
+     * <code>null</code>, i.e. the DW has been closed and this ThreadState has
+     * already been checked out for flush. The caller must hold this state's lock.
+     */
+    boolean isActive() {
+      assert this.isHeldByCurrentThread();
+      return isActive;
+    }
+    
+    /**
+     * Returns the number of bytes currently accounted to this ThreadState's
+     * {@link DocumentsWriterPerThread}. The caller must hold this state's lock.
+     */
+    public long getBytesUsedPerThread() {
+      assert this.isHeldByCurrentThread();
+      // public for FlushPolicy implementations
+      return perThreadBytes;
+    }
+    
+    /**
+     * Returns this {@link ThreadState}'s {@link DocumentsWriterPerThread}, or
+     * <code>null</code> once the state was deactivated. The caller must hold
+     * this state's lock.
+     */
+    public DocumentsWriterPerThread getDocumentsWriterPerThread() {
+      assert this.isHeldByCurrentThread();
+      // public for FlushPolicy implementations
+      return perThread;
+    }
+    
+    /**
+     * Returns <code>true</code> iff this {@link ThreadState} is marked as flush
+     * pending, otherwise <code>false</code>. Does not require the lock.
+     */
+    public boolean isFlushPending() {
+      return flushPending;
+    }
   }
 
   private final ThreadState[] perThreads;
   private volatile int numThreadStatesActive;
+  private CodecProvider codecProvider;
+  private FieldNumberBiMap globalFieldMap;
 
   public DocumentsWriterPerThreadPool(int maxNumPerThreads) {
     maxNumPerThreads = (maxNumPerThreads < 1) ? IndexWriterConfig.DEFAULT_MAX_THREAD_STATES : maxNumPerThreads;
@@ -28,7 +104,8 @@ public abstract class DocumentsWriterPer
   }
 
   public void initialize(DocumentsWriter documentsWriter, FieldNumberBiMap globalFieldMap, IndexWriterConfig config) {
-    final CodecProvider codecProvider = config.getCodecProvider();
+    codecProvider = config.getCodecProvider();
+    this.globalFieldMap = globalFieldMap;
     for (int i = 0; i < perThreads.length; i++) {
       final FieldInfos infos = globalFieldMap.newFieldInfos(SegmentCodecsBuilder.create(codecProvider));
       perThreads[i] = new ThreadState(new DocumentsWriterPerThread(documentsWriter.directory, documentsWriter, infos, documentsWriter.chain));
@@ -41,26 +118,53 @@ public abstract class DocumentsWriterPer
 
   public synchronized ThreadState newThreadState() {
+    // hand out the next pre-allocated, not yet active state; null once all are in use
     if (numThreadStatesActive < perThreads.length) {
-      ThreadState state = perThreads[numThreadStatesActive];
-      numThreadStatesActive++;
-      return state;
+      return perThreads[numThreadStatesActive++];
     }
-
     return null;
   }
-
+  
+  /**
+   * Detaches the current {@link DocumentsWriterPerThread} from the given
+   * state, which the caller must have locked, so that it can be flushed. A
+   * fresh DWPT with new field infos is installed in its place. If
+   * {@code closed} is true, no replacement is installed and the state becomes
+   * inactive.
+   *
+   * @return the DWPT that was detached for flushing
+   */
+  protected DocumentsWriterPerThread replaceForFlush(ThreadState threadState, boolean closed) {
+    assert threadState.isHeldByCurrentThread();
+    final DocumentsWriterPerThread flushingWriter = threadState.perThread;
+    final DocumentsWriterPerThread replacement;
+    if (closed) {
+      replacement = null; // resetWriter(null) marks the state inactive
+    } else {
+      final FieldInfos freshInfos = globalFieldMap.newFieldInfos(SegmentCodecsBuilder.create(codecProvider));
+      replacement = new DocumentsWriterPerThread(flushingWriter, freshInfos);
+    }
+    threadState.resetWriter(replacement);
+    // TODO - do we need to clear ThreadBindings here since we swap DWPT this is not necessary
+    clearThreadBindings(threadState);
+    return flushingWriter;
+  }
+  
+  /**
+   * Hook for recycling a flushed {@link DocumentsWriterPerThread}. The default
+   * implementation does nothing, i.e. DWPTs are not recycled.
+   */
+  public void recycle(DocumentsWriterPerThread dwpt) {
+    // intentionally empty: no recycling by default
+  }
+  
+  /**
+   * Returns a {@link ThreadState} for {@code requestingThread}.
+   * NOTE(review): judging by the name, the returned state is already locked by
+   * the caller; the selection policy is up to the implementation — confirm.
+   */
   public abstract ThreadState getAndLock(Thread requestingThread, DocumentsWriter documentsWriter, Document doc);
 
+  /**
+   * Clears any thread bindings held for the given state. Called by
+   * {@link #replaceForFlush} after the state's DWPT was swapped.
+   */
   public abstract void clearThreadBindings(ThreadState perThread);
 
+  /** Clears all thread bindings held by this pool (implementation-defined). */
   public abstract void clearAllThreadBindings();
 
+  /**
+   * Returns an iterator over all {@link ThreadState} instances of this pool,
+   * whether or not they have been handed out yet.
+   */
   public Iterator<ThreadState> getAllPerThreadsIterator() {
     return getPerThreadsIterator(this.perThreads.length);
   }
 
+  /**
+   * Returns an iterator providing access to all active {@link ThreadState}
+   * instances.
+   * <p>
+   * Note: The returned iterator only iterates over the
+   * {@link ThreadState}s that are active at the point in time when this method
+   * is called; states activated later are not visited.
+   */
   public Iterator<ThreadState> getActivePerThreadsIterator() {
-    return getPerThreadsIterator(this.numThreadStatesActive);
+    return getPerThreadsIterator(numThreadStatesActive);
   }
 
   private Iterator<ThreadState> getPerThreadsIterator(final int upto) {
@@ -80,4 +184,21 @@ public abstract class DocumentsWriterPer
       }
     };
   }
+
+  /**
+   * Returns the active {@link ThreadState} whose lock has the fewest estimated
+   * queued waiters, or <code>null</code> if no {@link ThreadState} is active
+   * yet. On ties, the earliest state in iteration order is kept.
+   */
+  protected ThreadState minContendedThreadState() {
+    ThreadState best = null;
+    for (Iterator<ThreadState> states = getActivePerThreadsIterator(); states.hasNext();) {
+      final ThreadState candidate = states.next();
+      // queue lengths are live estimates, so re-query both on every comparison
+      if (best == null || candidate.getQueueLength() < best.getQueueLength()) {
+        best = candidate;
+      }
+    }
+    return best;
+  }
 }

Added: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java?rev=1086947&view=auto
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java (added)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java Wed Mar 30 13:30:07 2011
@@ -0,0 +1,66 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
+
+/**
+ * Default {@link FlushPolicy} implementation. It flushes based on RAM
+ * consumption, buffered document count and the number of buffered delete
+ * terms, as configured in the IndexWriter's {@link IndexWriterConfig} (see
+ * {@link #init(DocumentsWriter)}). Only settings that are not set to
+ * {@link IndexWriterConfig#DISABLE_AUTO_FLUSH} are taken into account. They
+ * are evaluated on every insert or delete to decide whether a
+ * {@link DocumentsWriterPerThread} should be marked flush pending.
+ * <p>
+ * If {@link IndexWriterConfig#setRAMBufferSizeMB(double)} is enabled, the
+ * largest RAM-consuming {@link DocumentsWriterPerThread} is marked as pending
+ * whenever the global active RAM consumption is equal to or higher than the
+ * configured RAM buffer size.
+ */
+public class FlushByRamOrCountsPolicy extends FlushPolicy {
+
+  @Override
+  public void onDelete(DocumentsWriterFlushControl control, ThreadState state) {
+    if (!flushOnDeleteTerms()) {
+      return;
+    }
+    // delete terms are counted globally, not per DWPT
+    final int maxBufferedDeleteTerms = indexWriterConfig.getMaxBufferedDeleteTerms();
+    if (control.getNumGlobalTermDeletes() >= maxBufferedDeleteTerms) {
+      control.setFlushDeletes();
+    }
+  }
+
+  @Override
+  public void onInsert(DocumentsWriterFlushControl control, ThreadState state) {
+    final boolean docCountReached = flushOnDocCount()
+        && state.perThread.getNumDocsInRAM() >= indexWriterConfig.getMaxBufferedDocs();
+    if (docCountReached) {
+      // flush by number of buffered docs
+      control.setFlushPending(state);
+      return;
+    }
+    if (!flushOnRAM()) {
+      return;
+    }
+    // flush by RAM: once active bytes reach the buffer size, mark the largest DWPT pending
+    final long limit = (long) (indexWriterConfig.getRAMBufferSizeMB() * 1024.d * 1024.d);
+    final long totalRam = control.activeBytes();
+    if (totalRam >= limit) {
+      markLargestWriterPending(control, state, totalRam);
+    }
+  }
+}

Added: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushPolicy.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushPolicy.java?rev=1086947&view=auto
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushPolicy.java (added)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FlushPolicy.java Wed Mar 30 13:30:07 2011
@@ -0,0 +1,191 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.Iterator;
+
+import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.SetOnce;
+
+/**
+ * {@link FlushPolicy} controls when segments are flushed from a RAM resident
+ * internal data-structure to the {@link IndexWriter}s {@link Directory}.
+ * <p>
+ * Segments are traditionally flushed by:
+ * <ul>
+ * <li>RAM consumption - configured via
+ * {@link IndexWriterConfig#setRAMBufferSizeMB(double)}</li>
+ * <li>Number of RAM resident documents - configured via
+ * {@link IndexWriterConfig#setMaxBufferedDocs(int)}</li>
+ * <li>Number of buffered delete terms - configured via
+ * {@link IndexWriterConfig#setMaxBufferedDeleteTerms(int)}</li>
+ * </ul>
+ * <p>
+ * The {@link IndexWriter} consults its {@link FlushPolicy} while indexing.
+ * The policy is notified for every added or updated document and for every
+ * delete term. Based on the information exposed by the {@link ThreadState}
+ * and the {@link DocumentsWriterFlushControl}, the policy decides whether a
+ * {@link DocumentsWriterPerThread} needs flushing. If so, it marks the DWPT
+ * as flush-pending via
+ * {@link DocumentsWriterFlushControl#setFlushPending(ThreadState)}.
+ * 
+ * @see ThreadState
+ * @see DocumentsWriterFlushControl
+ * @see DocumentsWriterPerThread
+ * @see IndexWriterConfig#setFlushPolicy(FlushPolicy)
+ */
+public abstract class FlushPolicy {
+  // the owning DocumentsWriter, set once by init()
+  protected final SetOnce<DocumentsWriter> writer = new SetOnce<DocumentsWriter>();
+  // the IndexWriter's config, captured by init()
+  protected IndexWriterConfig indexWriterConfig;
+
+  /**
+   * Invoked for every delete term applied to the given {@link ThreadState}'s
+   * {@link DocumentsWriterPerThread}.
+   * <p>
+   * Note: calls are synchronized on the given
+   * {@link DocumentsWriterFlushControl}, and the calling thread is guaranteed
+   * to hold the lock of the given {@link ThreadState}.
+   */
+  public abstract void onDelete(DocumentsWriterFlushControl control,
+      ThreadState state);
+
+  /**
+   * Invoked for every document update on the given {@link ThreadState}'s
+   * {@link DocumentsWriterPerThread}. The default implementation treats the
+   * update as an insert and then, only if the state has not been marked flush
+   * pending by that, as a delete.
+   * <p>
+   * Note: calls are synchronized on the given
+   * {@link DocumentsWriterFlushControl}, and the calling thread is guaranteed
+   * to hold the lock of the given {@link ThreadState}.
+   */
+  public void onUpdate(DocumentsWriterFlushControl control, ThreadState state) {
+    onInsert(control, state);
+    if (state.flushPending) {
+      return; // already scheduled for flushing by onInsert
+    }
+    onDelete(control, state);
+  }
+
+  /**
+   * Invoked for every document addition on the given {@link ThreadState}'s
+   * {@link DocumentsWriterPerThread}.
+   * <p>
+   * Note: calls are synchronized on the given
+   * {@link DocumentsWriterFlushControl}, and the calling thread is guaranteed
+   * to hold the lock of the given {@link ThreadState}.
+   */
+  public abstract void onInsert(DocumentsWriterFlushControl control,
+      ThreadState state);
+
+  /**
+   * Called by {@link DocumentsWriter} to initialize the FlushPolicy. Records
+   * the owning writer and captures its IndexWriter's config.
+   */
+  protected synchronized void init(DocumentsWriter docsWriter) {
+    writer.set(docsWriter);
+    indexWriterConfig = docsWriter.indexWriter.getConfig();
+  }
+
+  /**
+   * Marks the most RAM-consuming active, non-pending
+   * {@link DocumentsWriterPerThread} as flush pending.
+   * {@code currentBytesPerThread} is not used by this implementation.
+   */
+  protected void markLargestWriterPending(DocumentsWriterFlushControl control,
+      ThreadState perThreadState, final long currentBytesPerThread) {
+    final ThreadState largest = findLargestNonPendingWriter(control, perThreadState);
+    control.setFlushPending(largest);
+  }
+
+  /**
+   * Returns the currently most RAM-consuming non-pending {@link ThreadState}
+   * that has at least one indexed document. The search starts from
+   * {@code perThreadState}, which must not be flush pending. It is replaced
+   * only by a strictly larger candidate.
+   * <p>
+   * This method will never return <code>null</code>.
+   */
+  protected ThreadState findLargestNonPendingWriter(
+      DocumentsWriterFlushControl control, ThreadState perThreadState) {
+    long largestBytes = perThreadState.perThreadBytes;
+    // the state whose DWPT should eventually be flushed
+    ThreadState largest = perThreadState;
+    assert !perThreadState.flushPending : "DWPT should have flushed";
+    for (Iterator<ThreadState> states = control.allActiveThreads(); states.hasNext();) {
+      final ThreadState candidate = states.next();
+      if (candidate.flushPending) {
+        continue;
+      }
+      final long candidateBytes = candidate.perThreadBytes;
+      if (candidateBytes > largestBytes && candidate.perThread.getNumDocsInRAM() > 0) {
+        largestBytes = candidateBytes;
+        largest = candidate;
+      }
+    }
+    assert largest.perThread.getNumDocsInRAM() > 0;
+    assert writer.get().message(
+        "set largest ram consuming thread pending on lower watermark");
+    return largest;
+  }
+
+  /**
+   * Returns the max net memory, which marks the upper watermark for the
+   * DocumentsWriter to be healthy. If all flushing and active
+   * {@link DocumentsWriterPerThread}s together consume more memory than this
+   * watermark, incoming threads should be stalled until memory drops below it.
+   * <p>
+   * The value is 2 x {@link IndexWriterConfig#getRAMBufferSizeMB()} (in
+   * bytes). If this {@link FlushPolicy} does not flush by RAM usage, the
+   * method returns -1.
+   */
+  public long getMaxNetBytes() {
+    if (flushOnRAM()) {
+      final double ramBufferSizeMB = indexWriterConfig.getRAMBufferSizeMB();
+      return (long) (ramBufferSizeMB * 1024.d * 1024.d * 2);
+    }
+    return -1;
+  }
+
+  /**
+   * Returns <code>true</code> if this {@link FlushPolicy} flushes on
+   * {@link IndexWriterConfig#getMaxBufferedDocs()}, otherwise
+   * <code>false</code>.
+   */
+  protected boolean flushOnDocCount() {
+    final int maxBufferedDocs = indexWriterConfig.getMaxBufferedDocs();
+    return maxBufferedDocs != IndexWriterConfig.DISABLE_AUTO_FLUSH;
+  }
+
+  /**
+   * Returns <code>true</code> if this {@link FlushPolicy} flushes on
+   * {@link IndexWriterConfig#getMaxBufferedDeleteTerms()}, otherwise
+   * <code>false</code>.
+   */
+  protected boolean flushOnDeleteTerms() {
+    final int maxBufferedDeleteTerms = indexWriterConfig.getMaxBufferedDeleteTerms();
+    return maxBufferedDeleteTerms != IndexWriterConfig.DISABLE_AUTO_FLUSH;
+  }
+
+  /**
+   * Returns <code>true</code> if this {@link FlushPolicy} flushes on
+   * {@link IndexWriterConfig#getRAMBufferSizeMB()}, otherwise
+   * <code>false</code>.
+   */
+  protected boolean flushOnRAM() {
+    final double ramBufferSizeMB = indexWriterConfig.getRAMBufferSizeMB();
+    return ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH;
+  }
+
+}

Added: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/Healthiness.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/Healthiness.java?rev=1086947&view=auto
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/Healthiness.java (added)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/Healthiness.java Wed Mar 30 13:30:07 2011
@@ -0,0 +1,120 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+
+import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
+
+/**
+ * Controls the health status of a {@link DocumentsWriter} session. This class
+ * is used to block incoming indexing threads if flushing is significantly
+ * slower than indexing, in order to keep the {@link DocumentsWriter} healthy.
+ * If flushing is significantly slower than indexing, the net memory used
+ * within an {@link IndexWriter} session can increase very quickly and easily
+ * exceed the JVM's available memory.
+ * <p>
+ * To prevent OOM errors and ensure the IndexWriter's stability, this class
+ * blocks incoming threads from indexing once the number of flushing
+ * {@link DocumentsWriterPerThread}s exceeds the number of {@link ThreadState}s
+ * of the {@link DocumentsWriterPerThreadPool}, as reported by
+ * {@link DocumentsWriterFlushControl#numActiveDWPT()}. Once flushing catches
+ * up and the number of flushing DWPTs is equal to or lower than that number,
+ * blocked threads are released and can continue indexing.
+ */
+final class Healthiness {
+
+  /**
+   * Synchronizer whose state counts successful stall requests: 0 means
+   * healthy, any positive value means stalled. Shared acquires succeed only
+   * while the state is 0.
+   */
+  @SuppressWarnings("serial")
+  private static final class Sync extends AbstractQueuedSynchronizer {
+    volatile boolean hasBlockedThreads = false; // only with assert
+
+    Sync() {
+      setState(0);
+    }
+
+    boolean isHealthy() {
+      return getState() == 0;
+    }
+
+    /** Increments the stall counter; returns false if the CAS lost a race. */
+    boolean trySetStalled() {
+      int state = getState();
+      return compareAndSetState(state, state + 1);
+    }
+
+    /**
+     * Resets the state to healthy and releases shared waiters. Returns false
+     * only if the CAS lost a race, in which case the caller should re-evaluate.
+     */
+    boolean tryReset() {
+      final int oldState = getState();
+      if (oldState == 0) {
+        return true;
+      }
+      if (compareAndSetState(oldState, 0)) {
+        releaseShared(0);
+        return true;
+      }
+      return false;
+    }
+
+    @Override
+    public int tryAcquireShared(int acquires) {
+      // read the state once so the assert bookkeeping and the decision agree
+      final int state = getState();
+      assert maybeSetHasBlocked(state);
+      return state == 0 ? 1 : -1;
+    }
+
+    // only used for testing; always returns true so it can run inside an assert
+    private boolean maybeSetHasBlocked(int state) {
+      hasBlockedThreads |= state != 0;
+      return true;
+    }
+
+    @Override
+    public boolean tryReleaseShared(int newState) {
+      // waiters are only released once the state is back to healthy
+      return (getState() == 0);
+    }
+  }
+
+  private final Healthiness.Sync sync = new Sync();
+  volatile boolean wasStalled = false; // only with asserts
+
+  /** Returns <code>true</code> while at least one stall request is in effect. */
+  boolean isStalled() {
+    return !sync.isHealthy();
+  }
+
+  /**
+   * Updates the stalled status. The status is set to stalled iff the number
+   * of flushing {@link DocumentsWriterPerThread}s is greater than
+   * {@link DocumentsWriterFlushControl#numActiveDWPT()}. Otherwise this
+   * {@link Healthiness} is reset to healthy and all threads waiting in
+   * {@link #waitIfStalled()} are released.
+   */
+  void updateStalled(DocumentsWriterFlushControl flushControl) {
+    do {
+      // if we have more flushing DWPT than numActiveDWPT we stall!
+      while (flushControl.numActiveDWPT() < flushControl.numFlushingDWPT()) {
+        if (sync.trySetStalled()) {
+          assert wasStalled = true;
+          return;
+        }
+      }
+    } while (!sync.tryReset());
+  }
+
+  /**
+   * Blocks the calling thread (uninterruptibly) until this {@link Healthiness}
+   * is healthy again; returns immediately if it already is.
+   */
+  void waitIfStalled() {
+    sync.acquireShared(0);
+  }
+
+  /** Returns whether any thread has observed a stalled state (tracked only with asserts enabled). */
+  boolean hasBlocked() {
+    return sync.hasBlockedThreads;
+  }
+}
\ No newline at end of file

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java?rev=1086947&r1=1086946&r2=1086947&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java Wed Mar 30 13:30:07 2011
@@ -248,8 +248,8 @@ public class IndexWriter implements Clos
   private long mergeGen;
   private boolean stopMerges;
 
-  private final AtomicInteger flushCount = new AtomicInteger();
-  private final AtomicInteger flushDeletesCount = new AtomicInteger();
+  final AtomicInteger flushCount = new AtomicInteger();
+  final AtomicInteger flushDeletesCount = new AtomicInteger();
 
   final ReaderPool readerPool = new ReaderPool();
   final BufferedDeletesStream bufferedDeletesStream;
@@ -2540,17 +2540,7 @@ public class IndexWriter implements Clos
     doBeforeFlush();
 
     assert testPoint("startDoFlush");
-
-    // We may be flushing because it was triggered by doc
-    // count, del count, ram usage (in which case flush
-    // pending is already set), or we may be flushing
-    // due to external event eg getReader or commit is
-    // called (in which case we now set it, and this will
-    // pause all threads):
-    flushControl.setFlushPendingNoWait("explicit flush");
-
     boolean success = false;
-
     try {
 
       if (infoStream != null) {
@@ -2566,8 +2556,7 @@ public class IndexWriter implements Clos
           // buffer, force them all to apply now. This is to
           // prevent too-frequent flushing of a long tail of
           // tiny segments:
-          if (flushControl.getFlushDeletes() ||
-              (config.getRAMBufferSizeMB() != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
+          if ((config.getRAMBufferSizeMB() != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
                bufferedDeletesStream.bytesUsed() > (1024*1024*config.getRAMBufferSizeMB()/2))) {
             applyAllDeletes = true;
             if (infoStream != null) {
@@ -2580,39 +2569,16 @@ public class IndexWriter implements Clos
           if (infoStream != null) {
             message("apply all deletes during flush");
           }
-          flushDeletesCount.incrementAndGet();
-          final BufferedDeletesStream.ApplyDeletesResult result = bufferedDeletesStream.applyDeletes(readerPool, segmentInfos);
-          if (result.anyDeletes) {
-            checkpoint();
-          }
-          if (!keepFullyDeletedSegments && result.allDeleted != null) {
-            if (infoStream != null) {
-              message("drop 100% deleted segments: " + result.allDeleted);
-            }
-            for(SegmentInfo info : result.allDeleted) {
-              // If a merge has already registered for this
-              // segment, we leave it in the readerPool; the
-              // merge will skip merging it and will then drop
-              // it once it's done:
-              if (!mergingSegments.contains(info)) {
-                segmentInfos.remove(info);
-                if (readerPool != null) {
-                  readerPool.drop(info);
-                }
-              }
-            }
-            checkpoint();
-          }
-          bufferedDeletesStream.prune(segmentInfos);
-          assert !bufferedDeletesStream.any();
-
-          flushControl.clearDeletes();
+          applyAllDeletes();
         } else if (infoStream != null) {
           message("don't apply deletes now delTermCount=" + bufferedDeletesStream.numTerms() + " bytesUsed=" + bufferedDeletesStream.bytesUsed());
         }
 
         doAfterFlush();
-        flushCount.incrementAndGet();
+        if (!maybeMerge) {
+          // flushCount is incremented in flushAllThreads
+          flushCount.incrementAndGet();
+        }
 
         success = true;
 
@@ -2624,20 +2590,51 @@ public class IndexWriter implements Clos
       // never hit
       return false;
     } finally {
-      flushControl.clearFlushPending();
       if (!success && infoStream != null)
         message("hit exception during flush");
     }
   }
+  
+  /**
+   * Applies all buffered deletes to the segments in {@code segmentInfos}.
+   * <p>
+   * Increments {@code flushDeletesCount} and checkpoints if any deletes were
+   * applied. Unless {@code keepFullyDeletedSegments} is set, it then removes
+   * segments whose documents are all deleted and checkpoints again. Segments
+   * already registered for a merge are left in place for the merge to drop.
+   * Finally, the buffered deletes stream is pruned against the remaining
+   * segments.
+   *
+   * @throws IOException if an I/O error occurs while applying the deletes
+   */
+  final synchronized void applyAllDeletes() throws IOException {
+    flushDeletesCount.incrementAndGet();
+    final BufferedDeletesStream.ApplyDeletesResult result = bufferedDeletesStream.applyDeletes(readerPool, segmentInfos);
+    if (result.anyDeletes) {
+      checkpoint();
+    }
+    if (!keepFullyDeletedSegments && result.allDeleted != null) {
+      if (infoStream != null) {
+        message("drop 100% deleted segments: " + result.allDeleted);
+      }
+      for(SegmentInfo info : result.allDeleted) {
+        // If a merge has already registered for this
+        // segment, we leave it in the readerPool; the
+        // merge will skip merging it and will then drop
+        // it once it's done:
+        if (!mergingSegments.contains(info)) {
+          segmentInfos.remove(info);
+          // NOTE(review): readerPool is final and initialized at its
+          // declaration, so this null check appears to always pass
+          if (readerPool != null) {
+            readerPool.drop(info);
+          }
+        }
+      }
+      checkpoint();
+    }
+    bufferedDeletesStream.prune(segmentInfos);
+  }
 
   /** Expert:  Return the total size of all index files currently cached in memory.
    * Useful for size management with flushRamDocs()
    */
   public final long ramSizeInBytes() {
     ensureOpen();
-    // nocommit
-    //return docWriter.bytesUsed() + bufferedDeletesStream.bytesUsed();
-    return 0;
+    // bytes reported by the flush control's netBytes() plus RAM held by the
+    // buffered deletes stream
+    return docWriter.flushControl.netBytes() + bufferedDeletesStream.bytesUsed();
+  }
+  
+  /**
+   * Test-only accessor for the {@link DocumentsWriter}. Returns {@code null}
+   * unless assertions are enabled (-ea).
+   */
+  DocumentsWriter getDocsWriter() {
+    boolean test = false;
+    assert test = true; // intentional assignment: only executes with -ea
+    return test?docWriter: null;
   }
 
   /** Expert:  Return the number of documents currently
@@ -3681,124 +3678,4 @@ public class IndexWriter implements Clos
   public PayloadProcessorProvider getPayloadProcessorProvider() {
     return payloadProcessorProvider;
   }
-
-  // decides when flushes happen
-  final class FlushControl {
-
-    private boolean flushPending;
-    private boolean flushDeletes;
-    private int delCount;
-    private int docCount;
-    private boolean flushing;
-
-    private synchronized boolean setFlushPending(String reason, boolean doWait) {
-      if (flushPending || flushing) {
-        if (doWait) {
-          while(flushPending || flushing) {
-            try {
-              wait();
-            } catch (InterruptedException ie) {
-              throw new ThreadInterruptedException(ie);
-            }
-          }
-        }
-        return false;
-      } else {
-        if (infoStream != null) {
-          message("now trigger flush reason=" + reason);
-        }
-        flushPending = true;
-        return flushPending;
-      }
-    }
-
-    public synchronized void setFlushPendingNoWait(String reason) {
-      setFlushPending(reason, false);
-    }
-
-    public synchronized boolean getFlushPending() {
-      return flushPending;
-    }
-
-    public synchronized boolean getFlushDeletes() {
-      return flushDeletes;
-    }
-
-    public synchronized void clearFlushPending() {
-      if (infoStream != null) {
-        message("clearFlushPending");
-      }
-      flushPending = false;
-      flushDeletes = false;
-      docCount = 0;
-      notifyAll();
-    }
-
-    public synchronized void clearDeletes() {
-      delCount = 0;
-    }
-
-    public synchronized boolean waitUpdate(int docInc, int delInc) {
-      return waitUpdate(docInc, delInc, false);
-    }
-
-    public synchronized boolean waitUpdate(int docInc, int delInc, boolean skipWait) {
-      while(flushPending) {
-        try {
-          wait();
-        } catch (InterruptedException ie) {
-          throw new ThreadInterruptedException(ie);
-        }
-      }
-
-      // skipWait is only used when a thread is BOTH adding
-      // a doc and buffering a del term, and, the adding of
-      // the doc already triggered a flush
-      if (skipWait) {
-        docCount += docInc;
-        delCount += delInc;
-        return false;
-      }
-
-      final int maxBufferedDocs = config.getMaxBufferedDocs();
-      if (maxBufferedDocs != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
-          (docCount+docInc) >= maxBufferedDocs) {
-        return setFlushPending("maxBufferedDocs", true);
-      }
-      docCount += docInc;
-
-      final int maxBufferedDeleteTerms = config.getMaxBufferedDeleteTerms();
-      if (maxBufferedDeleteTerms != IndexWriterConfig.DISABLE_AUTO_FLUSH &&
-          (delCount+delInc) >= maxBufferedDeleteTerms) {
-        flushDeletes = true;
-        return setFlushPending("maxBufferedDeleteTerms", true);
-      }
-      delCount += delInc;
-
-      return flushByRAMUsage("add delete/doc");
-    }
-
-    public synchronized boolean flushByRAMUsage(String reason) {
-//      final double ramBufferSizeMB = config.getRAMBufferSizeMB();
-//      if (ramBufferSizeMB != IndexWriterConfig.DISABLE_AUTO_FLUSH) {
-//        final long limit = (long) (ramBufferSizeMB*1024*1024);
-//        long used = bufferedDeletesStream.bytesUsed() + docWriter.bytesUsed();
-//        if (used >= limit) {
-//
-//          // DocumentsWriter may be able to free up some
-//          // RAM:
-//          // Lock order: FC -> DW
-//          docWriter.balanceRAM();
-//
-//          used = bufferedDeletesStream.bytesUsed() + docWriter.bytesUsed();
-//          if (used >= limit) {
-//            return setFlushPending("ram full: " + reason, false);
-//          }
-//        }
-//      }
-      return false;
-    }
-  }
-
-  final FlushControl flushControl = new FlushControl();
 }

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java?rev=1086947&r1=1086946&r2=1086947&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java Wed Mar 30 13:30:07 2011
@@ -94,6 +94,8 @@ public final class IndexWriterConfig imp
   /** Default value is 1. Change using {@link #setReaderTermsIndexDivisor(int)}. */
   public static final int DEFAULT_READER_TERMS_INDEX_DIVISOR = IndexReader.DEFAULT_TERMS_INDEX_DIVISOR;
 
+  /**
+   * Default per-thread RAM hard limit: 1945 MB, below the 2048 MB upper bound
+   * enforced by {@link #setRAMPerThreadHardLimitMB(int)}. Change using
+   * {@link #setRAMPerThreadHardLimitMB(int)}.
+   */
+  public static final int DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB = 1945;
   /**
    * Sets the default (for any instance) maximum time to wait for a write lock
    * (in milliseconds).
@@ -130,6 +132,8 @@ public final class IndexWriterConfig imp
   private volatile DocumentsWriterPerThreadPool indexerThreadPool;
   private volatile boolean readerPooling;
   private volatile int readerTermsIndexDivisor;
+  private volatile FlushPolicy flushPolicy; // see setFlushPolicy(FlushPolicy)
+  private volatile int perThreadHardLimitMB; // see setRAMPerThreadHardLimitMB(int)
 
   private Version matchVersion;
 
@@ -160,6 +164,7 @@ public final class IndexWriterConfig imp
     readerPooling = DEFAULT_READER_POOLING;
     indexerThreadPool = new ThreadAffinityDocumentsWriterThreadPool(DEFAULT_MAX_THREAD_STATES);
     readerTermsIndexDivisor = DEFAULT_READER_TERMS_INDEX_DIVISOR;
+    perThreadHardLimitMB = DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB;
   }
 
   @Override
@@ -352,6 +357,7 @@ public final class IndexWriterConfig imp
    * @throws IllegalArgumentException if maxBufferedDeleteTerms
    * is enabled but smaller than 1
    * @see #setRAMBufferSizeMB
+   * @see #setFlushPolicy(FlushPolicy)
    *
    * <p>Takes effect immediately, but only the next time a
    * document is added, updated or deleted.
@@ -380,14 +386,20 @@ public final class IndexWriterConfig imp
    * and deletions before they are flushed to the Directory. Generally for
    * faster indexing performance it's best to flush by RAM usage instead of
    * document count and use as large a RAM buffer as you can.
-   *
    * <p>
    * When this is set, the writer will flush whenever buffered documents and
    * deletions use this much RAM. Pass in {@link #DISABLE_AUTO_FLUSH} to prevent
    * triggering a flush due to RAM usage. Note that if flushing by document
    * count is also enabled, then the flush will be triggered by whichever comes
    * first.
-   *
+   * <p>
+   * The maximum RAM limit is inherently determined by the JVM's available memory.
+   * Yet, an {@link IndexWriter} session can consume a significantly larger amount
+   * of memory than the given RAM limit since this limit is just an indicator of when
+   * to flush memory-resident documents to the Directory. Flushes are likely to happen
+   * concurrently while other threads are adding documents to the writer. For application
+   * stability the available memory in the JVM should be significantly larger than
+   * the RAM buffer used for indexing.
    * <p>
    * <b>NOTE</b>: the account of RAM usage for pending deletions is only
    * approximate. Specifically, if you delete by Query, Lucene currently has no
@@ -396,16 +408,15 @@ public final class IndexWriterConfig imp
    * periodically yourself, or by using {@link #setMaxBufferedDeleteTerms(int)}
    * to flush by count instead of RAM usage (each buffered delete Query counts
    * as one).
-   *
    * <p>
-   * <b>NOTE</b>: because IndexWriter uses <code>int</code>s when managing its
-   * internal storage, the absolute maximum value for this setting is somewhat
-   * less than 2048 MB. The precise limit depends on various factors, such as
-   * how large your documents are, how many fields have norms, etc., so it's
-   * best to set this value comfortably under 2048.
-   *
+   * <b>NOTE</b>: It's not guaranteed that all memory-resident documents are flushed
+   * once this limit is exceeded. Depending on the configured {@link FlushPolicy}, only a
+   * subset of the buffered documents is flushed and therefore only part of the RAM
+   * buffer is released.
    * <p>
+   * 
    * The default value is {@link #DEFAULT_RAM_BUFFER_SIZE_MB}.
+   * @see #setFlushPolicy(FlushPolicy)
    *
    * <p>Takes effect immediately, but only the next time a
    * document is added, updated or deleted.
@@ -413,12 +424,9 @@ public final class IndexWriterConfig imp
    * @throws IllegalArgumentException
    *           if ramBufferSize is enabled but non-positive, or it disables
    *           ramBufferSize when maxBufferedDocs is already disabled
+   *           
    */
   public IndexWriterConfig setRAMBufferSizeMB(double ramBufferSizeMB) {
-    if (ramBufferSizeMB > 2048.0) {
-      throw new IllegalArgumentException("ramBufferSize " + ramBufferSizeMB
-          + " is too large; should be comfortably less than 2048");
-    }
     if (ramBufferSizeMB != DISABLE_AUTO_FLUSH && ramBufferSizeMB <= 0.0)
       throw new IllegalArgumentException(
           "ramBufferSize should be > 0.0 MB when enabled");
@@ -453,7 +461,7 @@ public final class IndexWriterConfig imp
    * document is added, updated or deleted.
    *
    * @see #setRAMBufferSizeMB(double)
-   *
+   * @see #setFlushPolicy(FlushPolicy)
    * @throws IllegalArgumentException
    *           if maxBufferedDocs is enabled but smaller than 2, or it disables
    *           maxBufferedDocs when ramBufferSize is already disabled
@@ -607,6 +615,53 @@ public final class IndexWriterConfig imp
   public int getReaderTermsIndexDivisor() {
     return readerTermsIndexDivisor;
   }
+  
+  /**
+   * Expert: Sets the {@link FlushPolicy} that controls when segments are
+   * flushed to disk during indexing.
+   * <p>
+   * The {@link FlushPolicy} is initialized during {@link IndexWriter}
+   * instantiation. Once initialized, the given instance is bound to that
+   * {@link IndexWriter} and should not be used with another writer.
+   * NOTE(review): no null check is performed here.
+   *
+   * @param flushPolicy the flush policy to use
+   * @return this {@link IndexWriterConfig}, to allow chaining
+   * @see #setMaxBufferedDeleteTerms(int)
+   * @see #setMaxBufferedDocs(int)
+   * @see #setRAMBufferSizeMB(double)
+   */
+  public IndexWriterConfig setFlushPolicy(FlushPolicy flushPolicy) {
+    this.flushPolicy = flushPolicy;
+    return this;
+  }
+
+  /**
+   * Expert: Sets the maximum memory consumption per thread that triggers a
+   * forced flush when exceeded. A {@link DocumentsWriterPerThread} is
+   * forcefully flushed once it exceeds this limit, even if
+   * {@link #getRAMBufferSizeMB()} has not been exceeded. This is a safety limit
+   * that protects a {@link DocumentsWriterPerThread} from address space
+   * exhaustion caused by its internal 32-bit signed integer based memory
+   * addressing. The given value must be greater than 0 and less than 2GB (2048MB).
+   *
+   * @param perThreadHardLimitMB the per-thread hard limit in MB, in the range [1, 2047]
+   * @return this {@link IndexWriterConfig}, to allow chaining
+   * @throws IllegalArgumentException if {@code perThreadHardLimitMB} is not
+   *         greater than 0 and less than 2048
+   * @see #DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB
+   */
+  public IndexWriterConfig setRAMPerThreadHardLimitMB(int perThreadHardLimitMB) {
+    if (perThreadHardLimitMB <= 0 || perThreadHardLimitMB >= 2048) {
+      throw new IllegalArgumentException("PerThreadHardLimit must be greater than 0 and less than 2048MB");
+    }
+    this.perThreadHardLimitMB = perThreadHardLimitMB;
+    return this;
+  }
+
+  /**
+   * Returns the maximum amount of memory, in MB, that each
+   * {@link DocumentsWriterPerThread} can consume before it is forcefully flushed.
+   *
+   * @return the per-thread hard limit in MB
+   * @see #setRAMPerThreadHardLimitMB(int)
+   */
+  public int getRAMPerThreadHardLimitMB() {
+    return perThreadHardLimitMB;
+  }
+  /**
+   * Returns the {@link FlushPolicy} configured via
+   * {@link #setFlushPolicy(FlushPolicy)}.
+   *
+   * @see #setFlushPolicy(FlushPolicy)
+   */
+  public FlushPolicy getFlushPolicy() {
+    return flushPolicy;
+  }
 
   @Override
   public String toString() {
@@ -631,6 +686,10 @@ public final class IndexWriterConfig imp
     sb.append("maxThreadStates=").append(indexerThreadPool.getMaxThreadStates()).append("\n");
     sb.append("readerPooling=").append(readerPooling).append("\n");
     sb.append("readerTermsIndexDivisor=").append(readerTermsIndexDivisor).append("\n");
+    sb.append("flushPolicy=").append(flushPolicy).append("\n");
+    sb.append("perThreadHardLimitMB=").append(perThreadHardLimitMB).append("\n");
+
     return sb.toString();
   }
+
 }

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IntBlockPool.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IntBlockPool.java?rev=1086947&r1=1086946&r2=1086947&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IntBlockPool.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IntBlockPool.java Wed Mar 30 13:30:07 2011
@@ -1,5 +1,7 @@
 package org.apache.lucene.index;
 
+import java.util.Arrays;
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -36,6 +38,10 @@ final class IntBlockPool {
   public void reset() {
     if (bufferUpto != -1) {
       // Reuse first buffer
+      if (bufferUpto > 0) {
+        docWriter.recycleIntBlocks(buffers, 1, bufferUpto-1);
+        Arrays.fill(buffers, 1, bufferUpto, null);
+      }
       bufferUpto = 0;
       intUpto = 0;
       intOffset = 0;

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHash.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHash.java?rev=1086947&r1=1086946&r2=1086947&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHash.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHash.java Wed Mar 30 13:30:07 2011
@@ -52,13 +52,14 @@ final class TermsHash extends InvertedDo
   // Used by perField to obtain terms from the analysis chain
   final BytesRef termBytesRef = new BytesRef(10);
 
-  boolean trackAllocations;
+  final boolean trackAllocations;
 
 
-  public TermsHash(final DocumentsWriterPerThread docWriter, final TermsHashConsumer consumer, final TermsHash nextTermsHash) {
+  public TermsHash(final DocumentsWriterPerThread docWriter, final TermsHashConsumer consumer, boolean trackAllocations, final TermsHash nextTermsHash) {
     this.docState = docWriter.docState;
     this.docWriter = docWriter;
     this.consumer = consumer;
+    this.trackAllocations = trackAllocations; 
     this.nextTermsHash = nextTermsHash;
     intPool = new IntBlockPool(docWriter);
     bytePool = new ByteBlockPool(docWriter.byteBlockAllocator);

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHashPerField.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHashPerField.java?rev=1086947&r1=1086946&r2=1086947&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHashPerField.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHashPerField.java Wed Mar 30 13:30:07 2011
@@ -63,8 +63,8 @@ final class TermsHashPerField extends In
     termBytePool = termsHash.termBytePool;
     docState = termsHash.docState;
     this.termsHash = termsHash;
-    bytesUsed =  termsHash.trackAllocations?termsHash.docWriter.bytesUsed:new AtomicLong();
-
+    bytesUsed = termsHash.trackAllocations ? termsHash.docWriter.bytesUsed
+        : new AtomicLong();
     fieldState = docInverterPerField.fieldState;
     this.consumer = termsHash.consumer.addField(this, fieldInfo);
     PostingsBytesStartArray byteStarts = new PostingsBytesStartArray(this, bytesUsed);
@@ -311,7 +311,7 @@ final class TermsHashPerField extends In
     @Override
     public int[] clear() {
       if(perField.postingsArray != null) {
-        bytesUsed.addAndGet(-perField.postingsArray.size * perField.postingsArray.bytesPerPosting());
+        bytesUsed.addAndGet(-(perField.postingsArray.size * perField.postingsArray.bytesPerPosting()));
         perField.postingsArray = null;
       }
       return null;

Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java?rev=1086947&r1=1086946&r2=1086947&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ThreadAffinityDocumentsWriterThreadPool.java Wed Mar 30 13:30:07 2011
@@ -1,6 +1,20 @@
 package org.apache.lucene.index;
-
-import java.util.Iterator;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -11,6 +25,7 @@ public class ThreadAffinityDocumentsWrit
 
   public ThreadAffinityDocumentsWriterThreadPool(int maxNumPerThreads) {
     super(maxNumPerThreads);
+    // at least one thread state is required; presumably this guards getAndLock(),
+    // which asserts it always ends up with a non-null ThreadState
+    assert getMaxThreadStates() >= 1;
   }
 
   @Override
@@ -21,25 +36,25 @@ public class ThreadAffinityDocumentsWrit
         return threadState;
       }
     }
-
-    // find the state that has minimum amount of threads waiting
-    Iterator<ThreadState> it = getActivePerThreadsIterator();
     ThreadState minThreadState = null;
-    while (it.hasNext()) {
-      ThreadState state = it.next();
-      if (minThreadState == null || state.getQueueLength() < minThreadState.getQueueLength()) {
-        minThreadState = state;
-      }
-    }
-
+    // find the state that has minimum amount of threads waiting
+    minThreadState = minContendedThreadState();
     if (minThreadState == null || minThreadState.hasQueuedThreads()) {
       ThreadState newState = newThreadState();
       if (newState != null) {
         minThreadState = newState;
         threadBindings.put(requestingThread, newState);
+      } else if (minThreadState == null) {
+        /*
+         * No new threadState is available, so we just take the least contended one.
+         * This must return a valid thread state since we accessed the
+         * synced context in newThreadState() above.
+         */
+        minThreadState = minContendedThreadState();
       }
     }
-
+    assert minThreadState != null: "ThreadState is null";
+    
     minThreadState.lock();
     return minThreadState;
   }
@@ -53,4 +68,5 @@ public class ThreadAffinityDocumentsWrit
   public void clearAllThreadBindings() {
     threadBindings.clear();
   }
+  
 }