You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2015/02/25 10:49:48 UTC
svn commit: r1662195 - in /lucene/dev/branches/lucene_solr_4_10/lucene: ./
core/src/java/org/apache/lucene/index/ core/src/test/org/apache/lucene/index/
test-framework/src/java/org/apache/lucene/index/
Author: mikemccand
Date: Wed Feb 25 09:49:47 2015
New Revision: 1662195
URL: http://svn.apache.org/r1662195
Log:
LUCENE-6214: don't deadlock when tragedy strikes during getReader and another thread is committing
Added:
lucene/dev/branches/lucene_solr_4_10/lucene/core/src/test/org/apache/lucene/index/TestTragicIndexWriterDeadlock.java (with props)
lucene/dev/branches/lucene_solr_4_10/lucene/test-framework/src/java/org/apache/lucene/index/SuppressingConcurrentMergeScheduler.java (with props)
Modified:
lucene/dev/branches/lucene_solr_4_10/lucene/CHANGES.txt
lucene/dev/branches/lucene_solr_4_10/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
lucene/dev/branches/lucene_solr_4_10/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
Modified: lucene/dev/branches/lucene_solr_4_10/lucene/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_10/lucene/CHANGES.txt?rev=1662195&r1=1662194&r2=1662195&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_10/lucene/CHANGES.txt (original)
+++ lucene/dev/branches/lucene_solr_4_10/lucene/CHANGES.txt Wed Feb 25 09:49:47 2015
@@ -39,6 +39,11 @@ Bug fixes
FileNotFoundException when writing doc values updates at the same
time that a merge kicks off. (Mike McCandless)
+* LUCENE-6214: Fixed IndexWriter deadlock when one thread is
+ committing while another opens a near-real-time reader and an
+ unrecoverable (tragic) exception is hit. (Simon Willnauer, Mike
+ McCandless)
+
API Changes
* LUCENE-6212: Deprecate IndexWriter APIs that accept per-document Analyzer.
Modified: lucene/dev/branches/lucene_solr_4_10/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_10/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=1662195&r1=1662194&r2=1662195&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_10/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/dev/branches/lucene_solr_4_10/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java Wed Feb 25 09:49:47 2015
@@ -639,7 +639,8 @@ final class DocumentsWriter implements C
return anythingFlushed;
}
- final void finishFullFlush(boolean success) {
+ final void finishFullFlush(IndexWriter indexWriter, boolean success) {
+ assert indexWriter.holdsFullFlushLock();
try {
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", Thread.currentThread().getName() + " finishFullFlush success=" + success);
Modified: lucene/dev/branches/lucene_solr_4_10/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_10/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java?rev=1662195&r1=1662194&r2=1662195&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_10/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/dev/branches/lucene_solr_4_10/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java Wed Feb 25 09:49:47 2015
@@ -404,8 +404,8 @@ public class IndexWriter implements Clos
*/
boolean success2 = false;
try {
+ boolean success = false;
synchronized (fullFlushLock) {
- boolean success = false;
try {
anySegmentFlushed = docWriter.flushAllThreads(this);
if (!anySegmentFlushed) {
@@ -413,7 +413,6 @@ public class IndexWriter implements Clos
// if we flushed anything.
flushCount.incrementAndGet();
}
- success = true;
// Prevent segmentInfos from changing while opening the
// reader; in theory we could instead do similar retry logic,
// just like we do when loading segments_N
@@ -424,22 +423,18 @@ public class IndexWriter implements Clos
infoStream.message("IW", "return reader version=" + r.getVersion() + " reader=" + r);
}
}
- } catch (OutOfMemoryError oom) {
- tragicEvent(oom, "getReader");
- // never reached but javac disagrees:
- return null;
+ success = true;
} finally {
- if (!success) {
+ // Done: finish the full flush!
+ docWriter.finishFullFlush(this, success);
+ if (success) {
+ processEvents(false, true);
+ doAfterFlush();
+ } else {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "hit exception during NRT reader");
}
}
- if (tragedy == null) {
- // Done: finish the full flush! (unless we hit OOM or something)
- docWriter.finishFullFlush(success);
- processEvents(false, true);
- doAfterFlush();
- }
}
}
if (anySegmentFlushed) {
@@ -449,6 +444,10 @@ public class IndexWriter implements Clos
infoStream.message("IW", "getReader took " + (System.currentTimeMillis() - tStart) + " msec");
}
success2 = true;
+ } catch (OutOfMemoryError tragedy) {
+ tragicEvent(tragedy, "getReader");
+ // never reached but javac disagrees:
+ return null;
} finally {
if (!success2) {
IOUtils.closeWhileHandlingException(r);
@@ -630,6 +629,9 @@ public class IndexWriter implements Clos
*/
public synchronized ReadersAndUpdates get(SegmentCommitInfo info, boolean create) {
+ // Make sure no new readers can be opened if another thread just closed us:
+ ensureOpen(false);
+
assert info.info.dir == directory: "info.dir=" + info.info.dir + " vs " + directory;
ReadersAndUpdates rld = readerMap.get(info);
@@ -2228,17 +2230,17 @@ public class IndexWriter implements Clos
/* hold the full flush lock to prevent concurrency commits / NRT reopens to
* get in our way and do unnecessary work. -- if we don't lock this here we might
* get in trouble if */
- synchronized (fullFlushLock) {
- /*
- * We first abort and trash everything we have in-memory
- * and keep the thread-states locked, the lockAndAbortAll operation
- * also guarantees "point in time semantics" ie. the checkpoint that we need in terms
- * of logical happens-before relationship in the DW. So we do
- * abort all in memory structures
- * We also drop global field numbering before during abort to make
- * sure it's just like a fresh index.
- */
- try {
+ /*
+ * We first abort and trash everything we have in-memory
+ * and keep the thread-states locked, the lockAndAbortAll operation
+ * also guarantees "point in time semantics" ie. the checkpoint that we need in terms
+ * of logical happens-before relationship in the DW. So we do
+ * abort all in memory structures
+ * We also drop global field numbering before during abort to make
+ * sure it's just like a fresh index.
+ */
+ try {
+ synchronized (fullFlushLock) {
docWriter.lockAndAbortAll(this);
processEvents(false, true);
synchronized (this) {
@@ -2263,6 +2265,7 @@ public class IndexWriter implements Clos
globalFieldNumberMap.clear();
success = true;
} finally {
+ docWriter.unlockAllAfterAbortAll(this);
if (!success) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "hit exception during deleteAll");
@@ -2270,11 +2273,9 @@ public class IndexWriter implements Clos
}
}
}
- } catch (OutOfMemoryError oom) {
- tragicEvent(oom, "deleteAll");
- } finally {
- docWriter.unlockAllAfterAbortAll(this);
}
+ } catch (OutOfMemoryError oom) {
+ tragicEvent(oom, "deleteAll");
}
}
@@ -2997,7 +2998,7 @@ public class IndexWriter implements Clos
}
}
// Done: finish the full flush!
- docWriter.finishFullFlush(flushSuccess);
+ docWriter.finishFullFlush(this, flushSuccess);
doAfterFlush();
}
}
@@ -3238,27 +3239,31 @@ public class IndexWriter implements Clos
infoStream.message("IW", " start flush: applyAllDeletes=" + applyAllDeletes);
infoStream.message("IW", " index before flush " + segString());
}
- final boolean anySegmentFlushed;
+ final boolean anyChanges;
synchronized (fullFlushLock) {
- boolean flushSuccess = false;
+ boolean flushSuccess = false;
try {
- anySegmentFlushed = docWriter.flushAllThreads(this);
+ anyChanges = docWriter.flushAllThreads(this);
+ if (!anyChanges) {
+ // flushCount is incremented in flushAllThreads
+ flushCount.incrementAndGet();
+ }
flushSuccess = true;
} finally {
- docWriter.finishFullFlush(flushSuccess);
+ docWriter.finishFullFlush(this, flushSuccess);
processEvents(false, true);
}
}
synchronized(this) {
maybeApplyDeletes(applyAllDeletes);
doAfterFlush();
- if (!anySegmentFlushed) {
+ if (!anyChanges) {
// flushCount is incremented in flushAllThreads
flushCount.incrementAndGet();
}
success = true;
- return anySegmentFlushed;
+ return anyChanges;
}
} catch (OutOfMemoryError oom) {
tragicEvent(oom, "doFlush");
Added: lucene/dev/branches/lucene_solr_4_10/lucene/core/src/test/org/apache/lucene/index/TestTragicIndexWriterDeadlock.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_10/lucene/core/src/test/org/apache/lucene/index/TestTragicIndexWriterDeadlock.java?rev=1662195&view=auto
==============================================================================
--- lucene/dev/branches/lucene_solr_4_10/lucene/core/src/test/org/apache/lucene/index/TestTragicIndexWriterDeadlock.java (added)
+++ lucene/dev/branches/lucene_solr_4_10/lucene/core/src/test/org/apache/lucene/index/TestTragicIndexWriterDeadlock.java Wed Feb 25 09:49:47 2015
@@ -0,0 +1,104 @@
+package org.apache.lucene.index;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.lucene.analysis.MockAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.store.MockDirectoryWrapper;
+import org.apache.lucene.util.LuceneTestCase;
+
+/**
+ * Regression test for LUCENE-6214: a tragic exception hit while opening a
+ * near-real-time reader must not deadlock a concurrently committing thread.
+ */
+public class TestTragicIndexWriterDeadlock extends LuceneTestCase {
+
+  /**
+   * Races a thread that repeatedly adds a document and commits against a
+   * thread that repeatedly reopens an NRT reader. The directory throws a fake
+   * OutOfMemoryError on roughly 10% of the file operations performed by the
+   * NRT thread. The test completes only if both threads terminate and the
+   * writer and directory can then be closed.
+   */
+  public void testDeadlockExcNRTReaderCommit() throws Exception {
+    MockDirectoryWrapper mockDir = newMockDirectory();
+    IndexWriterConfig config = newIndexWriterConfig(new MockAnalyzer(random()));
+    if (config.getMergeScheduler() instanceof ConcurrentMergeScheduler) {
+      // Swallow every merge exception so background merges cannot fail the test.
+      config.setMergeScheduler(new SuppressingConcurrentMergeScheduler() {
+        @Override
+        protected boolean isOK(Throwable ignored) {
+          return true;
+        }
+      });
+    }
+    final IndexWriter writer = new IndexWriter(mockDir, config);
+    final CountDownLatch go = new CountDownLatch(1);
+    final AtomicBoolean stop = new AtomicBoolean();
+
+    // Committer: add one document and commit, until either thread fails.
+    Thread committer = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          go.await();
+          while (!stop.get()) {
+            writer.addDocument(new Document());
+            writer.commit();
+          }
+        } catch (Throwable expected) {
+          // Failure is expected once tragedy strikes; it simply ends the race
+          // for both threads.
+          stop.set(true);
+        }
+      }
+    });
+    committer.start();
+
+    final DirectoryReader initialReader = DirectoryReader.open(writer, true);
+
+    // NRT reopener: keep refreshing the reader, releasing the stale one each time.
+    final Thread nrtThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        DirectoryReader current = initialReader;
+        try {
+          try {
+            go.await();
+            while (!stop.get()) {
+              DirectoryReader refreshed = DirectoryReader.openIfChanged(current);
+              if (refreshed != null) {
+                DirectoryReader stale = current;
+                current = refreshed;
+                stale.decRef();
+              }
+            }
+          } finally {
+            current.close();
+          }
+        } catch (Throwable expected) {
+          // The injected OOME (or its consequences) lands here and ends the race.
+          stop.set(true);
+        }
+      }
+    });
+    nrtThread.start();
+
+    final AtomicBoolean injectOOME = new AtomicBoolean(true);
+    mockDir.failOn(new MockDirectoryWrapper.Failure() {
+      @Override
+      public void eval(MockDirectoryWrapper wrapper) {
+        // Only the NRT thread is sabotaged; random() is consulted last so it is
+        // only drawn when the other two conditions already hold.
+        boolean onNrtThread = Thread.currentThread() == nrtThread;
+        if (onNrtThread && injectOOME.get() && random().nextDouble() < 0.1) {
+          throw new OutOfMemoryError("fake OOME");
+        }
+      }
+    });
+
+    go.countDown();
+    committer.join();
+    nrtThread.join();
+    // Turn failure injection off before shutting everything down.
+    injectOOME.set(false);
+    writer.close();
+    mockDir.close();
+  }
+}
Added: lucene/dev/branches/lucene_solr_4_10/lucene/test-framework/src/java/org/apache/lucene/index/SuppressingConcurrentMergeScheduler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_10/lucene/test-framework/src/java/org/apache/lucene/index/SuppressingConcurrentMergeScheduler.java?rev=1662195&view=auto
==============================================================================
--- lucene/dev/branches/lucene_solr_4_10/lucene/test-framework/src/java/org/apache/lucene/index/SuppressingConcurrentMergeScheduler.java (added)
+++ lucene/dev/branches/lucene_solr_4_10/lucene/test-framework/src/java/org/apache/lucene/index/SuppressingConcurrentMergeScheduler.java Wed Feb 25 09:49:47 2015
@@ -0,0 +1,38 @@
+package org.apache.lucene.index;
+
+import org.apache.lucene.store.Directory;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * A {@link ConcurrentMergeScheduler} that silently ignores merge exceptions
+ * accepted by {@link #isOK(Throwable)}.
+ *
+ * <p>The merge exception and every throwable in its cause chain are offered,
+ * in order, to {@link #isOK(Throwable)}. If any of them is accepted the merge
+ * exception is swallowed. Otherwise the original (outermost) exception is
+ * passed on to {@link ConcurrentMergeScheduler#handleMergeException(Throwable)}.
+ */
+public abstract class SuppressingConcurrentMergeScheduler extends ConcurrentMergeScheduler {
+  @Override
+  protected void handleMergeException(Throwable exc) {
+    // Walk the cause chain, stopping at the first throwable the subclass accepts.
+    for (Throwable t = exc; t != null; t = t.getCause()) {
+      if (isOK(t)) {
+        return;
+      }
+    }
+    // Nothing in the chain was acceptable: report the ORIGINAL exception.
+    // (Previously the exhausted chain variable, i.e. null, was passed here,
+    // losing the failure, and the loop would NPE if super returned normally.)
+    super.handleMergeException(exc);
+  }
+
+  /**
+   * Returns true if the given throwable (one element of a merge exception's
+   * cause chain) should cause the whole merge exception to be ignored.
+   */
+  protected abstract boolean isOK(Throwable t);
+}