You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by si...@apache.org on 2012/06/21 16:02:19 UTC

svn commit: r1352535 - in /lucene/dev/trunk/lucene/core/src: java/org/apache/lucene/index/ test/org/apache/lucene/index/

Author: simonw
Date: Thu Jun 21 14:02:18 2012
New Revision: 1352535

URL: http://svn.apache.org/viewvc?rev=1352535&view=rev
Log:
LUCENE-4158: Simplify DocumentsWriterStallControl to prevent further deadlocks

Modified:
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java
    lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java
    lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java?rev=1352535&r1=1352534&r2=1352535&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java Thu Jun 21 14:02:18 2012
@@ -26,7 +26,6 @@ import java.util.Queue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
-import org.apache.lucene.index.DocumentsWriterStallControl.MemoryController;
 import org.apache.lucene.util.ThreadInterruptedException;
 
 /**
@@ -41,7 +40,7 @@ import org.apache.lucene.util.ThreadInte
  * {@link IndexWriterConfig#getRAMPerThreadHardLimitMB()} to prevent address
  * space exhaustion.
  */
-final class DocumentsWriterFlushControl implements MemoryController {
+final class DocumentsWriterFlushControl  {
 
   private final long hardMaxBytesPerDWPT;
   private long activeBytes = 0;
@@ -88,7 +87,7 @@ final class DocumentsWriterFlushControl 
     return flushBytes + activeBytes;
   }
   
-  public long stallLimitBytes() {
+  private long stallLimitBytes() {
     final double maxRamMB = config.getRAMBufferSizeMB();
     return maxRamMB != IndexWriterConfig.DISABLE_AUTO_FLUSH ? (long)(2 * (maxRamMB * 1024 * 1024)) : Long.MAX_VALUE;
   }
@@ -178,7 +177,7 @@ final class DocumentsWriterFlushControl 
       }
       return flushingDWPT;
     } finally {
-      stallControl.updateStalled(this);
+      updateStallState();
       assert assertMemory();
     }
   }
@@ -192,13 +191,30 @@ final class DocumentsWriterFlushControl 
       assert assertMemory();
     } finally {
       try {
-        stallControl.updateStalled(this);
+       updateStallState();
       } finally {
         notifyAll();
       }
     }
   }
   
+  private final void updateStallState() {
+    
+    assert Thread.holdsLock(this);
+    final long limit = stallLimitBytes();
+    /*
+     * We block indexing threads if net bytes grow due to slow flushes.
+     * Yet, for small RAM buffers and large documents we can easily
+     * reach the limit without any ongoing flushes. We need to ensure
+     * that we don't stall/block if an ongoing or pending flush cannot
+     * free up enough memory to release the stall lock.
+     */
+    final boolean stall = ((activeBytes + flushBytes) > limit)  &&
+                          (activeBytes < limit) &&
+                          !closed;
+    stallControl.updateStalled(stall);
+  }
+  
   public synchronized void waitForFlush() {
     while (flushingWriters.size() != 0) {
       try {
@@ -238,7 +254,7 @@ final class DocumentsWriterFlushControl 
       // Take it out of the loop this DWPT is stale
       perThreadPool.replaceForFlush(state, closed);
     } finally {
-      stallControl.updateStalled(this);
+      updateStallState();
     }
   }
 
@@ -288,7 +304,7 @@ final class DocumentsWriterFlushControl 
       }
       return null;
     } finally {
-      stallControl.updateStalled(this);
+      updateStallState();
     }
   }
 
@@ -304,7 +320,7 @@ final class DocumentsWriterFlushControl 
     synchronized (this) {
       final DocumentsWriterPerThread poll;
       if ((poll = flushQueue.poll()) != null) {
-        stallControl.updateStalled(this);
+        updateStallState();
         return poll;
       }
       fullFlush = this.fullFlush;
@@ -458,7 +474,7 @@ final class DocumentsWriterFlushControl 
       assert assertBlockedFlushes(documentsWriter.deleteQueue);
       flushQueue.addAll(fullFlushBuffer);
       fullFlushBuffer.clear();
-      stallControl.updateStalled(this);
+      updateStallState();
     }
     assert assertActiveDeleteQueue(documentsWriter.deleteQueue);
   }
@@ -537,7 +553,7 @@ final class DocumentsWriterFlushControl 
       }
     } finally {
       fullFlush = false;
-      stallControl.updateStalled(this);
+      updateStallState();
     }
   }
   
@@ -572,7 +588,7 @@ final class DocumentsWriterFlushControl 
       fullFlush = false;
       flushQueue.clear();
       blockedFlushes.clear();
-      stallControl.updateStalled(this);
+      updateStallState();
     }
   }
   

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java?rev=1352535&r1=1352534&r2=1352535&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java Thu Jun 21 14:02:18 2012
@@ -16,7 +16,8 @@ package org.apache.lucene.index;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+import java.util.IdentityHashMap;
+import java.util.Map;
 
 import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
 import org.apache.lucene.util.ThreadInterruptedException;
@@ -37,107 +38,81 @@ import org.apache.lucene.util.ThreadInte
  * continue indexing.
  */
 final class DocumentsWriterStallControl {
-  @SuppressWarnings("serial")
-  private static final class Sync extends AbstractQueuedSynchronizer {
-
-    Sync() {
-      setState(0);
-    }
-
-    boolean isHealthy() {
-      return getState() == 0;
-    }
-
-    boolean trySetStalled() {
-      int state = getState();
-      return compareAndSetState(state, state + 1);
-    }
-
-    boolean tryReset() {
-      final int oldState = getState();
-      if (oldState == 0) {
-        return true;
-      }
-      if (compareAndSetState(oldState, 0)) {
-        return releaseShared(0);
-      }
-      return false;
-    }
-
-    @Override
-    public int tryAcquireShared(int acquires) {
-      return getState() == 0 ? 1 : -1;
-    }
-
-   
-
-    @Override
-    public boolean tryReleaseShared(int newState) {
-      return (getState() == 0);
-    }
-  }
-
-  private final Sync sync = new Sync();
-  volatile boolean wasStalled = false; // only with asserts
-
-  boolean anyStalledThreads() {
-    return !sync.isHealthy();
-  }
-
+  
+  private volatile boolean stalled;
+  private int numWaiting; // only with assert
+  private boolean wasStalled; // sticky: set on every stall (not only with assert); read only by tests
+  private final Map<Thread, Boolean> waiting = new IdentityHashMap<Thread, Boolean>(); // only with assert
+  
   /**
    * Update the stalled flag status. This method will set the stalled flag to
    * <code>true</code> iff the number of flushing
    * {@link DocumentsWriterPerThread} is greater than the number of active
    * {@link DocumentsWriterPerThread}. Otherwise it will reset the
-   * {@link DocumentsWriterStallControl} to healthy and release all threads waiting on
-   * {@link #waitIfStalled()}
+   * {@link DocumentsWriterStallControl} to healthy and release all threads
+   * waiting on {@link #waitIfStalled()}
    */
-  void updateStalled(MemoryController controller) {
-    do {
-      final long netBytes = controller.netBytes();
-      final long flushBytes = controller.flushBytes();
-      final long limit = controller.stallLimitBytes();
-      assert netBytes >= flushBytes;
-      assert limit > 0;
-      /*
-       * we block indexing threads if net byte grows due to slow flushes
-       * yet, for small ram buffers and large documents we can easily
-       * reach the limit without any ongoing flushes. we need to ensure
-       * that we don't stall/block if an ongoing or pending flush can 
-       * not free up enough memory to release the stall lock.
-       */
-      while (netBytes > limit && (netBytes - flushBytes) < limit) {
-        if (sync.trySetStalled()) {
-          assert wasStalled = true;
-          return;
-        }
-      }
-    } while (!sync.tryReset());
+  synchronized void updateStalled(boolean stalled) {
+    this.stalled = stalled;
+    if (stalled) {
+      wasStalled = true;
+    }
+    notifyAll();
   }
-
+  
+  /**
+   * Blocks the calling thread while document writing is in a stalled state;
+   * returns once the stalled flag is cleared via {@link #updateStalled(boolean)}.
+   */
   void waitIfStalled() {
-    try {
-      sync.acquireSharedInterruptibly(0);
-    } catch (InterruptedException e) {
-      throw new ThreadInterruptedException(e);
+    if (stalled) {
+      synchronized (this) {
+        boolean hasWaited = false;
+        while (stalled) {
+          try {
+            assert hasWaited || incWaiters();
+            assert (hasWaited = true);
+            wait();
+          } catch (InterruptedException e) {
+            throw new ThreadInterruptedException(e);
+          }
+        }
+        assert !hasWaited || decrWaiters();
+      }
     }
   }
   
-  boolean hasBlocked() { // for tests
-    return sync.hasQueuedThreads();
+  boolean anyStalledThreads() {
+    return stalled;
   }
   
-  static interface MemoryController {
-    long netBytes();
-    long flushBytes();
-    long stallLimitBytes();
+  
+  private boolean incWaiters() {
+    numWaiting++;
+    assert waiting.put(Thread.currentThread(), Boolean.TRUE) == null;
+    
+    return numWaiting > 0;
   }
-
-  public boolean isHealthy() {
-    return sync.isHealthy();
+  
+  private boolean decrWaiters() {
+    numWaiting--;
+    assert waiting.remove(Thread.currentThread()) != null;
+    return numWaiting >= 0;
+  }
+  
+  synchronized boolean hasBlocked() { // for tests
+    return numWaiting > 0;
+  }
+  
+  boolean isHealthy() { // for tests
+    return !stalled; // volatile read!
   }
   
-  public boolean isThreadQueued(Thread t) {
-    return sync.isQueued(t);
+  synchronized boolean isThreadQueued(Thread t) { // for tests
+    return waiting.containsKey(t);
+  }
+
+  synchronized boolean wasStalled() { // for tests
+    return wasStalled;
   }
 }

Modified: lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java?rev=1352535&r1=1352534&r2=1352535&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java (original)
+++ lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java Thu Jun 21 14:02:18 2012
@@ -24,7 +24,6 @@ import java.util.concurrent.CountDownLat
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.lucene.index.DocumentsWriterStallControl.MemoryController;
 import org.apache.lucene.util.LuceneTestCase;
 import org.apache.lucene.util.ThreadInterruptedException;
 
@@ -38,11 +37,8 @@ public class TestDocumentsWriterStallCon
   
   public void testSimpleStall() throws InterruptedException {
     DocumentsWriterStallControl ctrl = new DocumentsWriterStallControl();
-    SimpleMemCtrl memCtrl = new SimpleMemCtrl();
-    memCtrl.limit = 1000;
-    memCtrl.netBytes = 1000;
-    memCtrl.flushBytes = 20;
-    ctrl.updateStalled(memCtrl);
+   
+    ctrl.updateStalled(false);
     Thread[] waitThreads = waitThreads(atLeast(1), ctrl);
     start(waitThreads);
     assertFalse(ctrl.hasBlocked());
@@ -50,43 +46,31 @@ public class TestDocumentsWriterStallCon
     join(waitThreads, 10);
     
     // now stall threads and wake them up again
-    memCtrl.netBytes = 1001;
-    memCtrl.flushBytes = 100;
-    ctrl.updateStalled(memCtrl);
+    ctrl.updateStalled(true);
     waitThreads = waitThreads(atLeast(1), ctrl);
     start(waitThreads);
     awaitState(100, Thread.State.WAITING, waitThreads);
     assertTrue(ctrl.hasBlocked());
     assertTrue(ctrl.anyStalledThreads());
-    memCtrl.netBytes = 50;
-    memCtrl.flushBytes = 0;
-    ctrl.updateStalled(memCtrl);
+    ctrl.updateStalled(false);
     assertFalse(ctrl.anyStalledThreads());
     join(waitThreads, 500);
   }
   
   public void testRandom() throws InterruptedException {
     final DocumentsWriterStallControl ctrl = new DocumentsWriterStallControl();
-    SimpleMemCtrl memCtrl = new SimpleMemCtrl();
-    memCtrl.limit = 1000;
-    memCtrl.netBytes = 1;
-    ctrl.updateStalled(memCtrl);
+    ctrl.updateStalled(false);
+    
     Thread[] stallThreads = new Thread[atLeast(3)];
     for (int i = 0; i < stallThreads.length; i++) {
       final int threadId = i;
+      final int stallProbability = 1 +random().nextInt(10);
       stallThreads[i] = new Thread() {
         public void run() {
-          int baseBytes = threadId % 2 == 0 ? 500 : 700;
-          SimpleMemCtrl memCtrl = new SimpleMemCtrl();
-          memCtrl.limit = 1000;
-          memCtrl.netBytes = 1;
-          memCtrl.flushBytes = 0;
 
           int iters = atLeast(1000);
           for (int j = 0; j < iters; j++) {
-            memCtrl.netBytes = baseBytes + random().nextInt(1000);
-            memCtrl.flushBytes = random().nextInt((int)memCtrl.netBytes);
-            ctrl.updateStalled(memCtrl);
+            ctrl.updateStalled(random().nextInt(stallProbability) == 0);
             if (random().nextInt(5) == 0) { // thread 0 only updates
               ctrl.waitIfStalled();
             }
@@ -102,7 +86,7 @@ public class TestDocumentsWriterStallCon
      */
     while ((System.currentTimeMillis() - time) < 100 * 1000
         && !terminated(stallThreads)) {
-      ctrl.updateStalled(memCtrl);
+      ctrl.updateStalled(false);
       if (random().nextBoolean()) {
         Thread.yield();
       } else {
@@ -116,11 +100,7 @@ public class TestDocumentsWriterStallCon
   
   public void testAccquireReleaseRace() throws InterruptedException {
     final DocumentsWriterStallControl ctrl = new DocumentsWriterStallControl();
-    SimpleMemCtrl memCtrl = new SimpleMemCtrl();
-    memCtrl.limit = 1000;
-    memCtrl.netBytes = 1;
-    memCtrl.flushBytes = 0;
-    ctrl.updateStalled(memCtrl);
+    ctrl.updateStalled(false);
     final AtomicBoolean stop = new AtomicBoolean(false);
     final AtomicBoolean checkPoint = new AtomicBoolean(true);
     
@@ -191,10 +171,7 @@ public class TestDocumentsWriterStallCon
     
     
     for (int i = 0; i < threads.length; i++) {
-      memCtrl.limit = 1000;
-      memCtrl.netBytes = 1;
-      memCtrl.flushBytes = 0;
-      ctrl.updateStalled(memCtrl);
+      ctrl.updateStalled(false);
       threads[i].join(2000);
       if (threads[i].isAlive() && threads[i] instanceof Waiter) {
         if (threads[i].getState() == Thread.State.WAITING) {
@@ -290,14 +267,11 @@ public class TestDocumentsWriterStallCon
     
     public void run() {
       try {
-        SimpleMemCtrl memCtrl = new SimpleMemCtrl();
-        memCtrl.limit = 1000;
-        memCtrl.netBytes = release ? 1 : 2000;
-        memCtrl.flushBytes = random().nextInt((int)memCtrl.netBytes);
+       
         while (!stop.get()) {
           int internalIters = release && random().nextBoolean() ? atLeast(5) : 1;
           for (int i = 0; i < internalIters; i++) {
-            ctrl.updateStalled(memCtrl);
+            ctrl.updateStalled(random().nextBoolean());
           }
           if (checkPoint.get()) {
             sync.updateJoin.countDown();
@@ -379,28 +353,6 @@ public class TestDocumentsWriterStallCon
         + " ms");
   }
   
-  private static class SimpleMemCtrl implements MemoryController {
-    long netBytes;
-    long limit;
-    long flushBytes;
-    
-    @Override
-    public long netBytes() {
-      return netBytes;
-    }
-    
-    @Override
-    public long stallLimitBytes() {
-      return limit;
-    }
-
-    @Override
-    public long flushBytes() {
-      return flushBytes;
-    }
-    
-  }
-  
   private static final class Synchronizer {
     volatile CountDownLatch waiter;
     volatile CountDownLatch updateJoin;

Modified: lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java?rev=1352535&r1=1352534&r2=1352535&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java (original)
+++ lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java Thu Jun 21 14:02:18 2012
@@ -109,7 +109,7 @@ public class TestFlushByRamOrCountsPolic
       assertTrue(maxRAMBytes < flushControl.peakActiveBytes);
     }
     if (ensureNotStalled) {
-      assertFalse(docsWriter.flushControl.stallControl.wasStalled);
+      assertFalse(docsWriter.flushControl.stallControl.wasStalled());
     }
     writer.close();
     assertEquals(0, flushControl.activeBytes());
@@ -222,7 +222,7 @@ public class TestFlushByRamOrCountsPolic
     assertEquals(numDocumentsToIndex, r.numDocs());
     assertEquals(numDocumentsToIndex, r.maxDoc());
     if (!flushPolicy.flushOnRAM()) {
-      assertFalse("never stall if we don't flush on RAM", docsWriter.flushControl.stallControl.wasStalled);
+      assertFalse("never stall if we don't flush on RAM", docsWriter.flushControl.stallControl.wasStalled());
       assertFalse("never block if we don't flush on RAM", docsWriter.flushControl.stallControl.hasBlocked());
     }
     r.close();
@@ -275,7 +275,7 @@ public class TestFlushByRamOrCountsPolic
             docsWriter.flushControl.stallControl.hasBlocked());
       }
       if (docsWriter.flushControl.peakNetBytes > (2.d * iwc.getRAMBufferSizeMB() * 1024.d * 1024.d)) {
-        assertTrue(docsWriter.flushControl.stallControl.wasStalled);
+        assertTrue(docsWriter.flushControl.stallControl.wasStalled());
       }
       assertActiveBytesAfter(flushControl);
       writer.close(true);