You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by si...@apache.org on 2012/06/21 16:02:19 UTC
svn commit: r1352535 - in /lucene/dev/trunk/lucene/core/src:
java/org/apache/lucene/index/ test/org/apache/lucene/index/
Author: simonw
Date: Thu Jun 21 14:02:18 2012
New Revision: 1352535
URL: http://svn.apache.org/viewvc?rev=1352535&view=rev
Log:
LUCENE-4158: Simplify DocumentsWriterStallControl to prevent further deadlocks
Modified:
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java
lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java
lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java?rev=1352535&r1=1352534&r2=1352535&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java Thu Jun 21 14:02:18 2012
@@ -26,7 +26,6 @@ import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
-import org.apache.lucene.index.DocumentsWriterStallControl.MemoryController;
import org.apache.lucene.util.ThreadInterruptedException;
/**
@@ -41,7 +40,7 @@ import org.apache.lucene.util.ThreadInte
* {@link IndexWriterConfig#getRAMPerThreadHardLimitMB()} to prevent address
* space exhaustion.
*/
-final class DocumentsWriterFlushControl implements MemoryController {
+final class DocumentsWriterFlushControl {
private final long hardMaxBytesPerDWPT;
private long activeBytes = 0;
@@ -88,7 +87,7 @@ final class DocumentsWriterFlushControl
return flushBytes + activeBytes;
}
- public long stallLimitBytes() {
+ private long stallLimitBytes() {
final double maxRamMB = config.getRAMBufferSizeMB();
return maxRamMB != IndexWriterConfig.DISABLE_AUTO_FLUSH ? (long)(2 * (maxRamMB * 1024 * 1024)) : Long.MAX_VALUE;
}
@@ -178,7 +177,7 @@ final class DocumentsWriterFlushControl
}
return flushingDWPT;
} finally {
- stallControl.updateStalled(this);
+ updateStallState();
assert assertMemory();
}
}
@@ -192,13 +191,30 @@ final class DocumentsWriterFlushControl
assert assertMemory();
} finally {
try {
- stallControl.updateStalled(this);
+ updateStallState();
} finally {
notifyAll();
}
}
}
+ private final void updateStallState() {
+
+ assert Thread.holdsLock(this);
+ final long limit = stallLimitBytes();
+ /*
+ * We block indexing threads if net bytes grow due to slow flushes.
+ * Yet, for small RAM buffers and large documents, we can easily
+ * reach the limit without any ongoing flushes. We need to ensure
+ * that we don't stall/block if an ongoing or pending flush cannot
+ * free up enough memory to release the stall lock.
+ */
+ final boolean stall = ((activeBytes + flushBytes) > limit) &&
+ (activeBytes < limit) &&
+ !closed;
+ stallControl.updateStalled(stall);
+ }
+
public synchronized void waitForFlush() {
while (flushingWriters.size() != 0) {
try {
@@ -238,7 +254,7 @@ final class DocumentsWriterFlushControl
// Take it out of the loop this DWPT is stale
perThreadPool.replaceForFlush(state, closed);
} finally {
- stallControl.updateStalled(this);
+ updateStallState();
}
}
@@ -288,7 +304,7 @@ final class DocumentsWriterFlushControl
}
return null;
} finally {
- stallControl.updateStalled(this);
+ updateStallState();
}
}
@@ -304,7 +320,7 @@ final class DocumentsWriterFlushControl
synchronized (this) {
final DocumentsWriterPerThread poll;
if ((poll = flushQueue.poll()) != null) {
- stallControl.updateStalled(this);
+ updateStallState();
return poll;
}
fullFlush = this.fullFlush;
@@ -458,7 +474,7 @@ final class DocumentsWriterFlushControl
assert assertBlockedFlushes(documentsWriter.deleteQueue);
flushQueue.addAll(fullFlushBuffer);
fullFlushBuffer.clear();
- stallControl.updateStalled(this);
+ updateStallState();
}
assert assertActiveDeleteQueue(documentsWriter.deleteQueue);
}
@@ -537,7 +553,7 @@ final class DocumentsWriterFlushControl
}
} finally {
fullFlush = false;
- stallControl.updateStalled(this);
+ updateStallState();
}
}
@@ -572,7 +588,7 @@ final class DocumentsWriterFlushControl
fullFlush = false;
flushQueue.clear();
blockedFlushes.clear();
- stallControl.updateStalled(this);
+ updateStallState();
}
}
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java?rev=1352535&r1=1352534&r2=1352535&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java Thu Jun 21 14:02:18 2012
@@ -16,7 +16,8 @@ package org.apache.lucene.index;
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+import java.util.IdentityHashMap;
+import java.util.Map;
import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
import org.apache.lucene.util.ThreadInterruptedException;
@@ -37,107 +38,81 @@ import org.apache.lucene.util.ThreadInte
* continue indexing.
*/
final class DocumentsWriterStallControl {
- @SuppressWarnings("serial")
- private static final class Sync extends AbstractQueuedSynchronizer {
-
- Sync() {
- setState(0);
- }
-
- boolean isHealthy() {
- return getState() == 0;
- }
-
- boolean trySetStalled() {
- int state = getState();
- return compareAndSetState(state, state + 1);
- }
-
- boolean tryReset() {
- final int oldState = getState();
- if (oldState == 0) {
- return true;
- }
- if (compareAndSetState(oldState, 0)) {
- return releaseShared(0);
- }
- return false;
- }
-
- @Override
- public int tryAcquireShared(int acquires) {
- return getState() == 0 ? 1 : -1;
- }
-
-
-
- @Override
- public boolean tryReleaseShared(int newState) {
- return (getState() == 0);
- }
- }
-
- private final Sync sync = new Sync();
- volatile boolean wasStalled = false; // only with asserts
-
- boolean anyStalledThreads() {
- return !sync.isHealthy();
- }
-
+
+ private volatile boolean stalled;
+ private int numWaiting; // only with assert
+ private boolean wasStalled; // only with assert
+ private final Map<Thread, Boolean> waiting = new IdentityHashMap<Thread, Boolean>(); // only with assert
+
/**
* Update the stalled flag status. This method will set the stalled flag to
* <code>true</code> iff the number of flushing
* {@link DocumentsWriterPerThread} is greater than the number of active
* {@link DocumentsWriterPerThread}. Otherwise it will reset the
- * {@link DocumentsWriterStallControl} to healthy and release all threads waiting on
- * {@link #waitIfStalled()}
+ * {@link DocumentsWriterStallControl} to healthy and release all threads
+ * waiting on {@link #waitIfStalled()}
*/
- void updateStalled(MemoryController controller) {
- do {
- final long netBytes = controller.netBytes();
- final long flushBytes = controller.flushBytes();
- final long limit = controller.stallLimitBytes();
- assert netBytes >= flushBytes;
- assert limit > 0;
- /*
- * we block indexing threads if net byte grows due to slow flushes
- * yet, for small ram buffers and large documents we can easily
- * reach the limit without any ongoing flushes. we need to ensure
- * that we don't stall/block if an ongoing or pending flush can
- * not free up enough memory to release the stall lock.
- */
- while (netBytes > limit && (netBytes - flushBytes) < limit) {
- if (sync.trySetStalled()) {
- assert wasStalled = true;
- return;
- }
- }
- } while (!sync.tryReset());
+ synchronized void updateStalled(boolean stalled) {
+ this.stalled = stalled;
+ if (stalled) {
+ wasStalled = true;
+ }
+ notifyAll();
}
-
+
+ /**
+ * Blocks the calling thread while document writing is in a stalled state.
+ *
+ */
void waitIfStalled() {
- try {
- sync.acquireSharedInterruptibly(0);
- } catch (InterruptedException e) {
- throw new ThreadInterruptedException(e);
+ if (stalled) {
+ synchronized (this) {
+ boolean hasWaited = false;
+ while (stalled) {
+ try {
+ assert hasWaited || incWaiters();
+ assert (hasWaited = true);
+ wait();
+ } catch (InterruptedException e) {
+ throw new ThreadInterruptedException(e);
+ }
+ }
+ assert !hasWaited || decrWaiters();
+ }
}
}
- boolean hasBlocked() { // for tests
- return sync.hasQueuedThreads();
+ boolean anyStalledThreads() {
+ return stalled;
}
- static interface MemoryController {
- long netBytes();
- long flushBytes();
- long stallLimitBytes();
+
+ private boolean incWaiters() {
+ numWaiting++;
+ assert waiting.put(Thread.currentThread(), Boolean.TRUE) == null;
+
+ return numWaiting > 0;
}
-
- public boolean isHealthy() {
- return sync.isHealthy();
+
+ private boolean decrWaiters() {
+ numWaiting--;
+ assert waiting.remove(Thread.currentThread()) != null;
+ return numWaiting >= 0;
+ }
+
+ synchronized boolean hasBlocked() { // for tests
+ return numWaiting > 0;
+ }
+
+ boolean isHealthy() { // for tests
+ return !stalled; // volatile read!
}
- public boolean isThreadQueued(Thread t) {
- return sync.isQueued(t);
+ synchronized boolean isThreadQueued(Thread t) { // for tests
+ return waiting.containsKey(t);
+ }
+
+ synchronized boolean wasStalled() { // for tests
+ return wasStalled;
}
}
Modified: lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java?rev=1352535&r1=1352534&r2=1352535&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java (original)
+++ lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java Thu Jun 21 14:02:18 2012
@@ -24,7 +24,6 @@ import java.util.concurrent.CountDownLat
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.lucene.index.DocumentsWriterStallControl.MemoryController;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.ThreadInterruptedException;
@@ -38,11 +37,8 @@ public class TestDocumentsWriterStallCon
public void testSimpleStall() throws InterruptedException {
DocumentsWriterStallControl ctrl = new DocumentsWriterStallControl();
- SimpleMemCtrl memCtrl = new SimpleMemCtrl();
- memCtrl.limit = 1000;
- memCtrl.netBytes = 1000;
- memCtrl.flushBytes = 20;
- ctrl.updateStalled(memCtrl);
+
+ ctrl.updateStalled(false);
Thread[] waitThreads = waitThreads(atLeast(1), ctrl);
start(waitThreads);
assertFalse(ctrl.hasBlocked());
@@ -50,43 +46,31 @@ public class TestDocumentsWriterStallCon
join(waitThreads, 10);
// now stall threads and wake them up again
- memCtrl.netBytes = 1001;
- memCtrl.flushBytes = 100;
- ctrl.updateStalled(memCtrl);
+ ctrl.updateStalled(true);
waitThreads = waitThreads(atLeast(1), ctrl);
start(waitThreads);
awaitState(100, Thread.State.WAITING, waitThreads);
assertTrue(ctrl.hasBlocked());
assertTrue(ctrl.anyStalledThreads());
- memCtrl.netBytes = 50;
- memCtrl.flushBytes = 0;
- ctrl.updateStalled(memCtrl);
+ ctrl.updateStalled(false);
assertFalse(ctrl.anyStalledThreads());
join(waitThreads, 500);
}
public void testRandom() throws InterruptedException {
final DocumentsWriterStallControl ctrl = new DocumentsWriterStallControl();
- SimpleMemCtrl memCtrl = new SimpleMemCtrl();
- memCtrl.limit = 1000;
- memCtrl.netBytes = 1;
- ctrl.updateStalled(memCtrl);
+ ctrl.updateStalled(false);
+
Thread[] stallThreads = new Thread[atLeast(3)];
for (int i = 0; i < stallThreads.length; i++) {
final int threadId = i;
+ final int stallProbability = 1 +random().nextInt(10);
stallThreads[i] = new Thread() {
public void run() {
- int baseBytes = threadId % 2 == 0 ? 500 : 700;
- SimpleMemCtrl memCtrl = new SimpleMemCtrl();
- memCtrl.limit = 1000;
- memCtrl.netBytes = 1;
- memCtrl.flushBytes = 0;
int iters = atLeast(1000);
for (int j = 0; j < iters; j++) {
- memCtrl.netBytes = baseBytes + random().nextInt(1000);
- memCtrl.flushBytes = random().nextInt((int)memCtrl.netBytes);
- ctrl.updateStalled(memCtrl);
+ ctrl.updateStalled(random().nextInt(stallProbability) == 0);
if (random().nextInt(5) == 0) { // thread 0 only updates
ctrl.waitIfStalled();
}
@@ -102,7 +86,7 @@ public class TestDocumentsWriterStallCon
*/
while ((System.currentTimeMillis() - time) < 100 * 1000
&& !terminated(stallThreads)) {
- ctrl.updateStalled(memCtrl);
+ ctrl.updateStalled(false);
if (random().nextBoolean()) {
Thread.yield();
} else {
@@ -116,11 +100,7 @@ public class TestDocumentsWriterStallCon
public void testAccquireReleaseRace() throws InterruptedException {
final DocumentsWriterStallControl ctrl = new DocumentsWriterStallControl();
- SimpleMemCtrl memCtrl = new SimpleMemCtrl();
- memCtrl.limit = 1000;
- memCtrl.netBytes = 1;
- memCtrl.flushBytes = 0;
- ctrl.updateStalled(memCtrl);
+ ctrl.updateStalled(false);
final AtomicBoolean stop = new AtomicBoolean(false);
final AtomicBoolean checkPoint = new AtomicBoolean(true);
@@ -191,10 +171,7 @@ public class TestDocumentsWriterStallCon
for (int i = 0; i < threads.length; i++) {
- memCtrl.limit = 1000;
- memCtrl.netBytes = 1;
- memCtrl.flushBytes = 0;
- ctrl.updateStalled(memCtrl);
+ ctrl.updateStalled(false);
threads[i].join(2000);
if (threads[i].isAlive() && threads[i] instanceof Waiter) {
if (threads[i].getState() == Thread.State.WAITING) {
@@ -290,14 +267,11 @@ public class TestDocumentsWriterStallCon
public void run() {
try {
- SimpleMemCtrl memCtrl = new SimpleMemCtrl();
- memCtrl.limit = 1000;
- memCtrl.netBytes = release ? 1 : 2000;
- memCtrl.flushBytes = random().nextInt((int)memCtrl.netBytes);
+
while (!stop.get()) {
int internalIters = release && random().nextBoolean() ? atLeast(5) : 1;
for (int i = 0; i < internalIters; i++) {
- ctrl.updateStalled(memCtrl);
+ ctrl.updateStalled(random().nextBoolean());
}
if (checkPoint.get()) {
sync.updateJoin.countDown();
@@ -379,28 +353,6 @@ public class TestDocumentsWriterStallCon
+ " ms");
}
- private static class SimpleMemCtrl implements MemoryController {
- long netBytes;
- long limit;
- long flushBytes;
-
- @Override
- public long netBytes() {
- return netBytes;
- }
-
- @Override
- public long stallLimitBytes() {
- return limit;
- }
-
- @Override
- public long flushBytes() {
- return flushBytes;
- }
-
- }
-
private static final class Synchronizer {
volatile CountDownLatch waiter;
volatile CountDownLatch updateJoin;
Modified: lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java?rev=1352535&r1=1352534&r2=1352535&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java (original)
+++ lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java Thu Jun 21 14:02:18 2012
@@ -109,7 +109,7 @@ public class TestFlushByRamOrCountsPolic
assertTrue(maxRAMBytes < flushControl.peakActiveBytes);
}
if (ensureNotStalled) {
- assertFalse(docsWriter.flushControl.stallControl.wasStalled);
+ assertFalse(docsWriter.flushControl.stallControl.wasStalled());
}
writer.close();
assertEquals(0, flushControl.activeBytes());
@@ -222,7 +222,7 @@ public class TestFlushByRamOrCountsPolic
assertEquals(numDocumentsToIndex, r.numDocs());
assertEquals(numDocumentsToIndex, r.maxDoc());
if (!flushPolicy.flushOnRAM()) {
- assertFalse("never stall if we don't flush on RAM", docsWriter.flushControl.stallControl.wasStalled);
+ assertFalse("never stall if we don't flush on RAM", docsWriter.flushControl.stallControl.wasStalled());
assertFalse("never block if we don't flush on RAM", docsWriter.flushControl.stallControl.hasBlocked());
}
r.close();
@@ -275,7 +275,7 @@ public class TestFlushByRamOrCountsPolic
docsWriter.flushControl.stallControl.hasBlocked());
}
if (docsWriter.flushControl.peakNetBytes > (2.d * iwc.getRAMBufferSizeMB() * 1024.d * 1024.d)) {
- assertTrue(docsWriter.flushControl.stallControl.wasStalled);
+ assertTrue(docsWriter.flushControl.stallControl.wasStalled());
}
assertActiveBytesAfter(flushControl);
writer.close(true);