You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by si...@apache.org on 2012/11/16 14:09:03 UTC
svn commit: r1410331 - in /lucene/dev/branches/branch_4x: ./ lucene/
lucene/core/
lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
Author: simonw
Date: Fri Nov 16 13:09:03 2012
New Revision: 1410331
URL: http://svn.apache.org/viewvc?rev=1410331&view=rev
Log:
LUCENE-4561: count in-flight threads for asserting memory
Modified:
lucene/dev/branches/branch_4x/ (props changed)
lucene/dev/branches/branch_4x/lucene/ (props changed)
lucene/dev/branches/branch_4x/lucene/core/ (props changed)
lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
Modified: lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java?rev=1410331&r1=1410330&r2=1410331&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java (original)
+++ lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java Fri Nov 16 13:09:03 2012
@@ -46,6 +46,7 @@ final class DocumentsWriterFlushControl
private long activeBytes = 0;
private long flushBytes = 0;
private volatile int numPending = 0;
+ private int numDocsSinceStalled = 0; // only with assert
final AtomicBoolean flushDeletes = new AtomicBoolean(false);
private boolean fullFlush = false;
private final Queue<DocumentsWriterPerThread> flushQueue = new LinkedList<DocumentsWriterPerThread>();
@@ -104,8 +105,8 @@ final class DocumentsWriterFlushControl
// 2 * ramBufferBytes -> before we stall we need to cross the 2xRAM Buffer border this is still a valid limit
// (numPending + numFlushingDWPT() + numBlockedFlushes()) * peakDelta) -> those are the total number of DWPT that are not active but not yet fully flushed
// all of them could theoretically be taken out of the loop once they crossed the RAM buffer and the last document was the peak delta
- // (perThreadPool.getActiveThreadState() * peakDelta) -> at any given time there could be n threads in flight that crossed the stall control before we reached the limit and each of them could hold a peak document
- final long expected = (2 * (ramBufferBytes)) + ((numPending + numFlushingDWPT() + numBlockedFlushes()) * peakDelta) + (perThreadPool.getActiveThreadState() * peakDelta);
+ // (numDocsSinceStalled * peakDelta) -> at any given time there could be n threads in flight that crossed the stall control before we reached the limit and each of them could hold a peak document
+ final long expected = (2 * (ramBufferBytes)) + ((numPending + numFlushingDWPT() + numBlockedFlushes()) * peakDelta) + (numDocsSinceStalled * peakDelta);
// the expected ram consumption is an upper bound at this point and not really the expected consumption
if (peakDelta < (ramBufferBytes >> 1)) {
/*
@@ -183,10 +184,26 @@ final class DocumentsWriterFlushControl
}
return flushingDWPT;
} finally {
- updateStallState();
- assert assertMemory();
+ boolean stalled = updateStallState();
+ assert assertNumDocsSinceStalled(stalled) && assertMemory();
}
}
+
+ private boolean assertNumDocsSinceStalled(boolean stalled) {
+ /*
+ * updates the number of documents "finished" while we are in a stalled state.
+ * this is important for asserting memory upper bounds since it corresponds
+ * to the number of threads that are in-flight and crossed the stall control
+ * check before we actually stalled.
+ * see #assertMemory()
+ */
+ if (stalled) {
+ numDocsSinceStalled++;
+ } else {
+ numDocsSinceStalled = 0;
+ }
+ return true;
+ }
synchronized void doAfterFlush(DocumentsWriterPerThread dwpt) {
assert flushingWriters.containsKey(dwpt);
@@ -204,7 +221,7 @@ final class DocumentsWriterFlushControl
}
}
- private final void updateStallState() {
+ private final boolean updateStallState() {
assert Thread.holdsLock(this);
final long limit = stallLimitBytes();
@@ -219,6 +236,7 @@ final class DocumentsWriterFlushControl
(activeBytes < limit) &&
!closed;
stallControl.updateStalled(stall);
+ return stall;
}
public synchronized void waitForFlush() {