You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2012/10/05 20:19:49 UTC

svn commit: r1394704 - in /lucene/dev/trunk/lucene: core/src/java/org/apache/lucene/index/ core/src/test/org/apache/lucene/index/ test-framework/src/java/org/apache/lucene/store/

Author: mikemccand
Date: Fri Oct  5 18:19:48 2012
New Revision: 1394704

URL: http://svn.apache.org/viewvc?rev=1394704&view=rev
Log:
work around starvation issue where multiple indexing threads flush segments faster than the single thread can publish them

Modified:
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java
    lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestBagOfPostings.java
    lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=1394704&r1=1394703&r2=1394704&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java Fri Oct  5 18:19:48 2012
@@ -432,7 +432,20 @@ final class DocumentsWriter {
          * Now we are done and try to flush the ticket queue if the head of the
          * queue has already finished the flush.
          */
-        ticketQueue.tryPurge(this);
+        if (ticketQueue.getTicketCount() >= perThreadPool.getActiveThreadState()) {
+          // This means there is a backlog: the one
+          // thread in innerPurge can't keep up with all
+          // other threads flushing segments.  In this case
+          // we forcefully stall the producers.  But really
+          // this means we have a concurrency issue
+          // (TestBagOfPostings can provoke this):
+          // publishing a flush segment is too heavy today
+          // (it builds CFS, writes .si, etc.) ... we need
+          // to make those ops concurrent too:
+          ticketQueue.forcePurge(this);
+        } else {
+          ticketQueue.tryPurge(this);
+        }
       } finally {
         flushControl.doAfterFlush(flushingDWPT);
         flushingDWPT.checkAndResetHasAborted();
@@ -496,7 +509,7 @@ final class DocumentsWriter {
     final SegmentInfoPerCommit segInfo = indexWriter.prepareFlushedSegment(newSegment);
     final BufferedDeletes deletes = newSegment.segmentDeletes;
     if (infoStream.isEnabled("DW")) {
-      infoStream.message("DW", Thread.currentThread().getName() + ": publishFlushedSegment seg-private deletes=" + deletes);  
+      infoStream.message("DW", "publishFlushedSegment seg-private deletes=" + deletes);  
     }
     FrozenBufferedDeletes packet = null;
     if (deletes != null && deletes.any()) {

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java?rev=1394704&r1=1394703&r2=1394704&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java Fri Oct  5 18:19:48 2012
@@ -41,8 +41,7 @@ class DocumentsWriterFlushQueue {
                    // a window for #anyChanges to fail
       boolean success = false;
       try {
-        queue
-            .add(new GlobalDeletesTicket(deleteQueue.freezeGlobalBuffer(null)));
+        queue.add(new GlobalDeletesTicket(deleteQueue.freezeGlobalBuffer(null)));
         success = true;
       } finally {
         if (!success) {
@@ -111,7 +110,7 @@ class DocumentsWriterFlushQueue {
       if (canPublish) {
         try {
           /*
-           * if we bock on publish -> lock IW -> lock BufferedDeletes we don't block
+           * if we block on publish -> lock IW -> lock BufferedDeletes we don't block
            * concurrent segment flushes just because they want to append to the queue.
            * the downside is that we need to force a purge on fullFlush since ther could
            * be a ticket still in the queue. 
@@ -119,7 +118,7 @@ class DocumentsWriterFlushQueue {
           head.publish(writer);
         } finally {
           synchronized (this) {
-            // finally remove the publised ticket from the queue
+            // finally remove the published ticket from the queue
             final FlushTicket poll = queue.poll();
             ticketCount.decrementAndGet();
             assert poll == head;
@@ -152,6 +151,10 @@ class DocumentsWriterFlushQueue {
     }
   }
 
+  public int getTicketCount() {
+    return ticketCount.get();
+  }
+
   synchronized void clear() {
     queue.clear();
     ticketCount.set(0);
@@ -186,7 +189,7 @@ class DocumentsWriterFlushQueue {
       return true;
     }
   }
-  
+
   static final class SegmentFlushTicket extends FlushTicket {
     private FlushedSegment segment;
     private boolean failed = false;

Modified: lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestBagOfPostings.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestBagOfPostings.java?rev=1394704&r1=1394703&r2=1394704&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestBagOfPostings.java (original)
+++ lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestBagOfPostings.java Fri Oct  5 18:19:48 2012
@@ -43,8 +43,10 @@ public class TestBagOfPostings extends L
     List<String> postingsList = new ArrayList<String>();
     int numTerms = atLeast(300);
     final int maxTermsPerDoc = _TestUtil.nextInt(random(), 10, 20);
-    //System.out.println("maxTermsPerDoc=" + maxTermsPerDoc);
-    //System.out.println("numTerms=" + numTerms);
+    if (VERBOSE) {
+      System.out.println("maxTermsPerDoc=" + maxTermsPerDoc);
+      System.out.println("numTerms=" + numTerms);
+    }
     for (int i = 0; i < numTerms; i++) {
       String term = Integer.toString(i);
       for (int j = 0; j < i; j++) {
@@ -59,11 +61,14 @@ public class TestBagOfPostings extends L
     final RandomIndexWriter iw = new RandomIndexWriter(random(), dir);
 
     int threadCount = _TestUtil.nextInt(random(), 1, 5);
-    //System.out.println("threadCount=" + threadCount);
+    if (VERBOSE) {
+      System.out.println("config: " + iw.w.getConfig());
+      System.out.println("threadCount=" + threadCount);
+    }
 
     Thread[] threads = new Thread[threadCount];
     final CountDownLatch startingGun = new CountDownLatch(1);
-    
+
     for(int threadID=0;threadID<threadCount;threadID++) {
       threads[threadID] = new Thread() {
           @Override

Modified: lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java?rev=1394704&r1=1394703&r2=1394704&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java (original)
+++ lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java Fri Oct  5 18:19:48 2012
@@ -508,8 +508,14 @@ public class MockDirectoryWrapper extend
     final IndexInput ii;
     int randomInt = randomState.nextInt(500);
     if (randomInt == 0) {
+      if (LuceneTestCase.VERBOSE) {
+        System.out.println("MockDirectoryWrapper: using SlowClosingMockIndexInputWrapper for file " + name);
+      }
       ii = new SlowClosingMockIndexInputWrapper(this, name, delegateInput);
     } else if (randomInt  == 1) { 
+      if (LuceneTestCase.VERBOSE) {
+        System.out.println("MockDirectoryWrapper: using SlowOpeningMockIndexInputWrapper for file " + name);
+      }
       ii = new SlowOpeningMockIndexInputWrapper(this, name, delegateInput);
     } else {
       ii = new MockIndexInputWrapper(this, name, delegateInput);
@@ -660,7 +666,6 @@ public class MockDirectoryWrapper extend
           endFiles = endSet.toArray(new String[0]);
 
           if (!Arrays.equals(startFiles, endFiles)) {
-            StringBuilder sb = new StringBuilder();
             List<String> removed = new ArrayList<String>();
             for(String fileName : startFiles) {
               if (!endSet.contains(fileName)) {