You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by si...@apache.org on 2011/05/17 08:58:39 UTC
svn commit: r1104026 - in /lucene/dev/trunk/lucene/src:
java/org/apache/lucene/index/ test/org/apache/lucene/index/
Author: simonw
Date: Tue May 17 06:58:39 2011
New Revision: 1104026
URL: http://svn.apache.org/viewvc?rev=1104026&view=rev
Log:
LUCENE-3090: DWFlushControl does not take active DWPT out of the loop on fullFlush
Added:
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java
- copied, changed from r1102572, lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/Healthiness.java
Removed:
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/Healthiness.java
Modified:
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=1104026&r1=1104025&r2=1104026&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java Tue May 17 06:58:39 2011
@@ -126,7 +126,6 @@ final class DocumentsWriter {
final DocumentsWriterPerThreadPool perThreadPool;
final FlushPolicy flushPolicy;
final DocumentsWriterFlushControl flushControl;
- final Healthiness healthiness;
DocumentsWriter(IndexWriterConfig config, Directory directory, IndexWriter writer, FieldNumberBiMap globalFieldNumbers,
BufferedDeletesStream bufferedDeletesStream) throws IOException {
this.directory = directory;
@@ -142,10 +141,7 @@ final class DocumentsWriter {
flushPolicy = configuredPolicy;
}
flushPolicy.init(this);
-
- healthiness = new Healthiness();
- final long maxRamPerDWPT = config.getRAMPerThreadHardLimitMB() * 1024 * 1024;
- flushControl = new DocumentsWriterFlushControl(this, healthiness, maxRamPerDWPT);
+ flushControl = new DocumentsWriterFlushControl(this, config );
}
synchronized void deleteQueries(final Query... queries) throws IOException {
@@ -283,31 +279,28 @@ final class DocumentsWriter {
ensureOpen();
boolean maybeMerge = false;
final boolean isUpdate = delTerm != null;
- if (healthiness.anyStalledThreads()) {
-
- // Help out flushing any pending DWPTs so we can un-stall:
+ if (flushControl.anyStalledThreads() || flushControl.numQueuedFlushes() > 0) {
+ // Help out flushing any queued DWPTs so we can un-stall:
if (infoStream != null) {
- message("WARNING DocumentsWriter has stalled threads; will hijack this thread to flush pending segment(s)");
+ message("DocumentsWriter has queued dwpt; will hijack this thread to flush pending segment(s)");
}
-
- // Try pick up pending threads here if possible
- DocumentsWriterPerThread flushingDWPT;
- while ((flushingDWPT = flushControl.nextPendingFlush()) != null) {
- // Don't push the delete here since the update could fail!
- maybeMerge = doFlush(flushingDWPT);
- if (!healthiness.anyStalledThreads()) {
- break;
+ do {
+ // Try pick up pending threads here if possible
+ DocumentsWriterPerThread flushingDWPT;
+ while ((flushingDWPT = flushControl.nextPendingFlush()) != null) {
+ // Don't push the delete here since the update could fail!
+ maybeMerge |= doFlush(flushingDWPT);
}
- }
-
- if (infoStream != null && healthiness.anyStalledThreads()) {
- message("WARNING DocumentsWriter still has stalled threads; waiting");
- }
-
- healthiness.waitIfStalled(); // block if stalled
+
+ if (infoStream != null && flushControl.anyStalledThreads()) {
+ message("WARNING DocumentsWriter has stalled threads; waiting");
+ }
+
+ flushControl.waitIfStalled(); // block if stalled
+ } while (flushControl.numQueuedFlushes() != 0); // still queued DWPTs try help flushing
- if (infoStream != null && healthiness.anyStalledThreads()) {
- message("WARNING DocumentsWriter done waiting");
+ if (infoStream != null) {
+ message("continue indexing after helpling out flushing DocumentsWriter is healthy");
}
}
@@ -353,7 +346,6 @@ final class DocumentsWriter {
maybeMerge = true;
boolean success = false;
FlushTicket ticket = null;
-
try {
assert currentFullFlushDelQueue == null
|| flushingDWPT.deleteQueue == currentFullFlushDelQueue : "expected: "
@@ -511,9 +503,7 @@ final class DocumentsWriter {
anythingFlushed |= doFlush(flushingDWPT);
}
// If a concurrent flush is still in flight wait for it
- while (flushControl.anyFlushing()) {
- flushControl.waitForFlush();
- }
+ flushControl.waitForFlush();
if (!anythingFlushed) { // apply deletes if we did not flush any document
synchronized (ticketQueue) {
ticketQueue.add(new FlushTicket(flushingDeleteQueue.freezeGlobalBuffer(null), false));
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java?rev=1104026&r1=1104025&r2=1104026&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java Tue May 17 06:58:39 2011
@@ -44,30 +44,32 @@ public final class DocumentsWriterFlushC
private long activeBytes = 0;
private long flushBytes = 0;
private volatile int numPending = 0;
- private volatile int numFlushing = 0;
final AtomicBoolean flushDeletes = new AtomicBoolean(false);
private boolean fullFlush = false;
- private Queue<DocumentsWriterPerThread> flushQueue = new LinkedList<DocumentsWriterPerThread>();
+ private final Queue<DocumentsWriterPerThread> flushQueue = new LinkedList<DocumentsWriterPerThread>();
// only for safety reasons if a DWPT is close to the RAM limit
- private Queue<DocumentsWriterPerThread> blockedFlushes = new LinkedList<DocumentsWriterPerThread>();
-
+ private final Queue<BlockedFlush> blockedFlushes = new LinkedList<BlockedFlush>();
+ double maxConfiguredRamBuffer = 0;
long peakActiveBytes = 0;// only with assert
long peakFlushBytes = 0;// only with assert
long peakNetBytes = 0;// only with assert
- private final Healthiness healthiness;
+ long peakDelta = 0; // only with assert
+ final DocumentsWriterStallControl stallControl;
private final DocumentsWriterPerThreadPool perThreadPool;
private final FlushPolicy flushPolicy;
private boolean closed = false;
private final HashMap<DocumentsWriterPerThread, Long> flushingWriters = new HashMap<DocumentsWriterPerThread, Long>();
private final DocumentsWriter documentsWriter;
+ private final IndexWriterConfig config;
DocumentsWriterFlushControl(DocumentsWriter documentsWriter,
- Healthiness healthiness, long hardMaxBytesPerDWPT) {
- this.healthiness = healthiness;
+ IndexWriterConfig config) {
+ this.stallControl = new DocumentsWriterStallControl();
this.perThreadPool = documentsWriter.perThreadPool;
this.flushPolicy = documentsWriter.flushPolicy;
- this.hardMaxBytesPerDWPT = hardMaxBytesPerDWPT;
+ this.hardMaxBytesPerDWPT = config.getRAMPerThreadHardLimitMB() * 1024 * 1024;;
+ this.config = config;
this.documentsWriter = documentsWriter;
}
@@ -82,6 +84,24 @@ public final class DocumentsWriterFlushC
public synchronized long netBytes() {
return flushBytes + activeBytes;
}
+
+ long stallLimitBytes() {
+ final double maxRamMB = config.getRAMBufferSizeMB();
+ return maxRamMB != IndexWriterConfig.DISABLE_AUTO_FLUSH ? (long)(2 * (maxRamMB * 1024 * 1024)) : Long.MAX_VALUE;
+ }
+
+ private boolean assertMemory() {
+ final double maxRamMB = config.getRAMBufferSizeMB();
+ if (maxRamMB != IndexWriterConfig.DISABLE_AUTO_FLUSH) {
+ // for this assert we must be tolerant to ram buffer changes!
+ maxConfiguredRamBuffer = Math.max(maxRamMB, maxConfiguredRamBuffer);
+ final long ram = flushBytes + activeBytes;
+ // take peakDelta into account - worst case is that all flushing, pending and blocked DWPT had maxMem and the last doc had the peakDelta
+ final long expected = (long)(2 * (maxConfiguredRamBuffer * 1024 * 1024)) + ((numPending + numFlushingDWPT() + numBlockedFlushes()) * peakDelta);
+ assert ram <= expected : "ram was " + ram + " expected: " + expected + " flush mem: " + flushBytes + " active: " + activeBytes ;
+ }
+ return true;
+ }
private void commitPerThreadBytes(ThreadState perThread) {
final long delta = perThread.perThread.bytesUsed()
@@ -105,53 +125,62 @@ public final class DocumentsWriterFlushC
peakActiveBytes = Math.max(peakActiveBytes, activeBytes);
peakFlushBytes = Math.max(peakFlushBytes, flushBytes);
peakNetBytes = Math.max(peakNetBytes, netBytes());
+ peakDelta = Math.max(peakDelta, delta);
+
return true;
}
synchronized DocumentsWriterPerThread doAfterDocument(ThreadState perThread,
boolean isUpdate) {
- commitPerThreadBytes(perThread);
- if (!perThread.flushPending) {
- if (isUpdate) {
- flushPolicy.onUpdate(this, perThread);
- } else {
- flushPolicy.onInsert(this, perThread);
+ try {
+ commitPerThreadBytes(perThread);
+ if (!perThread.flushPending) {
+ if (isUpdate) {
+ flushPolicy.onUpdate(this, perThread);
+ } else {
+ flushPolicy.onInsert(this, perThread);
+ }
+ if (!perThread.flushPending && perThread.bytesUsed > hardMaxBytesPerDWPT) {
+ // Safety check to prevent a single DWPT exceeding its RAM limit. This
+ // is super important since we can not address more than 2048 MB per DWPT
+ setFlushPending(perThread);
+ }
}
- if (!perThread.flushPending && perThread.bytesUsed > hardMaxBytesPerDWPT) {
- // Safety check to prevent a single DWPT exceeding its RAM limit. This
- // is super important since we can not address more than 2048 MB per DWPT
- setFlushPending(perThread);
- if (fullFlush) {
- DocumentsWriterPerThread toBlock = internalTryCheckOutForFlush(perThread);
- assert toBlock != null;
- blockedFlushes.add(toBlock);
+ final DocumentsWriterPerThread flushingDWPT;
+ if (fullFlush) {
+ if (perThread.flushPending) {
+ checkoutAndBlock(perThread);
+ flushingDWPT = nextPendingFlush();
+ } else {
+ flushingDWPT = null;
}
+ } else {
+ flushingDWPT = tryCheckoutForFlush(perThread);
}
+ return flushingDWPT;
+ } finally {
+ stallControl.updateStalled(this);
+ assert assertMemory();
}
- final DocumentsWriterPerThread flushingDWPT = tryCheckoutForFlush(perThread);
- healthiness.updateStalled(this);
- return flushingDWPT;
+
+
}
synchronized void doAfterFlush(DocumentsWriterPerThread dwpt) {
assert flushingWriters.containsKey(dwpt);
try {
- numFlushing--;
Long bytes = flushingWriters.remove(dwpt);
flushBytes -= bytes.longValue();
perThreadPool.recycle(dwpt);
- healthiness.updateStalled(this);
+ stallControl.updateStalled(this);
+ assert assertMemory();
} finally {
notifyAll();
}
}
- public synchronized boolean anyFlushing() {
- return numFlushing != 0;
- }
-
public synchronized void waitForFlush() {
- if (numFlushing != 0) {
+ while (flushingWriters.size() != 0) {
try {
this.wait();
} catch (InterruptedException e) {
@@ -173,32 +202,51 @@ public final class DocumentsWriterFlushC
flushBytes += bytes;
activeBytes -= bytes;
numPending++; // write access synced
+ assert assertMemory();
} // don't assert on numDocs since we could hit an abort excp. while selecting that dwpt for flushing
}
synchronized void doOnAbort(ThreadState state) {
- if (state.flushPending) {
- flushBytes -= state.bytesUsed;
- } else {
- activeBytes -= state.bytesUsed;
+ try {
+ if (state.flushPending) {
+ flushBytes -= state.bytesUsed;
+ } else {
+ activeBytes -= state.bytesUsed;
+ }
+ assert assertMemory();
+ // Take it out of the loop this DWPT is stale
+ perThreadPool.replaceForFlush(state, closed);
+ }finally {
+ stallControl.updateStalled(this);
}
- // Take it out of the loop this DWPT is stale
- perThreadPool.replaceForFlush(state, closed);
- healthiness.updateStalled(this);
}
synchronized DocumentsWriterPerThread tryCheckoutForFlush(
ThreadState perThread) {
- if (fullFlush) {
- return null;
+ return perThread.flushPending ? internalTryCheckOutForFlush(perThread) : null;
+ }
+
+ private void checkoutAndBlock(ThreadState perThread) {
+ perThread.lock();
+ try {
+ assert perThread.flushPending : "can not block non-pending threadstate";
+ assert fullFlush : "can not block if fullFlush == false";
+ final DocumentsWriterPerThread dwpt;
+ final long bytes = perThread.bytesUsed;
+ dwpt = perThreadPool.replaceForFlush(perThread, closed);
+ numPending--;
+ blockedFlushes.add(new BlockedFlush(dwpt, bytes));
+ }finally {
+ perThread.unlock();
}
- return internalTryCheckOutForFlush(perThread);
}
private DocumentsWriterPerThread internalTryCheckOutForFlush(
ThreadState perThread) {
- if (perThread.flushPending) {
+ assert Thread.holdsLock(this);
+ assert perThread.flushPending;
+ try {
// We are pending so all memory is already moved to flushBytes
if (perThread.tryLock()) {
try {
@@ -212,15 +260,16 @@ public final class DocumentsWriterFlushC
// Record the flushing DWPT to reduce flushBytes in doAfterFlush
flushingWriters.put(dwpt, Long.valueOf(bytes));
numPending--; // write access synced
- numFlushing++;
return dwpt;
}
} finally {
perThread.unlock();
}
}
+ return null;
+ } finally {
+ stallControl.updateStalled(this);
}
- return null;
}
@Override
@@ -231,12 +280,13 @@ public final class DocumentsWriterFlushC
DocumentsWriterPerThread nextPendingFlush() {
synchronized (this) {
- DocumentsWriterPerThread poll = flushQueue.poll();
- if (poll != null) {
+ final DocumentsWriterPerThread poll;
+ if ((poll = flushQueue.poll()) != null) {
+ stallControl.updateStalled(this);
return poll;
- }
+ }
}
- if (numPending > 0) {
+ if (numPending > 0 && !fullFlush) { // don't check if we are doing a full flush
final Iterator<ThreadState> allActiveThreads = perThreadPool
.getActivePerThreadsIterator();
while (allActiveThreads.hasNext() && numPending > 0) {
@@ -276,8 +326,8 @@ public final class DocumentsWriterFlushC
return documentsWriter.deleteQueue.numGlobalTermDeletes();
}
- int numFlushingDWPT() {
- return numFlushing;
+ synchronized int numFlushingDWPT() {
+ return flushingWriters.size();
}
public boolean doApplyAllDeletes() {
@@ -289,7 +339,7 @@ public final class DocumentsWriterFlushC
}
int numActiveDWPT() {
- return this.perThreadPool.getMaxThreadStates();
+ return this.perThreadPool.getActiveThreadState();
}
void markForFullFlush() {
@@ -331,11 +381,11 @@ public final class DocumentsWriterFlushC
if (!next.flushPending) {
setFlushPending(next);
}
+ final DocumentsWriterPerThread flushingDWPT = internalTryCheckOutForFlush(next);
+ assert flushingDWPT != null : "DWPT must never be null here since we hold the lock and it holds documents";
+ assert dwpt == flushingDWPT : "flushControl returned different DWPT";
+ toFlush.add(flushingDWPT);
}
- final DocumentsWriterPerThread flushingDWPT = internalTryCheckOutForFlush(next);
- assert flushingDWPT != null : "DWPT must never be null here since we hold the lock and it holds documents";
- assert dwpt == flushingDWPT : "flushControl returned different DWPT";
- toFlush.add(flushingDWPT);
} else {
// get the new delete queue from DW
next.perThread.initialize();
@@ -345,31 +395,54 @@ public final class DocumentsWriterFlushC
}
}
synchronized (this) {
- assert assertBlockedFlushes(flushingQueue);
- flushQueue.addAll(blockedFlushes);
- blockedFlushes.clear();
+ /* make sure we move all DWPT that are where concurrently marked as
+ * pending and moved to blocked are moved over to the flushQueue. There is
+ * a chance that this happens since we marking DWPT for full flush without
+ * blocking indexing.*/
+ pruneBlockedQueue(flushingQueue);
+ assert assertBlockedFlushes(documentsWriter.deleteQueue);
flushQueue.addAll(toFlush);
+ stallControl.updateStalled(this);
+ }
+ }
+
+ /**
+ * Prunes the blockedQueue by removing all DWPT that are associated with the given flush queue.
+ */
+ private void pruneBlockedQueue(final DocumentsWriterDeleteQueue flushingQueue) {
+ Iterator<BlockedFlush> iterator = blockedFlushes.iterator();
+ while (iterator.hasNext()) {
+ BlockedFlush blockedFlush = iterator.next();
+ if (blockedFlush.dwpt.deleteQueue == flushingQueue) {
+ iterator.remove();
+ assert !flushingWriters.containsKey(blockedFlush.dwpt) : "DWPT is already flushing";
+ // Record the flushing DWPT to reduce flushBytes in doAfterFlush
+ flushingWriters.put(blockedFlush.dwpt, Long.valueOf(blockedFlush.bytes));
+ // don't decr pending here - its already done when DWPT is blocked
+ flushQueue.add(blockedFlush.dwpt);
+ }
}
}
synchronized void finishFullFlush() {
assert fullFlush;
assert flushQueue.isEmpty();
+ assert flushingWriters.isEmpty();
try {
if (!blockedFlushes.isEmpty()) {
assert assertBlockedFlushes(documentsWriter.deleteQueue);
- flushQueue.addAll(blockedFlushes);
- blockedFlushes.clear();
+ pruneBlockedQueue(documentsWriter.deleteQueue);
+ assert blockedFlushes.isEmpty();
}
} finally {
fullFlush = false;
+ stallControl.updateStalled(this);
}
}
boolean assertBlockedFlushes(DocumentsWriterDeleteQueue flushingQueue) {
- Queue<DocumentsWriterPerThread> flushes = this.blockedFlushes;
- for (DocumentsWriterPerThread documentsWriterPerThread : flushes) {
- assert documentsWriterPerThread.deleteQueue == flushingQueue;
+ for (BlockedFlush blockedFlush : blockedFlushes) {
+ assert blockedFlush.dwpt.deleteQueue == flushingQueue;
}
return true;
}
@@ -379,18 +452,65 @@ public final class DocumentsWriterFlushC
for (DocumentsWriterPerThread dwpt : flushQueue) {
doAfterFlush(dwpt);
}
- for (DocumentsWriterPerThread dwpt : blockedFlushes) {
- doAfterFlush(dwpt);
+ for (BlockedFlush blockedFlush : blockedFlushes) {
+ flushingWriters.put(blockedFlush.dwpt, Long.valueOf(blockedFlush.bytes));
+ doAfterFlush(blockedFlush.dwpt);
}
-
} finally {
fullFlush = false;
flushQueue.clear();
blockedFlushes.clear();
+ stallControl.updateStalled(this);
}
}
- synchronized boolean isFullFlush() {
+ /**
+ * Returns <code>true</code> if a full flush is currently running
+ */
+ synchronized boolean isFullFlush() { // used by assert
return fullFlush;
}
+
+ /**
+ * Returns the number of flushes that are already checked out but not yet
+ * actively flushing
+ */
+ synchronized int numQueuedFlushes() {
+ return flushQueue.size();
+ }
+
+ /**
+ * Returns the number of flushes that are checked out but not yet available
+ * for flushing. This only applies during a full flush if a DWPT needs
+ * flushing but must not be flushed until the full flush has finished.
+ */
+ synchronized int numBlockedFlushes() {
+ return blockedFlushes.size();
+ }
+
+ private static class BlockedFlush {
+ final DocumentsWriterPerThread dwpt;
+ final long bytes;
+ BlockedFlush(DocumentsWriterPerThread dwpt, long bytes) {
+ super();
+ this.dwpt = dwpt;
+ this.bytes = bytes;
+ }
+ }
+
+ /**
+ * This method will block if too many DWPT are currently flushing and no
+ * checked out DWPT are available
+ */
+ void waitIfStalled() {
+ stallControl.waitIfStalled();
+ }
+
+ /**
+ * Returns <code>true</code> iff stalled
+ */
+ boolean anyStalledThreads() {
+ return stallControl.anyStalledThreads();
+ }
+
}
\ No newline at end of file
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java?rev=1104026&r1=1104025&r2=1104026&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java Tue May 17 06:58:39 2011
@@ -165,6 +165,13 @@ public abstract class DocumentsWriterPer
public int getMaxThreadStates() {
return perThreads.length;
}
+
+ /**
+ * Returns the active number of {@link ThreadState} instances.
+ */
+ public int getActiveThreadState() {
+ return numThreadStatesActive;
+ }
/**
* Returns a new {@link ThreadState} iff any new state is available otherwise
Copied: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java (from r1102572, lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/Healthiness.java)
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java?p2=lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java&p1=lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/Healthiness.java&r1=1102572&r2=1104026&rev=1104026&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/Healthiness.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java Tue May 17 06:58:39 2011
@@ -36,8 +36,7 @@ import org.apache.lucene.index.Documents
* continue indexing.
*/
//TODO: rename this to DocumentsWriterStallControl (or something like that)?
-final class Healthiness {
-
+final class DocumentsWriterStallControl {
@SuppressWarnings("serial")
private static final class Sync extends AbstractQueuedSynchronizer {
volatile boolean hasBlockedThreads = false; // only with assert
@@ -96,13 +95,14 @@ final class Healthiness {
* <code>true</code> iff the number of flushing
* {@link DocumentsWriterPerThread} is greater than the number of active
* {@link DocumentsWriterPerThread}. Otherwise it will reset the
- * {@link Healthiness} to healthy and release all threads waiting on
+ * {@link DocumentsWriterStallControl} to healthy and release all threads waiting on
* {@link #waitIfStalled()}
*/
void updateStalled(DocumentsWriterFlushControl flushControl) {
do {
- // if we have more flushing DWPT than numActiveDWPT we stall!
- while (flushControl.numActiveDWPT() < flushControl.numFlushingDWPT()) {
+ // if we have more flushing / blocked DWPT than numActiveDWPT we stall!
+ // don't stall if we have queued flushes - threads should be hijacked instead
+ while (flushControl.netBytes() > flushControl.stallLimitBytes()) {
if (sync.trySetStalled()) {
assert wasStalled = true;
return;
@@ -114,8 +114,8 @@ final class Healthiness {
void waitIfStalled() {
sync.acquireShared(0);
}
-
- boolean hasBlocked() {
+
+ boolean hasBlocked() { // for tests
return sync.hasBlockedThreads;
}
}
\ No newline at end of file
Modified: lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java?rev=1104026&r1=1104025&r2=1104026&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java (original)
+++ lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java Tue May 17 06:58:39 2011
@@ -30,7 +30,6 @@ import org.apache.lucene.store.LockObtai
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.util.LineFileDocs;
import org.apache.lucene.util.LuceneTestCase;
-import org.apache.lucene.util.ThrottledIndexOutput;
import org.junit.Before;
public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
@@ -105,7 +104,7 @@ public class TestFlushByRamOrCountsPolic
assertTrue(maxRAMBytes < flushControl.peakActiveBytes);
}
if (ensureNotStalled) {
- assertFalse(docsWriter.healthiness.wasStalled);
+ assertFalse(docsWriter.flushControl.stallControl.wasStalled);
}
writer.close();
assertEquals(0, flushControl.activeBytes());
@@ -216,15 +215,15 @@ public class TestFlushByRamOrCountsPolic
assertEquals(numDocumentsToIndex, r.numDocs());
assertEquals(numDocumentsToIndex, r.maxDoc());
if (!flushPolicy.flushOnRAM()) {
- assertFalse("never stall if we don't flush on RAM", docsWriter.healthiness.wasStalled);
- assertFalse("never block if we don't flush on RAM", docsWriter.healthiness.hasBlocked());
+ assertFalse("never stall if we don't flush on RAM", docsWriter.flushControl.stallControl.wasStalled);
+ assertFalse("never block if we don't flush on RAM", docsWriter.flushControl.stallControl.hasBlocked());
}
r.close();
writer.close();
dir.close();
}
- public void testHealthyness() throws InterruptedException,
+ public void testStallControl() throws InterruptedException,
CorruptIndexException, LockObtainFailedException, IOException {
int[] numThreads = new int[] { 4 + random.nextInt(8), 1 };
@@ -240,7 +239,7 @@ public class TestFlushByRamOrCountsPolic
iwc.setMaxBufferedDeleteTerms(IndexWriterConfig.DISABLE_AUTO_FLUSH);
FlushPolicy flushPolicy = new FlushByRamOrCountsPolicy();
iwc.setFlushPolicy(flushPolicy);
-
+
DocumentsWriterPerThreadPool threadPool = new ThreadAffinityDocumentsWriterThreadPool(
numThreads[i]== 1 ? 1 : 2);
iwc.setIndexerThreadPool(threadPool);
@@ -265,11 +264,11 @@ public class TestFlushByRamOrCountsPolic
assertEquals(numDocumentsToIndex, writer.maxDoc());
if (numThreads[i] == 1) {
assertFalse(
- "single thread must not stall",
- docsWriter.healthiness.wasStalled);
- assertFalse(
"single thread must not block numThreads: " + numThreads[i],
- docsWriter.healthiness.hasBlocked());
+ docsWriter.flushControl.stallControl.hasBlocked());
+ }
+ if (docsWriter.flushControl.peakNetBytes > (2.d * iwc.getRAMBufferSizeMB() * 1024.d * 1024.d)) {
+ assertTrue(docsWriter.flushControl.stallControl.wasStalled);
}
assertActiveBytesAfter(flushControl);
writer.close(true);