You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by si...@apache.org on 2013/05/21 12:41:48 UTC

svn commit: r1484742 - in /lucene/dev/branches/branch_4x: ./ lucene/ lucene/core/ lucene/core/src/java/org/apache/lucene/index/ lucene/core/src/test/org/apache/lucene/index/

Author: simonw
Date: Tue May 21 10:41:47 2013
New Revision: 1484742

URL: http://svn.apache.org/r1484742
Log:
LUCENE-5002: Fix Deadlock when IW#deleteAll() is called concurrently to a flushing DWPT

Modified:
    lucene/dev/branches/branch_4x/   (props changed)
    lucene/dev/branches/branch_4x/lucene/   (props changed)
    lucene/dev/branches/branch_4x/lucene/CHANGES.txt   (contents, props changed)
    lucene/dev/branches/branch_4x/lucene/core/   (props changed)
    lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
    lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
    lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
    lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
    lucene/dev/branches/branch_4x/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java

Modified: lucene/dev/branches/branch_4x/lucene/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/lucene/CHANGES.txt?rev=1484742&r1=1484741&r2=1484742&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/lucene/CHANGES.txt (original)
+++ lucene/dev/branches/branch_4x/lucene/CHANGES.txt Tue May 21 10:41:47 2013
@@ -174,6 +174,11 @@ Bug Fixes
   default AND synonyms wrongly became mandatory clauses, and with OR, the
   coordination factor was wrong.  (李威, Robert Muir)
 
+* LUCENE-5002: IndexWriter#deleteAll() caused a deadlock in DWPT / DWSC if a
+  DwPT was flushing concurrently while deleteAll() aborted all DWPT. The IW
+  should never wait on DWPT via the flush control while holding on to the IW
+  Lock. (Simon Willnauer)
+
 Optimizations
 
 * LUCENE-4938: Don't use an unnecessarily large priority queue in IndexSearcher

Modified: lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=1484742&r1=1484741&r2=1484742&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java Tue May 21 10:41:47 2013
@@ -29,6 +29,7 @@ import org.apache.lucene.index.Documents
 import org.apache.lucene.index.DocumentsWriterPerThread.IndexingChain;
 import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
 import org.apache.lucene.index.FieldInfos.FieldNumbers;
+import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.similarities.Similarity;
 import org.apache.lucene.store.AlreadyClosedException;
@@ -240,6 +241,63 @@ final class DocumentsWriter {
       }
     }
   }
+  
+  synchronized void lockAndAbortAll() {
+    assert indexWriter.holdsFullFlushLock();
+    if (infoStream.isEnabled("DW")) {
+      infoStream.message("DW", "lockAndAbortAll");
+    }
+    boolean success = false;
+    try {
+      deleteQueue.clear();
+      final int limit = perThreadPool.getMaxThreadStates();
+      for (int i = 0; i < limit; i++) {
+        final ThreadState perThread = perThreadPool.getThreadState(i);
+        perThread.lock();
+        if (perThread.isActive()) { // we might be closed or 
+          try {
+            perThread.dwpt.abort();
+          } finally {
+            perThread.dwpt.checkAndResetHasAborted();
+            flushControl.doOnAbort(perThread);
+          }
+        }
+      }
+      deleteQueue.clear();
+      flushControl.abortPendingFlushes();
+      flushControl.waitForFlush();
+      success = true;
+    } finally {
+      if (infoStream.isEnabled("DW")) {
+        infoStream.message("DW", "finished lockAndAbortAll success=" + success);
+      }
+      if (!success) {
+        // if something happens here we unlock all states again
+        unlockAllAfterAbortAll();
+      }
+    }
+  }
+  
+  final synchronized void unlockAllAfterAbortAll() {
+    assert indexWriter.holdsFullFlushLock();
+    if (infoStream.isEnabled("DW")) {
+      infoStream.message("DW", "unlockAll");
+    }
+    final int limit = perThreadPool.getMaxThreadStates();
+    for (int i = 0; i < limit; i++) {
+      try {
+        final ThreadState perThread = perThreadPool.getThreadState(i);
+        if (perThread.isHeldByCurrentThread()) {
+          perThread.unlock();
+        }
+      } catch(Throwable e) {
+        if (infoStream.isEnabled("DW")) {
+          infoStream.message("DW", "unlockAll: could not unlock state: " + i + " msg:" + e.getMessage());
+        }
+        // ignore & keep on unlocking
+      }
+    }
+  }
 
   boolean anyChanges() {
     if (infoStream.isEnabled("DW")) {

Modified: lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java?rev=1484742&r1=1484741&r2=1484742&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java (original)
+++ lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java Tue May 21 10:41:47 2013
@@ -240,6 +240,7 @@ final class DocumentsWriterFlushControl 
   }
   
   public synchronized void waitForFlush() {
+    assert !Thread.holdsLock(this.documentsWriter.indexWriter) : "IW lock should never be hold when waiting on flush";
     while (flushingWriters.size() != 0) {
       try {
         this.wait();
@@ -606,9 +607,10 @@ final class DocumentsWriterFlushControl 
       for (DocumentsWriterPerThread dwpt : flushQueue) {
         try {
           dwpt.abort();
-          doAfterFlush(dwpt);
         } catch (Throwable ex) {
           // ignore - keep on aborting the flush queue
+        } finally {
+          doAfterFlush(dwpt);
         }
       }
       for (BlockedFlush blockedFlush : blockedFlushes) {
@@ -616,9 +618,10 @@ final class DocumentsWriterFlushControl 
           flushingWriters
               .put(blockedFlush.dwpt, Long.valueOf(blockedFlush.bytes));
           blockedFlush.dwpt.abort();
-          doAfterFlush(blockedFlush.dwpt);
         } catch (Throwable ex) {
           // ignore - keep on aborting the blocked queue
+        } finally {
+          doAfterFlush(blockedFlush.dwpt);
         }
       }
     } finally {

Modified: lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java?rev=1484742&r1=1484741&r2=1484742&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java (original)
+++ lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java Tue May 21 10:41:47 2013
@@ -274,7 +274,6 @@ abstract class DocumentsWriterPerThreadP
    *         given ord.
    */
   ThreadState getThreadState(int ord) {
-    assert ord < numThreadStatesActive;
     return threadStates[ord];
   }
 

Modified: lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java?rev=1484742&r1=1484741&r2=1484742&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java Tue May 21 10:41:47 2013
@@ -959,7 +959,7 @@ public class IndexWriter implements Clos
         if (doFlush) {
           flush(waitForMerges, true);
         } else {
-          docWriter.abort(); // already closed
+          docWriter.abort(); // already closed -- never sync on IW 
         }
         
       } finally {
@@ -2008,7 +2008,7 @@ public class IndexWriter implements Clos
 
       bufferedDeletesStream.clear();
       docWriter.close(); // mark it as closed first to prevent subsequent indexing actions/flushes 
-      docWriter.abort();
+      docWriter.abort(); // don't sync on IW here
       synchronized(this) {
 
         if (pendingCommit != null) {
@@ -2068,7 +2068,13 @@ public class IndexWriter implements Clos
    *    visible until a {@link #commit()} has been called. This method
    *    can be rolled back using {@link #rollback()}.</p>
    *
-   * <p>NOTE: this method is much faster than using deleteDocuments( new MatchAllDocsQuery() ).</p>
+   * <p>NOTE: this method is much faster than using deleteDocuments( new MatchAllDocsQuery() ). 
+   *    Yet, this method also has different semantics compared to {@link #deleteDocuments(Query)} 
+   *    / {@link #deleteDocuments(Query...)} since internal data-structures are cleared as well 
+   *    as all segment information is forcefully dropped anti-viral semantics like omitting norms
+   *    are reset or doc value types are cleared. Essentially a call to {@link #deleteAll()} is equivalent
+   *    to creating a new {@link IndexWriter} with {@link OpenMode#CREATE} which a delete query only marks
+   *    documents as deleted.</p>
    *
    * <p>NOTE: this method will forcefully abort all merges
    *    in progress.  If other threads are running {@link
@@ -2076,40 +2082,58 @@ public class IndexWriter implements Clos
    *    {@link #forceMergeDeletes} methods, they may receive
    *    {@link MergePolicy.MergeAbortedException}s.
    */
-  public synchronized void deleteAll() throws IOException {
+  public void deleteAll() throws IOException {
     ensureOpen();
+    // Remove any buffered docs
     boolean success = false;
-    try {
-
-      // Abort any running merges
-      finishMerges(false);
-
-      // Remove any buffered docs
-      docWriter.abort();
-
-      // Remove all segments
-      segmentInfos.clear();
-
-      // Ask deleter to locate unreferenced files & remove them:
-      deleter.checkpoint(segmentInfos, false);
-      deleter.refresh();
-
-      globalFieldNumberMap.clear();
-
-      // Don't bother saving any changes in our segmentInfos
-      readerPool.dropAll(false);
-
-      // Mark that the index has changed
-      ++changeCount;
-      segmentInfos.changed();
-      success = true;
-    } catch (OutOfMemoryError oom) {
-      handleOOM(oom, "deleteAll");
-    } finally {
-      if (!success) {
-        if (infoStream.isEnabled("IW")) {
-          infoStream.message("IW", "hit exception during deleteAll");
+    /* hold the full flush lock to prevent concurrency commits / NRT reopens to
+     * get in our way and do unnecessary work. -- if we don't lock this here we might
+     * get in trouble if */
+    synchronized (fullFlushLock) { 
+        /*
+         * We first abort and trash everything we have in-memory
+         * and keep the thread-states locked, the lockAndAbortAll operation
+         * also guarantees "point in time semantics" ie. the checkpoint that we need in terms
+         * of logical happens-before relationship in the DW. So we do
+         * abort all in memory structures 
+         * We also drop global field numbering before during abort to make
+         * sure it's just like a fresh index.
+         */
+      try {
+        docWriter.lockAndAbortAll();
+        synchronized (this) {
+          try {
+            // Abort any running merges
+            finishMerges(false);
+            // Remove all segments
+            segmentInfos.clear();
+            // Ask deleter to locate unreferenced files & remove them:
+            deleter.checkpoint(segmentInfos, false);
+            /* don't refresh the deleter here since there might
+             * be concurrent indexing requests coming in opening
+             * files on the directory after we called DW#abort()
+             * if we do so these indexing requests might hit FNF exceptions.
+             * We will remove the files incrementally as we go...
+             */
+            // Don't bother saving any changes in our segmentInfos
+            readerPool.dropAll(false);
+            // Mark that the index has changed
+            ++changeCount;
+            segmentInfos.changed();
+            globalFieldNumberMap.clear();
+            success = true;
+          } catch (OutOfMemoryError oom) {
+            handleOOM(oom, "deleteAll");
+          } finally {
+            if (!success) {
+              if (infoStream.isEnabled("IW")) {
+                infoStream.message("IW", "hit exception during deleteAll");
+              }
+            }
+          }
         }
+      } finally {
+        docWriter.unlockAllAfterAbortAll();
       }
     }
   }
@@ -2912,6 +2936,11 @@ public class IndexWriter implements Clos
   // Ensures only one flush() is actually flushing segments
   // at a time:
   private final Object fullFlushLock = new Object();
+  
+  // for assert
+  boolean holdsFullFlushLock() {
+    return Thread.holdsLock(fullFlushLock);
+  }
 
   /**
    * Flush all in-memory buffered updates (adds and deletes)

Modified: lucene/dev/branches/branch_4x/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java?rev=1484742&r1=1484741&r2=1484742&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java (original)
+++ lucene/dev/branches/branch_4x/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java Tue May 21 10:41:47 2013
@@ -25,6 +25,8 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -302,6 +304,71 @@ public class TestIndexWriterDelete exten
     modifier.close();
     dir.close();
   }
+  
+  
+  public void testDeleteAllNoDeadLock() throws IOException, InterruptedException {
+    Directory dir = newDirectory();
+    final RandomIndexWriter modifier = new RandomIndexWriter(random(), dir); 
+    int numThreads = atLeast(2);
+    Thread[] threads = new Thread[numThreads];
+    final CountDownLatch latch = new CountDownLatch(1);
+    final CountDownLatch doneLatch = new CountDownLatch(numThreads);
+    for (int i = 0; i < numThreads; i++) {
+      final int offset = i;
+      threads[i] = new Thread() {
+        @Override
+        public void run() {
+          int id = offset * 1000;
+          int value = 100;
+          try {
+            latch.await();
+            for (int i = 0; i < 1000; i++) {
+              Document doc = new Document();
+              doc.add(newTextField("content", "aaa", Field.Store.NO));
+              doc.add(newStringField("id", String.valueOf(id++), Field.Store.YES));
+              doc.add(newStringField("value", String.valueOf(value), Field.Store.NO));
+              if (defaultCodecSupportsDocValues()) {
+                doc.add(new NumericDocValuesField("dv", value));
+              }
+              modifier.addDocument(doc);
+              if (VERBOSE) {
+                System.out.println("\tThread["+offset+"]: add doc: " + id);
+              }
+            }
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          } finally {
+            doneLatch.countDown();
+            if (VERBOSE) {
+              System.out.println("\tThread["+offset+"]: done indexing" );
+            }
+          }
+        }
+      };
+      threads[i].start();
+    }
+    latch.countDown();
+    while(!doneLatch.await(1, TimeUnit.MILLISECONDS)) {
+      modifier.deleteAll();
+      if (VERBOSE) {
+        System.out.println("del all");
+      }
+    }
+    
+    modifier.deleteAll();
+    for (Thread thread : threads) {
+      thread.join();
+    }
+    
+    modifier.close();
+    DirectoryReader reader = DirectoryReader.open(dir);
+    assertEquals(reader.maxDoc(), 0);
+    assertEquals(reader.numDocs(), 0);
+    assertEquals(reader.numDeletedDocs(), 0);
+    reader.close();
+
+    dir.close();
+  }
 
   // test rollback of deleteAll()
   public void testDeleteAllRollback() throws IOException {