You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2012/10/05 20:19:49 UTC
svn commit: r1394704 - in /lucene/dev/trunk/lucene:
core/src/java/org/apache/lucene/index/ core/src/test/org/apache/lucene/index/
test-framework/src/java/org/apache/lucene/store/
Author: mikemccand
Date: Fri Oct 5 18:19:48 2012
New Revision: 1394704
URL: http://svn.apache.org/viewvc?rev=1394704&view=rev
Log:
work around starvation issue where multiple indexing threads flush segments faster than the single thread can publish them
Modified:
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java
lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestBagOfPostings.java
lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=1394704&r1=1394703&r2=1394704&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java Fri Oct 5 18:19:48 2012
@@ -432,7 +432,20 @@ final class DocumentsWriter {
* Now we are done and try to flush the ticket queue if the head of the
* queue has already finished the flush.
*/
- ticketQueue.tryPurge(this);
+ if (ticketQueue.getTicketCount() >= perThreadPool.getActiveThreadState()) {
+ // This means there is a backlog: the one
+ // thread in innerPurge can't keep up with all
+ // other threads flushing segments. In this case
+ // we forcefully stall the producers. But really
+ // this means we have a concurrency issue
+ // (TestBagOfPostings can provoke this):
+ // publishing a flushed segment is too heavy today
+ // (it builds CFS, writes .si, etc.) ... we need
+ // to make those ops concurrent too:
+ ticketQueue.forcePurge(this);
+ } else {
+ ticketQueue.tryPurge(this);
+ }
} finally {
flushControl.doAfterFlush(flushingDWPT);
flushingDWPT.checkAndResetHasAborted();
@@ -496,7 +509,7 @@ final class DocumentsWriter {
final SegmentInfoPerCommit segInfo = indexWriter.prepareFlushedSegment(newSegment);
final BufferedDeletes deletes = newSegment.segmentDeletes;
if (infoStream.isEnabled("DW")) {
- infoStream.message("DW", Thread.currentThread().getName() + ": publishFlushedSegment seg-private deletes=" + deletes);
+ infoStream.message("DW", "publishFlushedSegment seg-private deletes=" + deletes);
}
FrozenBufferedDeletes packet = null;
if (deletes != null && deletes.any()) {
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java?rev=1394704&r1=1394703&r2=1394704&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java Fri Oct 5 18:19:48 2012
@@ -41,8 +41,7 @@ class DocumentsWriterFlushQueue {
// a window for #anyChanges to fail
boolean success = false;
try {
- queue
- .add(new GlobalDeletesTicket(deleteQueue.freezeGlobalBuffer(null)));
+ queue.add(new GlobalDeletesTicket(deleteQueue.freezeGlobalBuffer(null)));
success = true;
} finally {
if (!success) {
@@ -111,7 +110,7 @@ class DocumentsWriterFlushQueue {
if (canPublish) {
try {
/*
- * if we bock on publish -> lock IW -> lock BufferedDeletes we don't block
+ * if we block on publish -> lock IW -> lock BufferedDeletes we don't block
* concurrent segment flushes just because they want to append to the queue.
* the downside is that we need to force a purge on fullFlush since there could
* be a ticket still in the queue.
@@ -119,7 +118,7 @@ class DocumentsWriterFlushQueue {
head.publish(writer);
} finally {
synchronized (this) {
- // finally remove the publised ticket from the queue
+ // finally remove the published ticket from the queue
final FlushTicket poll = queue.poll();
ticketCount.decrementAndGet();
assert poll == head;
@@ -152,6 +151,10 @@ class DocumentsWriterFlushQueue {
}
}
+ public int getTicketCount() {
+ return ticketCount.get();
+ }
+
synchronized void clear() {
queue.clear();
ticketCount.set(0);
@@ -186,7 +189,7 @@ class DocumentsWriterFlushQueue {
return true;
}
}
-
+
static final class SegmentFlushTicket extends FlushTicket {
private FlushedSegment segment;
private boolean failed = false;
Modified: lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestBagOfPostings.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestBagOfPostings.java?rev=1394704&r1=1394703&r2=1394704&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestBagOfPostings.java (original)
+++ lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestBagOfPostings.java Fri Oct 5 18:19:48 2012
@@ -43,8 +43,10 @@ public class TestBagOfPostings extends L
List<String> postingsList = new ArrayList<String>();
int numTerms = atLeast(300);
final int maxTermsPerDoc = _TestUtil.nextInt(random(), 10, 20);
- //System.out.println("maxTermsPerDoc=" + maxTermsPerDoc);
- //System.out.println("numTerms=" + numTerms);
+ if (VERBOSE) {
+ System.out.println("maxTermsPerDoc=" + maxTermsPerDoc);
+ System.out.println("numTerms=" + numTerms);
+ }
for (int i = 0; i < numTerms; i++) {
String term = Integer.toString(i);
for (int j = 0; j < i; j++) {
@@ -59,11 +61,14 @@ public class TestBagOfPostings extends L
final RandomIndexWriter iw = new RandomIndexWriter(random(), dir);
int threadCount = _TestUtil.nextInt(random(), 1, 5);
- //System.out.println("threadCount=" + threadCount);
+ if (VERBOSE) {
+ System.out.println("config: " + iw.w.getConfig());
+ System.out.println("threadCount=" + threadCount);
+ }
Thread[] threads = new Thread[threadCount];
final CountDownLatch startingGun = new CountDownLatch(1);
-
+
for(int threadID=0;threadID<threadCount;threadID++) {
threads[threadID] = new Thread() {
@Override
Modified: lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java?rev=1394704&r1=1394703&r2=1394704&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java (original)
+++ lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java Fri Oct 5 18:19:48 2012
@@ -508,8 +508,14 @@ public class MockDirectoryWrapper extend
final IndexInput ii;
int randomInt = randomState.nextInt(500);
if (randomInt == 0) {
+ if (LuceneTestCase.VERBOSE) {
+ System.out.println("MockDirectoryWrapper: using SlowClosingMockIndexInputWrapper for file " + name);
+ }
ii = new SlowClosingMockIndexInputWrapper(this, name, delegateInput);
} else if (randomInt == 1) {
+ if (LuceneTestCase.VERBOSE) {
+ System.out.println("MockDirectoryWrapper: using SlowOpeningMockIndexInputWrapper for file " + name);
+ }
ii = new SlowOpeningMockIndexInputWrapper(this, name, delegateInput);
} else {
ii = new MockIndexInputWrapper(this, name, delegateInput);
@@ -660,7 +666,6 @@ public class MockDirectoryWrapper extend
endFiles = endSet.toArray(new String[0]);
if (!Arrays.equals(startFiles, endFiles)) {
- StringBuilder sb = new StringBuilder();
List<String> removed = new ArrayList<String>();
for(String fileName : startFiles) {
if (!endSet.contains(fileName)) {