You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by si...@apache.org on 2012/06/10 18:26:06 UTC

svn commit: r1348621 - in /lucene/dev/trunk/lucene/core/src: java/org/apache/lucene/index/DocumentsWriterStallControl.java test/org/apache/lucene/index/TestDocumentsWriterStallControl.java

Author: simonw
Date: Sun Jun 10 16:26:06 2012
New Revision: 1348621

URL: http://svn.apache.org/viewvc?rev=1348621&view=rev
Log:
LUCENE-4116: fix concurrency test for DWPTStallControl

Modified:
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java
    lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java?rev=1348621&r1=1348620&r2=1348621&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java Sun Jun 10 16:26:06 2012
@@ -39,7 +39,6 @@ import org.apache.lucene.util.ThreadInte
 final class DocumentsWriterStallControl {
   @SuppressWarnings("serial")
   private static final class Sync extends AbstractQueuedSynchronizer {
-    volatile boolean hasBlockedThreads = false; // only with assert
 
     Sync() {
       setState(0);
@@ -67,15 +66,10 @@ final class DocumentsWriterStallControl 
 
     @Override
     public int tryAcquireShared(int acquires) {
-      assert maybeSetHasBlocked(getState());
       return getState() == 0 ? 1 : -1;
     }
 
-    // only used for testing
-    private boolean maybeSetHasBlocked(int state) {
-      hasBlockedThreads |= getState() != 0;
-      return true;
-    }
+   
 
     @Override
     public boolean tryReleaseShared(int newState) {
@@ -130,7 +124,7 @@ final class DocumentsWriterStallControl 
   }
   
   boolean hasBlocked() { // for tests
-    return sync.hasBlockedThreads;
+    return sync.hasQueuedThreads();
   }
   
   static interface MemoryController {
@@ -138,4 +132,12 @@ final class DocumentsWriterStallControl 
     long flushBytes();
     long stallLimitBytes();
   }
+
+  public boolean isHealthy() {
+    return sync.isHealthy();
+  }
+  
+  public boolean isThreadQueued(Thread t) {
+    return sync.isQueued(t);
+  }
 }

Modified: lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java?rev=1348621&r1=1348620&r2=1348621&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java (original)
+++ lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java Sun Jun 10 16:26:06 2012
@@ -127,22 +127,19 @@ public class TestDocumentsWriterStallCon
     int numStallers = atLeast(1);
     int numReleasers = atLeast(1);
     int numWaiters = atLeast(1);
-    
-    final CountDownLatch[] latches = new CountDownLatch[] {
-        new CountDownLatch(numStallers + numReleasers), new CountDownLatch(1),
-        new CountDownLatch(numWaiters)};
+    final Synchonizer sync = new Synchonizer(numStallers + numReleasers, numStallers + numReleasers+numWaiters);
     Thread[] threads = new Thread[numReleasers + numStallers + numWaiters];
     List<Throwable> exceptions =  Collections.synchronizedList(new ArrayList<Throwable>());
     for (int i = 0; i < numReleasers; i++) {
-      threads[i] = new Updater(stop, checkPoint, ctrl, latches, true, exceptions);
+      threads[i] = new Updater(stop, checkPoint, ctrl, sync, true, exceptions);
     }
     for (int i = numReleasers; i < numReleasers + numStallers; i++) {
-      threads[i] = new Updater(stop, checkPoint, ctrl, latches, false, exceptions);
+      threads[i] = new Updater(stop, checkPoint, ctrl, sync, false, exceptions);
       
     }
     for (int i = numReleasers + numStallers; i < numReleasers + numStallers
         + numWaiters; i++) {
-      threads[i] = new Waiter(stop, checkPoint, ctrl, latches, exceptions);
+      threads[i] = new Waiter(stop, checkPoint, ctrl, sync, exceptions);
       
     }
     
@@ -151,7 +148,7 @@ public class TestDocumentsWriterStallCon
     for (int i = 0; i < iters; i++) {
       if (checkPoint.get()) {
        
-        assertTrue("timed out waiting for update threads - deadlock?", latches[0].await(10, TimeUnit.SECONDS));
+        assertTrue("timed out waiting for update threads - deadlock?", sync.updateJoin.await(10, TimeUnit.SECONDS));
         if (!exceptions.isEmpty()) {
           for (Throwable throwable : exceptions) {
             throwable.printStackTrace();
@@ -159,27 +156,38 @@ public class TestDocumentsWriterStallCon
           fail("got exceptions in threads");
         }
         
-        if (!ctrl.anyStalledThreads()) {
-          assertTrue(
-              "control claims no stalled threads but waiter seems to be blocked",
-              latches[2].await(10, TimeUnit.SECONDS));
-        }
-        checkPoint.set(false);
+        if (ctrl.hasBlocked() && ctrl.isHealthy()) {
+          assertState(numReleasers, numStallers, numWaiters, threads, ctrl);
+          
+           
+          }
         
-        latches[1].countDown();
+        checkPoint.set(false);
+        sync.waiter.countDown();
+        sync.leftCheckpoint.await();
       }
       assertFalse(checkPoint.get());
+      assertEquals(0, sync.waiter.getCount());
       if (random().nextInt(2) == 0) {
-        latches[0] = new CountDownLatch(numStallers + numReleasers);
-        latches[1] = new CountDownLatch(1);
-        latches[2] = new CountDownLatch(numWaiters);
+        sync.reset(numStallers + numReleasers, numStallers + numReleasers
+            + numWaiters);
         checkPoint.set(true);
       }
   
     }
+    if (!checkPoint.get()) {
+      sync.reset(numStallers + numReleasers, numStallers + numReleasers
+          + numWaiters);
+      checkPoint.set(true);
+    }
     
+    assertTrue(sync.updateJoin.await(10, TimeUnit.SECONDS));
+    assertState(numReleasers, numStallers, numWaiters, threads, ctrl);
+    checkPoint.set(false);
     stop.set(true);
-    latches[1].countDown();
+    sync.waiter.countDown();
+    sync.leftCheckpoint.await();
+    
     
     for (int i = 0; i < threads.length; i++) {
       memCtrl.limit = 1000;
@@ -196,20 +204,45 @@ public class TestDocumentsWriterStallCon
     }
   }
   
+  private void assertState(int numReleasers, int numStallers, int numWaiters, Thread[] threads, DocumentsWriterStallControl ctrl) throws InterruptedException {
+    int millisToSleep = 100;
+    while (true) {
+      if (ctrl.hasBlocked() && ctrl.isHealthy()) {
+        for (int n = numReleasers + numStallers; n < numReleasers
+            + numStallers + numWaiters; n++) {
+          if (ctrl.isThreadQueued(threads[n])) {
+            if (millisToSleep < 60000) {
+              Thread.sleep(millisToSleep);
+              millisToSleep *=2;
+              break;
+            } else {
+              fail("control claims no stalled threads but waiter seems to be blocked ");
+            }
+          }
+        }
+        break;
+      } else {
+        break;
+      }
+    }
+    
+  }
+
   public static class Waiter extends Thread {
-    private CountDownLatch[] latches;
+    private Synchonizer sync;
     private DocumentsWriterStallControl ctrl;
     private AtomicBoolean checkPoint;
     private AtomicBoolean stop;
     private List<Throwable> exceptions;
     
     public Waiter(AtomicBoolean stop, AtomicBoolean checkPoint,
-        DocumentsWriterStallControl ctrl, CountDownLatch[] latches,
+        DocumentsWriterStallControl ctrl, Synchonizer sync,
         List<Throwable> exceptions) {
+      super("waiter");
       this.stop = stop;
       this.checkPoint = checkPoint;
       this.ctrl = ctrl;
-      this.latches = latches;
+      this.sync = sync;
       this.exceptions = exceptions;
     }
     
@@ -218,13 +251,10 @@ public class TestDocumentsWriterStallCon
         while (!stop.get()) {
           ctrl.waitIfStalled();
           if (checkPoint.get()) {
-            CountDownLatch join = latches[2];
-            CountDownLatch wait = latches[1];
-            join.countDown();
             try {
-              assertTrue(wait.await(10, TimeUnit.SECONDS));
+              assertTrue(sync.await());
             } catch (InterruptedException e) {
-              System.out.println("[Waiter] got interrupted - wait count: " + wait.getCount());
+              System.out.println("[Waiter] got interrupted - wait count: " + sync.waiter.getCount());
               throw new ThreadInterruptedException(e);
             }
           }
@@ -238,7 +268,7 @@ public class TestDocumentsWriterStallCon
   
   public static class Updater extends Thread {
     
-    private CountDownLatch[] latches;
+    private Synchonizer sync;
     private DocumentsWriterStallControl ctrl;
     private AtomicBoolean checkPoint;
     private AtomicBoolean stop;
@@ -246,12 +276,13 @@ public class TestDocumentsWriterStallCon
     private List<Throwable> exceptions;
     
     public Updater(AtomicBoolean stop, AtomicBoolean checkPoint,
-        DocumentsWriterStallControl ctrl, CountDownLatch[] latches,
+        DocumentsWriterStallControl ctrl, Synchonizer sync,
         boolean release, List<Throwable> exceptions) {
+      super("updater");
       this.stop = stop;
       this.checkPoint = checkPoint;
       this.ctrl = ctrl;
-      this.latches = latches;
+      this.sync = sync;
       this.release = release;
       this.exceptions = exceptions;
     }
@@ -268,22 +299,24 @@ public class TestDocumentsWriterStallCon
             ctrl.updateStalled(memCtrl);
           }
           if (checkPoint.get()) {
-            CountDownLatch join = latches[0];
-            CountDownLatch wait = latches[1];
-            join.countDown();
+            sync.updateJoin.countDown();
             try {
-              assertTrue(wait.await(10, TimeUnit.SECONDS));
+              assertTrue(sync.await());
             } catch (InterruptedException e) {
-              System.out.println("[Updater] got interrupted - wait count: " + wait.getCount());
+              System.out.println("[Updater] got interrupted - wait count: " + sync.waiter.getCount());
               throw new ThreadInterruptedException(e);
             }
+            sync.leftCheckpoint.countDown();
+          }
+          if (random().nextBoolean()) {
+            Thread.yield();
           }
-          Thread.yield();
         }
       } catch (Throwable e) {
         e.printStackTrace();
         exceptions.add(e);
       }
+      sync.updateJoin.countDown();
     }
     
   }
@@ -366,4 +399,25 @@ public class TestDocumentsWriterStallCon
     }
     
   }
+  
+  private static final class Synchonizer {
+    volatile CountDownLatch waiter;
+    volatile CountDownLatch updateJoin;
+    volatile CountDownLatch leftCheckpoint;
+    
+    public Synchonizer(int numUpdater, int numThreads) {
+      reset(numUpdater, numThreads);
+    }
+    
+    public void reset(int numUpdaters, int numThreads) {
+      this.waiter = new CountDownLatch(1);
+      this.updateJoin = new CountDownLatch(numUpdaters);
+      this.leftCheckpoint = new CountDownLatch(numUpdaters);
+    }
+    
+    public boolean await() throws InterruptedException {
+      return waiter.await(10, TimeUnit.SECONDS);
+    }
+    
+  }
 }