You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by si...@apache.org on 2013/05/21 12:28:49 UTC
svn commit: r1484734 - in /lucene/dev/trunk/lucene: ./
core/src/java/org/apache/lucene/index/ core/src/test/org/apache/lucene/index/
Author: simonw
Date: Tue May 21 10:28:48 2013
New Revision: 1484734
URL: http://svn.apache.org/r1484734
Log:
LUCENE-5002: Fix Deadlock when IW#deleteAll() is called concurrently to a flushing DWPT
Modified:
lucene/dev/trunk/lucene/CHANGES.txt
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
Modified: lucene/dev/trunk/lucene/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/CHANGES.txt?rev=1484734&r1=1484733&r2=1484734&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/CHANGES.txt (original)
+++ lucene/dev/trunk/lucene/CHANGES.txt Tue May 21 10:28:48 2013
@@ -206,6 +206,11 @@ Bug Fixes
default AND synonyms wrongly became mandatory clauses, and with OR, the
coordination factor was wrong. (æå¨, Robert Muir)
+* LUCENE-5002: IndexWriter#deleteAll() caused a deadlock in DWPT / DWSC if a
+ DWPT was flushing concurrently while deleteAll() aborted all DWPTs. The IW
+ should never wait on a DWPT via the flush control while holding on to the IW
+ lock. (Simon Willnauer)
+
Optimizations
* LUCENE-4938: Don't use an unnecessarily large priority queue in IndexSearcher
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=1484734&r1=1484733&r2=1484734&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java Tue May 21 10:28:48 2013
@@ -29,6 +29,7 @@ import org.apache.lucene.index.Documents
import org.apache.lucene.index.DocumentsWriterPerThread.IndexingChain;
import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
import org.apache.lucene.index.FieldInfos.FieldNumbers;
+import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.store.AlreadyClosedException;
@@ -240,6 +241,63 @@ final class DocumentsWriter {
}
}
}
+
+ synchronized void lockAndAbortAll() {
+ assert indexWriter.holdsFullFlushLock();
+ if (infoStream.isEnabled("DW")) {
+ infoStream.message("DW", "lockAndAbortAll");
+ }
+ boolean success = false;
+ try {
+ deleteQueue.clear();
+ final int limit = perThreadPool.getMaxThreadStates();
+ for (int i = 0; i < limit; i++) {
+ final ThreadState perThread = perThreadPool.getThreadState(i);
+ perThread.lock();
+ if (perThread.isActive()) { // we might be closed or
+ try {
+ perThread.dwpt.abort();
+ } finally {
+ perThread.dwpt.checkAndResetHasAborted();
+ flushControl.doOnAbort(perThread);
+ }
+ }
+ }
+ deleteQueue.clear();
+ flushControl.abortPendingFlushes();
+ flushControl.waitForFlush();
+ success = true;
+ } finally {
+ if (infoStream.isEnabled("DW")) {
+ infoStream.message("DW", "finished lockAndAbortAll success=" + success);
+ }
+ if (!success) {
+ // if something happens here we unlock all states again
+ unlockAllAfterAbortAll();
+ }
+ }
+ }
+
+ final synchronized void unlockAllAfterAbortAll() {
+ assert indexWriter.holdsFullFlushLock();
+ if (infoStream.isEnabled("DW")) {
+ infoStream.message("DW", "unlockAll");
+ }
+ final int limit = perThreadPool.getMaxThreadStates();
+ for (int i = 0; i < limit; i++) {
+ try {
+ final ThreadState perThread = perThreadPool.getThreadState(i);
+ if (perThread.isHeldByCurrentThread()) {
+ perThread.unlock();
+ }
+ } catch(Throwable e) {
+ if (infoStream.isEnabled("DW")) {
+ infoStream.message("DW", "unlockAll: could not unlock state: " + i + " msg:" + e.getMessage());
+ }
+ // ignore & keep on unlocking
+ }
+ }
+ }
boolean anyChanges() {
if (infoStream.isEnabled("DW")) {
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java?rev=1484734&r1=1484733&r2=1484734&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java Tue May 21 10:28:48 2013
@@ -240,6 +240,7 @@ final class DocumentsWriterFlushControl
}
public synchronized void waitForFlush() {
+ assert !Thread.holdsLock(this.documentsWriter.indexWriter) : "IW lock should never be hold when waiting on flush";
while (flushingWriters.size() != 0) {
try {
this.wait();
@@ -606,9 +607,10 @@ final class DocumentsWriterFlushControl
for (DocumentsWriterPerThread dwpt : flushQueue) {
try {
dwpt.abort();
- doAfterFlush(dwpt);
} catch (Throwable ex) {
// ignore - keep on aborting the flush queue
+ } finally {
+ doAfterFlush(dwpt);
}
}
for (BlockedFlush blockedFlush : blockedFlushes) {
@@ -616,9 +618,10 @@ final class DocumentsWriterFlushControl
flushingWriters
.put(blockedFlush.dwpt, Long.valueOf(blockedFlush.bytes));
blockedFlush.dwpt.abort();
- doAfterFlush(blockedFlush.dwpt);
} catch (Throwable ex) {
// ignore - keep on aborting the blocked queue
+ } finally {
+ doAfterFlush(blockedFlush.dwpt);
}
}
} finally {
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java?rev=1484734&r1=1484733&r2=1484734&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java Tue May 21 10:28:48 2013
@@ -274,7 +274,6 @@ abstract class DocumentsWriterPerThreadP
* given ord.
*/
ThreadState getThreadState(int ord) {
- assert ord < numThreadStatesActive;
return threadStates[ord];
}
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java?rev=1484734&r1=1484733&r2=1484734&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java Tue May 21 10:28:48 2013
@@ -957,7 +957,7 @@ public class IndexWriter implements Clos
if (doFlush) {
flush(waitForMerges, true);
} else {
- docWriter.abort(); // already closed
+ docWriter.abort(); // already closed -- never sync on IW
}
} finally {
@@ -2006,7 +2006,7 @@ public class IndexWriter implements Clos
bufferedDeletesStream.clear();
docWriter.close(); // mark it as closed first to prevent subsequent indexing actions/flushes
- docWriter.abort();
+ docWriter.abort(); // don't sync on IW here
synchronized(this) {
if (pendingCommit != null) {
@@ -2066,7 +2066,13 @@ public class IndexWriter implements Clos
* visible until a {@link #commit()} has been called. This method
* can be rolled back using {@link #rollback()}.</p>
*
- * <p>NOTE: this method is much faster than using deleteDocuments( new MatchAllDocsQuery() ).</p>
+ * <p>NOTE: this method is much faster than using deleteDocuments( new MatchAllDocsQuery() ).
+ * Yet, this method also has different semantics compared to {@link #deleteDocuments(Query)}
+ * / {@link #deleteDocuments(Query...)}, since internal data structures are cleared and
+ * all segment information is forcefully dropped; anti-viral semantics like omitting norms
+ * are reset and doc values types are cleared. Essentially, a call to {@link #deleteAll()} is equivalent
+ * to creating a new {@link IndexWriter} with {@link OpenMode#CREATE}, whereas a delete query only marks
+ * documents as deleted.</p>
*
* <p>NOTE: this method will forcefully abort all merges
* in progress. If other threads are running {@link
@@ -2074,40 +2080,58 @@ public class IndexWriter implements Clos
* {@link #forceMergeDeletes} methods, they may receive
* {@link MergePolicy.MergeAbortedException}s.
*/
- public synchronized void deleteAll() throws IOException {
+ public void deleteAll() throws IOException {
ensureOpen();
+ // Remove any buffered docs
boolean success = false;
- try {
-
- // Abort any running merges
- finishMerges(false);
-
- // Remove any buffered docs
- docWriter.abort();
-
- // Remove all segments
- segmentInfos.clear();
-
- // Ask deleter to locate unreferenced files & remove them:
- deleter.checkpoint(segmentInfos, false);
- deleter.refresh();
-
- globalFieldNumberMap.clear();
-
- // Don't bother saving any changes in our segmentInfos
- readerPool.dropAll(false);
-
- // Mark that the index has changed
- ++changeCount;
- segmentInfos.changed();
- success = true;
- } catch (OutOfMemoryError oom) {
- handleOOM(oom, "deleteAll");
- } finally {
- if (!success) {
- if (infoStream.isEnabled("IW")) {
- infoStream.message("IW", "hit exception during deleteAll");
+ /* hold the full flush lock to prevent concurrent commits / NRT reopens from
+ * getting in our way and doing unnecessary work. -- the lock must be held here:
+ * DocumentsWriter#lockAndAbortAll() and #unlockAllAfterAbortAll() both assert that the caller holds it. */
+ synchronized (fullFlushLock) {
+ /*
+ * We first abort and trash everything we have in memory
+ * and keep the thread states locked. The lockAndAbortAll operation
+ * also guarantees "point in time" semantics, i.e. the checkpoint that we need in terms
+ * of a logical happens-before relationship in the DW. So we
+ * abort all in-memory structures.
+ * We also drop the global field numbering after the abort to make
+ * sure it's just like a fresh index.
+ */
+ try {
+ docWriter.lockAndAbortAll();
+ synchronized (this) {
+ try {
+ // Abort any running merges
+ finishMerges(false);
+ // Remove all segments
+ segmentInfos.clear();
+ // Ask deleter to locate unreferenced files & remove them:
+ deleter.checkpoint(segmentInfos, false);
+ /* don't refresh the deleter here since there might
+ * be concurrent indexing requests coming in, opening
+ * files on the directory after we called DW#lockAndAbortAll().
+ * If we did so, these indexing requests might hit FNF exceptions.
+ * We will remove the files incrementally as we go...
+ */
+ // Don't bother saving any changes in our segmentInfos
+ readerPool.dropAll(false);
+ // Mark that the index has changed
+ ++changeCount;
+ segmentInfos.changed();
+ globalFieldNumberMap.clear();
+ success = true;
+ } catch (OutOfMemoryError oom) {
+ handleOOM(oom, "deleteAll");
+ } finally {
+ if (!success) {
+ if (infoStream.isEnabled("IW")) {
+ infoStream.message("IW", "hit exception during deleteAll");
+ }
+ }
+ }
}
+ } finally {
+ docWriter.unlockAllAfterAbortAll();
}
}
}
@@ -2867,6 +2891,11 @@ public class IndexWriter implements Clos
// Ensures only one flush() is actually flushing segments
// at a time:
private final Object fullFlushLock = new Object();
+
+ // for assert
+ boolean holdsFullFlushLock() {
+ return Thread.holdsLock(fullFlushLock);
+ }
/**
* Flush all in-memory buffered updates (adds and deletes)
Modified: lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java?rev=1484734&r1=1484733&r2=1484734&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java (original)
+++ lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java Tue May 21 10:28:48 2013
@@ -25,6 +25,8 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -302,6 +304,69 @@ public class TestIndexWriterDelete exten
modifier.close();
dir.close();
}
+
+
+ public void testDeleteAllNoDeadLock() throws IOException, InterruptedException {
+ Directory dir = newDirectory();
+ final RandomIndexWriter modifier = new RandomIndexWriter(random(), dir);
+ int numThreads = atLeast(2);
+ Thread[] threads = new Thread[numThreads];
+ final CountDownLatch latch = new CountDownLatch(1);
+ final CountDownLatch doneLatch = new CountDownLatch(numThreads);
+ for (int i = 0; i < numThreads; i++) {
+ final int offset = i;
+ threads[i] = new Thread() {
+ @Override
+ public void run() {
+ int id = offset * 1000;
+ int value = 100;
+ try {
+ latch.await();
+ for (int i = 0; i < 1000; i++) {
+ Document doc = new Document();
+ doc.add(newTextField("content", "aaa", Field.Store.NO));
+ doc.add(newStringField("id", String.valueOf(id++), Field.Store.YES));
+ doc.add(newStringField("value", String.valueOf(value), Field.Store.NO));
+ doc.add(new NumericDocValuesField("dv", value));
+ modifier.addDocument(doc);
+ if (VERBOSE) {
+ System.out.println("\tThread["+offset+"]: add doc: " + id);
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ doneLatch.countDown();
+ if (VERBOSE) {
+ System.out.println("\tThread["+offset+"]: done indexing" );
+ }
+ }
+ }
+ };
+ threads[i].start();
+ }
+ latch.countDown();
+ while(!doneLatch.await(1, TimeUnit.MILLISECONDS)) {
+ modifier.deleteAll();
+ if (VERBOSE) {
+ System.out.println("del all");
+ }
+ }
+
+ modifier.deleteAll();
+ for (Thread thread : threads) {
+ thread.join();
+ }
+
+ modifier.close();
+ DirectoryReader reader = DirectoryReader.open(dir);
+ assertEquals(reader.maxDoc(), 0);
+ assertEquals(reader.numDocs(), 0);
+ assertEquals(reader.numDeletedDocs(), 0);
+ reader.close();
+
+ dir.close();
+ }
// test rollback of deleteAll()
public void testDeleteAllRollback() throws IOException {