You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by si...@apache.org on 2012/06/10 18:26:06 UTC
svn commit: r1348621 - in /lucene/dev/trunk/lucene/core/src:
java/org/apache/lucene/index/DocumentsWriterStallControl.java
test/org/apache/lucene/index/TestDocumentsWriterStallControl.java
Author: simonw
Date: Sun Jun 10 16:26:06 2012
New Revision: 1348621
URL: http://svn.apache.org/viewvc?rev=1348621&view=rev
Log:
LUCENE-4116: fix concurrency test for DWPTStallControl
Modified:
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java
lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java?rev=1348621&r1=1348620&r2=1348621&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java Sun Jun 10 16:26:06 2012
@@ -39,7 +39,6 @@ import org.apache.lucene.util.ThreadInte
final class DocumentsWriterStallControl {
@SuppressWarnings("serial")
private static final class Sync extends AbstractQueuedSynchronizer {
- volatile boolean hasBlockedThreads = false; // only with assert
Sync() {
setState(0);
@@ -67,15 +66,10 @@ final class DocumentsWriterStallControl
@Override
public int tryAcquireShared(int acquires) {
- assert maybeSetHasBlocked(getState());
return getState() == 0 ? 1 : -1;
}
- // only used for testing
- private boolean maybeSetHasBlocked(int state) {
- hasBlockedThreads |= getState() != 0;
- return true;
- }
+
@Override
public boolean tryReleaseShared(int newState) {
@@ -130,7 +124,7 @@ final class DocumentsWriterStallControl
}
boolean hasBlocked() { // for tests
- return sync.hasBlockedThreads;
+ return sync.hasQueuedThreads();
}
static interface MemoryController {
@@ -138,4 +132,12 @@ final class DocumentsWriterStallControl
long flushBytes();
long stallLimitBytes();
}
+
+ public boolean isHealthy() {
+ return sync.isHealthy();
+ }
+
+ public boolean isThreadQueued(Thread t) {
+ return sync.isQueued(t);
+ }
}
Modified: lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java?rev=1348621&r1=1348620&r2=1348621&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java (original)
+++ lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java Sun Jun 10 16:26:06 2012
@@ -127,22 +127,19 @@ public class TestDocumentsWriterStallCon
int numStallers = atLeast(1);
int numReleasers = atLeast(1);
int numWaiters = atLeast(1);
-
- final CountDownLatch[] latches = new CountDownLatch[] {
- new CountDownLatch(numStallers + numReleasers), new CountDownLatch(1),
- new CountDownLatch(numWaiters)};
+ final Synchonizer sync = new Synchonizer(numStallers + numReleasers, numStallers + numReleasers+numWaiters);
Thread[] threads = new Thread[numReleasers + numStallers + numWaiters];
List<Throwable> exceptions = Collections.synchronizedList(new ArrayList<Throwable>());
for (int i = 0; i < numReleasers; i++) {
- threads[i] = new Updater(stop, checkPoint, ctrl, latches, true, exceptions);
+ threads[i] = new Updater(stop, checkPoint, ctrl, sync, true, exceptions);
}
for (int i = numReleasers; i < numReleasers + numStallers; i++) {
- threads[i] = new Updater(stop, checkPoint, ctrl, latches, false, exceptions);
+ threads[i] = new Updater(stop, checkPoint, ctrl, sync, false, exceptions);
}
for (int i = numReleasers + numStallers; i < numReleasers + numStallers
+ numWaiters; i++) {
- threads[i] = new Waiter(stop, checkPoint, ctrl, latches, exceptions);
+ threads[i] = new Waiter(stop, checkPoint, ctrl, sync, exceptions);
}
@@ -151,7 +148,7 @@ public class TestDocumentsWriterStallCon
for (int i = 0; i < iters; i++) {
if (checkPoint.get()) {
- assertTrue("timed out waiting for update threads - deadlock?", latches[0].await(10, TimeUnit.SECONDS));
+ assertTrue("timed out waiting for update threads - deadlock?", sync.updateJoin.await(10, TimeUnit.SECONDS));
if (!exceptions.isEmpty()) {
for (Throwable throwable : exceptions) {
throwable.printStackTrace();
@@ -159,27 +156,38 @@ public class TestDocumentsWriterStallCon
fail("got exceptions in threads");
}
- if (!ctrl.anyStalledThreads()) {
- assertTrue(
- "control claims no stalled threads but waiter seems to be blocked",
- latches[2].await(10, TimeUnit.SECONDS));
- }
- checkPoint.set(false);
+ if (ctrl.hasBlocked() && ctrl.isHealthy()) {
+ assertState(numReleasers, numStallers, numWaiters, threads, ctrl);
+
+
+ }
- latches[1].countDown();
+ checkPoint.set(false);
+ sync.waiter.countDown();
+ sync.leftCheckpoint.await();
}
assertFalse(checkPoint.get());
+ assertEquals(0, sync.waiter.getCount());
if (random().nextInt(2) == 0) {
- latches[0] = new CountDownLatch(numStallers + numReleasers);
- latches[1] = new CountDownLatch(1);
- latches[2] = new CountDownLatch(numWaiters);
+ sync.reset(numStallers + numReleasers, numStallers + numReleasers
+ + numWaiters);
checkPoint.set(true);
}
}
+ if (!checkPoint.get()) {
+ sync.reset(numStallers + numReleasers, numStallers + numReleasers
+ + numWaiters);
+ checkPoint.set(true);
+ }
+ assertTrue(sync.updateJoin.await(10, TimeUnit.SECONDS));
+ assertState(numReleasers, numStallers, numWaiters, threads, ctrl);
+ checkPoint.set(false);
stop.set(true);
- latches[1].countDown();
+ sync.waiter.countDown();
+ sync.leftCheckpoint.await();
+
for (int i = 0; i < threads.length; i++) {
memCtrl.limit = 1000;
@@ -196,20 +204,45 @@ public class TestDocumentsWriterStallCon
}
}
+ private void assertState(int numReleasers, int numStallers, int numWaiters, Thread[] threads, DocumentsWriterStallControl ctrl) throws InterruptedException {
+ int millisToSleep = 100;
+ while (true) {
+ if (ctrl.hasBlocked() && ctrl.isHealthy()) {
+ for (int n = numReleasers + numStallers; n < numReleasers
+ + numStallers + numWaiters; n++) {
+ if (ctrl.isThreadQueued(threads[n])) {
+ if (millisToSleep < 60000) {
+ Thread.sleep(millisToSleep);
+ millisToSleep *=2;
+ break;
+ } else {
+ fail("control claims no stalled threads but waiter seems to be blocked ");
+ }
+ }
+ }
+ break;
+ } else {
+ break;
+ }
+ }
+
+ }
+
public static class Waiter extends Thread {
- private CountDownLatch[] latches;
+ private Synchonizer sync;
private DocumentsWriterStallControl ctrl;
private AtomicBoolean checkPoint;
private AtomicBoolean stop;
private List<Throwable> exceptions;
public Waiter(AtomicBoolean stop, AtomicBoolean checkPoint,
- DocumentsWriterStallControl ctrl, CountDownLatch[] latches,
+ DocumentsWriterStallControl ctrl, Synchonizer sync,
List<Throwable> exceptions) {
+ super("waiter");
this.stop = stop;
this.checkPoint = checkPoint;
this.ctrl = ctrl;
- this.latches = latches;
+ this.sync = sync;
this.exceptions = exceptions;
}
@@ -218,13 +251,10 @@ public class TestDocumentsWriterStallCon
while (!stop.get()) {
ctrl.waitIfStalled();
if (checkPoint.get()) {
- CountDownLatch join = latches[2];
- CountDownLatch wait = latches[1];
- join.countDown();
try {
- assertTrue(wait.await(10, TimeUnit.SECONDS));
+ assertTrue(sync.await());
} catch (InterruptedException e) {
- System.out.println("[Waiter] got interrupted - wait count: " + wait.getCount());
+ System.out.println("[Waiter] got interrupted - wait count: " + sync.waiter.getCount());
throw new ThreadInterruptedException(e);
}
}
@@ -238,7 +268,7 @@ public class TestDocumentsWriterStallCon
public static class Updater extends Thread {
- private CountDownLatch[] latches;
+ private Synchonizer sync;
private DocumentsWriterStallControl ctrl;
private AtomicBoolean checkPoint;
private AtomicBoolean stop;
@@ -246,12 +276,13 @@ public class TestDocumentsWriterStallCon
private List<Throwable> exceptions;
public Updater(AtomicBoolean stop, AtomicBoolean checkPoint,
- DocumentsWriterStallControl ctrl, CountDownLatch[] latches,
+ DocumentsWriterStallControl ctrl, Synchonizer sync,
boolean release, List<Throwable> exceptions) {
+ super("updater");
this.stop = stop;
this.checkPoint = checkPoint;
this.ctrl = ctrl;
- this.latches = latches;
+ this.sync = sync;
this.release = release;
this.exceptions = exceptions;
}
@@ -268,22 +299,24 @@ public class TestDocumentsWriterStallCon
ctrl.updateStalled(memCtrl);
}
if (checkPoint.get()) {
- CountDownLatch join = latches[0];
- CountDownLatch wait = latches[1];
- join.countDown();
+ sync.updateJoin.countDown();
try {
- assertTrue(wait.await(10, TimeUnit.SECONDS));
+ assertTrue(sync.await());
} catch (InterruptedException e) {
- System.out.println("[Updater] got interrupted - wait count: " + wait.getCount());
+ System.out.println("[Updater] got interrupted - wait count: " + sync.waiter.getCount());
throw new ThreadInterruptedException(e);
}
+ sync.leftCheckpoint.countDown();
+ }
+ if (random().nextBoolean()) {
+ Thread.yield();
}
- Thread.yield();
}
} catch (Throwable e) {
e.printStackTrace();
exceptions.add(e);
}
+ sync.updateJoin.countDown();
}
}
@@ -366,4 +399,25 @@ public class TestDocumentsWriterStallCon
}
}
+
+ private static final class Synchonizer {
+ volatile CountDownLatch waiter;
+ volatile CountDownLatch updateJoin;
+ volatile CountDownLatch leftCheckpoint;
+
+ public Synchonizer(int numUpdater, int numThreads) {
+ reset(numUpdater, numThreads);
+ }
+
+ public void reset(int numUpdaters, int numThreads) {
+ this.waiter = new CountDownLatch(1);
+ this.updateJoin = new CountDownLatch(numUpdaters);
+ this.leftCheckpoint = new CountDownLatch(numUpdaters);
+ }
+
+ public boolean await() throws InterruptedException {
+ return waiter.await(10, TimeUnit.SECONDS);
+ }
+
+ }
}