You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by si...@apache.org on 2011/05/17 08:58:39 UTC

svn commit: r1104026 - in /lucene/dev/trunk/lucene/src: java/org/apache/lucene/index/ test/org/apache/lucene/index/

Author: simonw
Date: Tue May 17 06:58:39 2011
New Revision: 1104026

URL: http://svn.apache.org/viewvc?rev=1104026&view=rev
Log:
LUCENE-3090: DWFlushControl does not take active DWPT out of the loop on fullFlush

Added:
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java
      - copied, changed from r1102572, lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/Healthiness.java
Removed:
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/Healthiness.java
Modified:
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
    lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=1104026&r1=1104025&r2=1104026&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java Tue May 17 06:58:39 2011
@@ -126,7 +126,6 @@ final class DocumentsWriter {
   final DocumentsWriterPerThreadPool perThreadPool;
   final FlushPolicy flushPolicy;
   final DocumentsWriterFlushControl flushControl;
-  final Healthiness healthiness;
   DocumentsWriter(IndexWriterConfig config, Directory directory, IndexWriter writer, FieldNumberBiMap globalFieldNumbers,
       BufferedDeletesStream bufferedDeletesStream) throws IOException {
     this.directory = directory;
@@ -142,10 +141,7 @@ final class DocumentsWriter {
       flushPolicy = configuredPolicy;
     }
     flushPolicy.init(this);
-    
-    healthiness = new Healthiness();
-    final long maxRamPerDWPT = config.getRAMPerThreadHardLimitMB() * 1024 * 1024;
-    flushControl = new DocumentsWriterFlushControl(this, healthiness, maxRamPerDWPT);
+    flushControl = new DocumentsWriterFlushControl(this, config );
   }
 
   synchronized void deleteQueries(final Query... queries) throws IOException {
@@ -283,31 +279,28 @@ final class DocumentsWriter {
     ensureOpen();
     boolean maybeMerge = false;
     final boolean isUpdate = delTerm != null;
-    if (healthiness.anyStalledThreads()) {
-
-      // Help out flushing any pending DWPTs so we can un-stall:
+    if (flushControl.anyStalledThreads() || flushControl.numQueuedFlushes() > 0) {
+      // Help out flushing any queued DWPTs so we can un-stall:
       if (infoStream != null) {
-        message("WARNING DocumentsWriter has stalled threads; will hijack this thread to flush pending segment(s)");
+        message("DocumentsWriter has queued dwpt; will hijack this thread to flush pending segment(s)");
       }
-
-      // Try pick up pending threads here if possible
-      DocumentsWriterPerThread flushingDWPT;
-      while ((flushingDWPT = flushControl.nextPendingFlush()) != null) {
-        // Don't push the delete here since the update could fail!
-        maybeMerge = doFlush(flushingDWPT);
-        if (!healthiness.anyStalledThreads()) {
-          break;
+      do {
+        // Try pick up pending threads here if possible
+        DocumentsWriterPerThread flushingDWPT;
+        while ((flushingDWPT = flushControl.nextPendingFlush()) != null) {
+          // Don't push the delete here since the update could fail!
+          maybeMerge |= doFlush(flushingDWPT);
         }
-      }
-
-      if (infoStream != null && healthiness.anyStalledThreads()) {
-        message("WARNING DocumentsWriter still has stalled threads; waiting");
-      }
-
-      healthiness.waitIfStalled(); // block if stalled
+  
+        if (infoStream != null && flushControl.anyStalledThreads()) {
+          message("WARNING DocumentsWriter has stalled threads; waiting");
+        }
+        
+        flushControl.waitIfStalled(); // block if stalled
+      } while (flushControl.numQueuedFlushes() != 0); // still queued DWPTs try help flushing
 
-      if (infoStream != null && healthiness.anyStalledThreads()) {
-        message("WARNING DocumentsWriter done waiting");
+      if (infoStream != null) {
+        message("continue indexing after helpling out flushing DocumentsWriter is healthy");
       }
     }
 
@@ -353,7 +346,6 @@ final class DocumentsWriter {
       maybeMerge = true;
       boolean success = false;
       FlushTicket ticket = null;
-      
       try {
         assert currentFullFlushDelQueue == null
             || flushingDWPT.deleteQueue == currentFullFlushDelQueue : "expected: "
@@ -511,9 +503,7 @@ final class DocumentsWriter {
         anythingFlushed |= doFlush(flushingDWPT);
       }
       // If a concurrent flush is still in flight wait for it
-      while (flushControl.anyFlushing()) {
-        flushControl.waitForFlush();  
-      }
+      flushControl.waitForFlush();  
       if (!anythingFlushed) { // apply deletes if we did not flush any document
         synchronized (ticketQueue) {
           ticketQueue.add(new FlushTicket(flushingDeleteQueue.freezeGlobalBuffer(null), false));

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java?rev=1104026&r1=1104025&r2=1104026&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java Tue May 17 06:58:39 2011
@@ -44,30 +44,32 @@ public final class DocumentsWriterFlushC
   private long activeBytes = 0;
   private long flushBytes = 0;
   private volatile int numPending = 0;
-  private volatile int numFlushing = 0;
   final AtomicBoolean flushDeletes = new AtomicBoolean(false);
   private boolean fullFlush = false;
-  private Queue<DocumentsWriterPerThread> flushQueue = new LinkedList<DocumentsWriterPerThread>();
+  private final Queue<DocumentsWriterPerThread> flushQueue = new LinkedList<DocumentsWriterPerThread>();
   // only for safety reasons if a DWPT is close to the RAM limit
-  private Queue<DocumentsWriterPerThread> blockedFlushes = new LinkedList<DocumentsWriterPerThread>();
-
+  private final Queue<BlockedFlush> blockedFlushes = new LinkedList<BlockedFlush>();
 
+  double maxConfiguredRamBuffer = 0;
   long peakActiveBytes = 0;// only with assert
   long peakFlushBytes = 0;// only with assert
   long peakNetBytes = 0;// only with assert
-  private final Healthiness healthiness;
+  long peakDelta = 0; // only with assert
+  final DocumentsWriterStallControl stallControl;
   private final DocumentsWriterPerThreadPool perThreadPool;
   private final FlushPolicy flushPolicy;
   private boolean closed = false;
   private final HashMap<DocumentsWriterPerThread, Long> flushingWriters = new HashMap<DocumentsWriterPerThread, Long>();
   private final DocumentsWriter documentsWriter;
+  private final IndexWriterConfig config;
 
   DocumentsWriterFlushControl(DocumentsWriter documentsWriter,
-      Healthiness healthiness, long hardMaxBytesPerDWPT) {
-    this.healthiness = healthiness;
+      IndexWriterConfig config) {
+    this.stallControl = new DocumentsWriterStallControl();
     this.perThreadPool = documentsWriter.perThreadPool;
     this.flushPolicy = documentsWriter.flushPolicy;
-    this.hardMaxBytesPerDWPT = hardMaxBytesPerDWPT;
+    this.hardMaxBytesPerDWPT = config.getRAMPerThreadHardLimitMB() * 1024 * 1024;;
+    this.config = config;
     this.documentsWriter = documentsWriter;
   }
 
@@ -82,6 +84,24 @@ public final class DocumentsWriterFlushC
   public synchronized long netBytes() {
     return flushBytes + activeBytes;
   }
+  
+  long stallLimitBytes() {
+    final double maxRamMB = config.getRAMBufferSizeMB();
+    return maxRamMB != IndexWriterConfig.DISABLE_AUTO_FLUSH ? (long)(2 * (maxRamMB * 1024 * 1024)) : Long.MAX_VALUE;
+  }
+  
+  private boolean assertMemory() {
+    final double maxRamMB = config.getRAMBufferSizeMB();
+    if (maxRamMB != IndexWriterConfig.DISABLE_AUTO_FLUSH) {
+      // for this assert we must be tolerant to ram buffer changes!
+      maxConfiguredRamBuffer = Math.max(maxRamMB, maxConfiguredRamBuffer);
+      final long ram = flushBytes + activeBytes;
+      // take peakDelta into account - worst case is that all flushing, pending and blocked DWPT had maxMem and the last doc had the peakDelta 
+      final long expected = (long)(2 * (maxConfiguredRamBuffer * 1024 * 1024)) + ((numPending + numFlushingDWPT() + numBlockedFlushes()) * peakDelta);
+      assert ram <= expected  : "ram was " + ram + " expected: " + expected + " flush mem: " + flushBytes + " active: " + activeBytes ;   
+    }
+    return true;
+  }
 
   private void commitPerThreadBytes(ThreadState perThread) {
     final long delta = perThread.perThread.bytesUsed()
@@ -105,53 +125,62 @@ public final class DocumentsWriterFlushC
     peakActiveBytes = Math.max(peakActiveBytes, activeBytes);
     peakFlushBytes = Math.max(peakFlushBytes, flushBytes);
     peakNetBytes = Math.max(peakNetBytes, netBytes());
+    peakDelta = Math.max(peakDelta, delta);
+    
     return true;
   }
 
   synchronized DocumentsWriterPerThread doAfterDocument(ThreadState perThread,
       boolean isUpdate) {
-    commitPerThreadBytes(perThread);
-    if (!perThread.flushPending) {
-      if (isUpdate) {
-        flushPolicy.onUpdate(this, perThread);
-      } else {
-        flushPolicy.onInsert(this, perThread);
+    try {
+      commitPerThreadBytes(perThread);
+      if (!perThread.flushPending) {
+        if (isUpdate) {
+          flushPolicy.onUpdate(this, perThread);
+        } else {
+          flushPolicy.onInsert(this, perThread);
+        }
+        if (!perThread.flushPending && perThread.bytesUsed > hardMaxBytesPerDWPT) {
+          // Safety check to prevent a single DWPT exceeding its RAM limit. This
+          // is super important since we can not address more than 2048 MB per DWPT
+          setFlushPending(perThread);
+        }
       }
-      if (!perThread.flushPending && perThread.bytesUsed > hardMaxBytesPerDWPT) {
-        // Safety check to prevent a single DWPT exceeding its RAM limit. This
-        // is super important since we can not address more than 2048 MB per DWPT
-        setFlushPending(perThread);
-        if (fullFlush) {
-          DocumentsWriterPerThread toBlock = internalTryCheckOutForFlush(perThread);
-          assert toBlock != null;
-          blockedFlushes.add(toBlock);
+      final DocumentsWriterPerThread flushingDWPT;
+      if (fullFlush) {
+        if (perThread.flushPending) {
+          checkoutAndBlock(perThread);
+          flushingDWPT = nextPendingFlush();
+        } else {
+          flushingDWPT = null;
         }
+      } else {
+       flushingDWPT = tryCheckoutForFlush(perThread);
       }
+      return flushingDWPT;
+    } finally {
+      stallControl.updateStalled(this);
+      assert assertMemory();
     }
-    final DocumentsWriterPerThread flushingDWPT = tryCheckoutForFlush(perThread);
-    healthiness.updateStalled(this);
-    return flushingDWPT;
+    
+    
   }
 
   synchronized void doAfterFlush(DocumentsWriterPerThread dwpt) {
     assert flushingWriters.containsKey(dwpt);
     try {
-      numFlushing--;
       Long bytes = flushingWriters.remove(dwpt);
       flushBytes -= bytes.longValue();
       perThreadPool.recycle(dwpt);
-      healthiness.updateStalled(this);
+      stallControl.updateStalled(this);
+      assert assertMemory();
     } finally {
       notifyAll();
     }
   }
   
-  public synchronized boolean anyFlushing() {
-    return numFlushing != 0;
-  }
-  
   public synchronized void waitForFlush() {
-    if (numFlushing != 0) {
+    while (flushingWriters.size() != 0) {
       try {
         this.wait();
       } catch (InterruptedException e) {
@@ -173,32 +202,51 @@ public final class DocumentsWriterFlushC
       flushBytes += bytes;
       activeBytes -= bytes;
       numPending++; // write access synced
+      assert assertMemory();
     } // don't assert on numDocs since we could hit an abort excp. while selecting that dwpt for flushing
     
   }
 
   synchronized void doOnAbort(ThreadState state) {
-    if (state.flushPending) {
-      flushBytes -= state.bytesUsed;
-    } else {
-      activeBytes -= state.bytesUsed;
+    try {
+      if (state.flushPending) {
+        flushBytes -= state.bytesUsed;
+      } else {
+        activeBytes -= state.bytesUsed;
+      }
+      assert assertMemory();
+      // Take it out of the loop this DWPT is stale
+      perThreadPool.replaceForFlush(state, closed);
+    }finally {
+      stallControl.updateStalled(this);
     }
-    // Take it out of the loop this DWPT is stale
-    perThreadPool.replaceForFlush(state, closed);
-    healthiness.updateStalled(this);
   }
 
   synchronized DocumentsWriterPerThread tryCheckoutForFlush(
       ThreadState perThread) {
-    if (fullFlush) {
-      return null;
+   return perThread.flushPending ? internalTryCheckOutForFlush(perThread) : null;
+  }
+  
+  private void checkoutAndBlock(ThreadState perThread) {
+    perThread.lock();
+    try {
+      assert perThread.flushPending : "can not block non-pending threadstate";
+      assert fullFlush : "can not block if fullFlush == false";
+      final DocumentsWriterPerThread dwpt;
+      final long bytes = perThread.bytesUsed;
+      dwpt = perThreadPool.replaceForFlush(perThread, closed);
+      numPending--;
+      blockedFlushes.add(new BlockedFlush(dwpt, bytes));
+    }finally {
+      perThread.unlock();
     }
-    return internalTryCheckOutForFlush(perThread);
   }
 
   private DocumentsWriterPerThread internalTryCheckOutForFlush(
       ThreadState perThread) {
-    if (perThread.flushPending) {
+    assert Thread.holdsLock(this);
+    assert perThread.flushPending;
+    try {
       // We are pending so all memory is already moved to flushBytes
       if (perThread.tryLock()) {
         try {
@@ -212,15 +260,16 @@ public final class DocumentsWriterFlushC
             // Record the flushing DWPT to reduce flushBytes in doAfterFlush
             flushingWriters.put(dwpt, Long.valueOf(bytes));
             numPending--; // write access synced
-            numFlushing++;
             return dwpt;
           }
         } finally {
           perThread.unlock();
         }
       }
+      return null;
+    } finally {
+      stallControl.updateStalled(this);
     }
-    return null;
   }
 
   @Override
@@ -231,12 +280,13 @@ public final class DocumentsWriterFlushC
 
   DocumentsWriterPerThread nextPendingFlush() {
     synchronized (this) {
-      DocumentsWriterPerThread poll = flushQueue.poll();
-      if (poll != null) {
+      final DocumentsWriterPerThread poll;
+      if ((poll = flushQueue.poll()) != null) {
+        stallControl.updateStalled(this);
         return poll;
-      }  
+      }
     }
-    if (numPending > 0) {
+    if (numPending > 0 && !fullFlush) { // don't check if we are doing a full flush
       final Iterator<ThreadState> allActiveThreads = perThreadPool
           .getActivePerThreadsIterator();
       while (allActiveThreads.hasNext() && numPending > 0) {
@@ -276,8 +326,8 @@ public final class DocumentsWriterFlushC
     return documentsWriter.deleteQueue.numGlobalTermDeletes();
   }
 
-  int numFlushingDWPT() {
-    return numFlushing;
+  synchronized int numFlushingDWPT() {
+    return flushingWriters.size();
   }
   
   public boolean doApplyAllDeletes() {	
@@ -289,7 +339,7 @@ public final class DocumentsWriterFlushC
   }
   
   int numActiveDWPT() {
-    return this.perThreadPool.getMaxThreadStates();
+    return this.perThreadPool.getActiveThreadState();
   }
   
   void markForFullFlush() {
@@ -331,11 +381,11 @@ public final class DocumentsWriterFlushC
             if (!next.flushPending) {
               setFlushPending(next);
             }
+            final DocumentsWriterPerThread flushingDWPT = internalTryCheckOutForFlush(next);
+            assert flushingDWPT != null : "DWPT must never be null here since we hold the lock and it holds documents";
+            assert dwpt == flushingDWPT : "flushControl returned different DWPT";
+            toFlush.add(flushingDWPT);
           }
-          final DocumentsWriterPerThread flushingDWPT = internalTryCheckOutForFlush(next);
-          assert flushingDWPT != null : "DWPT must never be null here since we hold the lock and it holds documents";
-          assert dwpt == flushingDWPT : "flushControl returned different DWPT";
-          toFlush.add(flushingDWPT);
         } else {
           // get the new delete queue from DW
           next.perThread.initialize();
@@ -345,31 +395,54 @@ public final class DocumentsWriterFlushC
       }
     }
     synchronized (this) {
-      assert assertBlockedFlushes(flushingQueue);
-      flushQueue.addAll(blockedFlushes);
-      blockedFlushes.clear();
+      /* make sure all DWPT that were concurrently marked as pending and
+       * moved to blocked are moved over to the flushQueue. There is a
+       * chance that this happens since we are marking DWPT for full flush
+       * without blocking indexing.*/
+      pruneBlockedQueue(flushingQueue);   
+      assert assertBlockedFlushes(documentsWriter.deleteQueue);
       flushQueue.addAll(toFlush);
+      stallControl.updateStalled(this);
+    }
+  }
+  
+  /**
+   * Prunes the blockedQueue by removing all DWPT that are associated with the given flush queue. 
+   */
+  private void pruneBlockedQueue(final DocumentsWriterDeleteQueue flushingQueue) {
+    Iterator<BlockedFlush> iterator = blockedFlushes.iterator();
+    while (iterator.hasNext()) {
+      BlockedFlush blockedFlush = iterator.next();
+      if (blockedFlush.dwpt.deleteQueue == flushingQueue) {
+        iterator.remove();
+        assert !flushingWriters.containsKey(blockedFlush.dwpt) : "DWPT is already flushing";
+        // Record the flushing DWPT to reduce flushBytes in doAfterFlush
+        flushingWriters.put(blockedFlush.dwpt, Long.valueOf(blockedFlush.bytes));
+        // don't decr pending here - its already done when DWPT is blocked
+        flushQueue.add(blockedFlush.dwpt);
+      }
     }
   }
   
   synchronized void finishFullFlush() {
     assert fullFlush;
     assert flushQueue.isEmpty();
+    assert flushingWriters.isEmpty();
     try {
       if (!blockedFlushes.isEmpty()) {
         assert assertBlockedFlushes(documentsWriter.deleteQueue);
-        flushQueue.addAll(blockedFlushes);
-        blockedFlushes.clear();
+        pruneBlockedQueue(documentsWriter.deleteQueue);
+        assert blockedFlushes.isEmpty();
       }
     } finally {
       fullFlush = false;
+      stallControl.updateStalled(this);
     }
   }
   
   boolean assertBlockedFlushes(DocumentsWriterDeleteQueue flushingQueue) {
-    Queue<DocumentsWriterPerThread> flushes = this.blockedFlushes;
-    for (DocumentsWriterPerThread documentsWriterPerThread : flushes) {
-      assert documentsWriterPerThread.deleteQueue == flushingQueue;
+    for (BlockedFlush blockedFlush : blockedFlushes) {
+      assert blockedFlush.dwpt.deleteQueue == flushingQueue;
     }
     return true;
   }
@@ -379,18 +452,65 @@ public final class DocumentsWriterFlushC
       for (DocumentsWriterPerThread dwpt : flushQueue) {
         doAfterFlush(dwpt);
       }
-      for (DocumentsWriterPerThread dwpt : blockedFlushes) {
-        doAfterFlush(dwpt);
+      for (BlockedFlush blockedFlush : blockedFlushes) {
+        flushingWriters.put(blockedFlush.dwpt, Long.valueOf(blockedFlush.bytes));
+        doAfterFlush(blockedFlush.dwpt);
       }
-      
     } finally {
       fullFlush = false;
       flushQueue.clear();
       blockedFlushes.clear();
+      stallControl.updateStalled(this);
     }
   }
   
-  synchronized boolean isFullFlush() {
+  /**
+   * Returns <code>true</code> if a full flush is currently running
+   */
+  synchronized boolean isFullFlush() { // used by assert
     return fullFlush;
   }
+
+  /**
+   * Returns the number of flushes that are already checked out but not yet
+   * actively flushing
+   */
+  synchronized int numQueuedFlushes() {
+    return flushQueue.size();
+  }
+
+  /**
+   * Returns the number of flushes that are checked out but not yet available
+   * for flushing. This only applies during a full flush if a DWPT needs
+   * flushing but must not be flushed until the full flush has finished.
+   */
+  synchronized int numBlockedFlushes() {
+    return blockedFlushes.size();
+  }
+  
+  private static class BlockedFlush {
+    final DocumentsWriterPerThread dwpt;
+    final long bytes;
+    BlockedFlush(DocumentsWriterPerThread dwpt, long bytes) {
+      super();
+      this.dwpt = dwpt;
+      this.bytes = bytes;
+    }
+  }
+
+  /**
+   * This method will block if too many DWPT are currently flushing and no
+   * checked out DWPT are available
+   */
+  void waitIfStalled() {
+      stallControl.waitIfStalled();
+  }
+
+  /**
+   * Returns <code>true</code> iff stalled
+   */
+  boolean anyStalledThreads() {
+    return stallControl.anyStalledThreads();
+  }
+ 
 }
\ No newline at end of file

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java?rev=1104026&r1=1104025&r2=1104026&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java Tue May 17 06:58:39 2011
@@ -165,6 +165,13 @@ public abstract class DocumentsWriterPer
   public int getMaxThreadStates() {
     return perThreads.length;
   }
+  
+  /**
+   * Returns the active number of {@link ThreadState} instances.
+   */
+  public int getActiveThreadState() {
+    return numThreadStatesActive;
+  }
 
   /**
    * Returns a new {@link ThreadState} iff any new state is available otherwise

Copied: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java (from r1102572, lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/Healthiness.java)
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java?p2=lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java&p1=lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/Healthiness.java&r1=1102572&r2=1104026&rev=1104026&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/Healthiness.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java Tue May 17 06:58:39 2011
@@ -36,8 +36,7 @@ import org.apache.lucene.index.Documents
  * continue indexing.
  */
 //TODO: rename this to DocumentsWriterStallControl (or something like that)?
-final class Healthiness {
-
+final class DocumentsWriterStallControl {
   @SuppressWarnings("serial")
   private static final class Sync extends AbstractQueuedSynchronizer {
     volatile boolean hasBlockedThreads = false; // only with assert
@@ -96,13 +95,14 @@ final class Healthiness {
    * <code>true</code> iff the number of flushing
    * {@link DocumentsWriterPerThread} is greater than the number of active
    * {@link DocumentsWriterPerThread}. Otherwise it will reset the
-   * {@link Healthiness} to healthy and release all threads waiting on
+   * {@link DocumentsWriterStallControl} to healthy and release all threads waiting on
    * {@link #waitIfStalled()}
    */
   void updateStalled(DocumentsWriterFlushControl flushControl) {
     do {
-      // if we have more flushing DWPT than numActiveDWPT we stall!
-      while (flushControl.numActiveDWPT() < flushControl.numFlushingDWPT()) {
+      // if the net bytes in use exceed the stall limit we stall!
+      // don't stall if we have queued flushes - threads should be hijacked instead
+      while (flushControl.netBytes() > flushControl.stallLimitBytes()) {
         if (sync.trySetStalled()) {
           assert wasStalled = true;
           return;
@@ -114,8 +114,8 @@ final class Healthiness {
   void waitIfStalled() {
     sync.acquireShared(0);
   }
-
-  boolean hasBlocked() {
+  
+  boolean hasBlocked() { // for tests
     return sync.hasBlockedThreads;
   }
 }
\ No newline at end of file

Modified: lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java?rev=1104026&r1=1104025&r2=1104026&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java (original)
+++ lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java Tue May 17 06:58:39 2011
@@ -30,7 +30,6 @@ import org.apache.lucene.store.LockObtai
 import org.apache.lucene.store.MockDirectoryWrapper;
 import org.apache.lucene.util.LineFileDocs;
 import org.apache.lucene.util.LuceneTestCase;
-import org.apache.lucene.util.ThrottledIndexOutput;
 import org.junit.Before;
 
 public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
@@ -105,7 +104,7 @@ public class TestFlushByRamOrCountsPolic
       assertTrue(maxRAMBytes < flushControl.peakActiveBytes);
     }
     if (ensureNotStalled) {
-      assertFalse(docsWriter.healthiness.wasStalled);
+      assertFalse(docsWriter.flushControl.stallControl.wasStalled);
     }
     writer.close();
     assertEquals(0, flushControl.activeBytes());
@@ -216,15 +215,15 @@ public class TestFlushByRamOrCountsPolic
     assertEquals(numDocumentsToIndex, r.numDocs());
     assertEquals(numDocumentsToIndex, r.maxDoc());
     if (!flushPolicy.flushOnRAM()) {
-      assertFalse("never stall if we don't flush on RAM", docsWriter.healthiness.wasStalled);
-      assertFalse("never block if we don't flush on RAM", docsWriter.healthiness.hasBlocked());
+      assertFalse("never stall if we don't flush on RAM", docsWriter.flushControl.stallControl.wasStalled);
+      assertFalse("never block if we don't flush on RAM", docsWriter.flushControl.stallControl.hasBlocked());
     }
     r.close();
     writer.close();
     dir.close();
   }
 
-  public void testHealthyness() throws InterruptedException,
+  public void testStallControl() throws InterruptedException,
       CorruptIndexException, LockObtainFailedException, IOException {
 
     int[] numThreads = new int[] { 4 + random.nextInt(8), 1 };
@@ -240,7 +239,7 @@ public class TestFlushByRamOrCountsPolic
       iwc.setMaxBufferedDeleteTerms(IndexWriterConfig.DISABLE_AUTO_FLUSH);
       FlushPolicy flushPolicy = new FlushByRamOrCountsPolicy();
       iwc.setFlushPolicy(flushPolicy);
-
+      
       DocumentsWriterPerThreadPool threadPool = new ThreadAffinityDocumentsWriterThreadPool(
           numThreads[i]== 1 ? 1 : 2);
       iwc.setIndexerThreadPool(threadPool);
@@ -265,11 +264,11 @@ public class TestFlushByRamOrCountsPolic
       assertEquals(numDocumentsToIndex, writer.maxDoc());
       if (numThreads[i] == 1) {
         assertFalse(
-            "single thread must not stall",
-            docsWriter.healthiness.wasStalled);
-        assertFalse(
             "single thread must not block numThreads: " + numThreads[i],
-            docsWriter.healthiness.hasBlocked());
+            docsWriter.flushControl.stallControl.hasBlocked());
+      }
+      if (docsWriter.flushControl.peakNetBytes > (2.d * iwc.getRAMBufferSizeMB() * 1024.d * 1024.d)) {
+        assertTrue(docsWriter.flushControl.stallControl.wasStalled);
       }
       assertActiveBytesAfter(flushControl);
       writer.close(true);