You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by si...@apache.org on 2012/11/16 14:05:42 UTC

svn commit: r1410326 - /lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java

Author: simonw
Date: Fri Nov 16 13:05:41 2012
New Revision: 1410326

URL: http://svn.apache.org/viewvc?rev=1410326&view=rev
Log:
LUCENE-4561: count in-flight threads for asserting memory

Modified:
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java?rev=1410326&r1=1410325&r2=1410326&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java Fri Nov 16 13:05:41 2012
@@ -46,6 +46,7 @@ final class DocumentsWriterFlushControl 
   private long activeBytes = 0;
   private long flushBytes = 0;
   private volatile int numPending = 0;
+  private int numDocsSinceStalled = 0; // only with assert
   final AtomicBoolean flushDeletes = new AtomicBoolean(false);
   private boolean fullFlush = false;
   private final Queue<DocumentsWriterPerThread> flushQueue = new LinkedList<DocumentsWriterPerThread>();
@@ -104,8 +105,8 @@ final class DocumentsWriterFlushControl 
       // 2 * ramBufferBytes -> before we stall we need to cross the 2xRAM Buffer border this is still a valid limit
       // (numPending + numFlushingDWPT() + numBlockedFlushes()) * peakDelta) -> those are the total number of DWPT that are not active but not yet fully fluhsed
       // all of them could theoretically be taken out of the loop once they crossed the RAM buffer and the last document was the peak delta
-      // (perThreadPool.getActiveThreadState() * peakDelta) -> at any given time there could be n threads in flight that crossed the stall control before we reached the limit and each of them could hold a peak document
-      final long expected = (2 * (ramBufferBytes)) + ((numPending + numFlushingDWPT() + numBlockedFlushes()) * peakDelta) + (perThreadPool.getActiveThreadState() * peakDelta);
+      // (numDocsSinceStalled * peakDelta) -> at any given time there could be n threads in flight that crossed the stall control before we reached the limit and each of them could hold a peak document
+      final long expected = (2 * (ramBufferBytes)) + ((numPending + numFlushingDWPT() + numBlockedFlushes()) * peakDelta) + (numDocsSinceStalled * peakDelta);
       // the expected ram consumption is an upper bound at this point and not really the expected consumption
       if (peakDelta < (ramBufferBytes >> 1)) {
         /*
@@ -183,10 +184,26 @@ final class DocumentsWriterFlushControl 
       }
       return flushingDWPT;
     } finally {
-      updateStallState();
-      assert assertMemory();
+      boolean stalled = updateStallState();
+      assert assertNumDocsSinceStalled(stalled) && assertMemory();
     }
   }
+  
+  private boolean assertNumDocsSinceStalled(boolean stalled) {
+    /*
+     *  updates the number of documents "finished" while we are in a stalled state.
+     *  this is important for asserting memory upper bounds since it corresponds 
+     *  to the number of threads that are in-flight and crossed the stall control
+     *  check before we actually stalled.
+     *  see #assertMemory()
+     */
+    if (stalled) { 
+      numDocsSinceStalled++;
+    } else {
+      numDocsSinceStalled = 0;
+    }
+    return true;
+  }
 
   synchronized void doAfterFlush(DocumentsWriterPerThread dwpt) {
     assert flushingWriters.containsKey(dwpt);
@@ -204,7 +221,7 @@ final class DocumentsWriterFlushControl 
     }
   }
   
-  private final void updateStallState() {
+  private final boolean updateStallState() {
     
     assert Thread.holdsLock(this);
     final long limit = stallLimitBytes();
@@ -219,6 +236,7 @@ final class DocumentsWriterFlushControl 
                           (activeBytes < limit) &&
                           !closed;
     stallControl.updateStalled(stall);
+    return stall;
   }
   
   public synchronized void waitForFlush() {