You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2012/06/21 18:28:58 UTC
svn commit: r1352600 - in /lucene/dev/trunk/lucene: ./
core/src/java/org/apache/lucene/index/ core/src/test/org/apache/lucene/index/
Author: mikemccand
Date: Thu Jun 21 16:28:57 2012
New Revision: 1352600
URL: http://svn.apache.org/viewvc?rev=1352600&view=rev
Log:
LUCENE-4147: fix thread safety issues when IndexWriter's rollback & commit are called simultaneously
Modified:
lucene/dev/trunk/lucene/CHANGES.txt
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java
Modified: lucene/dev/trunk/lucene/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/CHANGES.txt?rev=1352600&r1=1352599&r2=1352600&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/CHANGES.txt (original)
+++ lucene/dev/trunk/lucene/CHANGES.txt Thu Jun 21 16:28:57 2012
@@ -1062,6 +1062,9 @@ Bug fixes
* LUCENE-4114: Fix int overflow bugs in BYTES_FIXED_STRAIGHT and
BYTES_FIXED_DEREF doc values implementations (Walt Elder via Mike McCandless).
+* LUCENE-4147: Fixed thread safety issues when rollback() and commit()
+ are called simultaneously. (Simon Willnauer, Mike McCandless)
+
Documentation
* LUCENE-3958: Javadocs corrections for IndexWriter.
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=1352600&r1=1352599&r2=1352600&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java Thu Jun 21 16:28:57 2012
@@ -202,7 +202,6 @@ final class DocumentsWriter {
* discarding any docs added since last flush. */
synchronized void abort() throws IOException {
boolean success = false;
-
synchronized (this) {
deleteQueue.clear();
}
@@ -233,6 +232,7 @@ final class DocumentsWriter {
perThread.unlock();
}
}
+ flushControl.waitForFlush();
success = true;
} finally {
if (infoStream.isEnabled("DW")) {
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java?rev=1352600&r1=1352599&r2=1352600&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java Thu Jun 21 16:28:57 2012
@@ -343,8 +343,10 @@ final class DocumentsWriterFlushControl
synchronized void setClosed() {
// set by DW to signal that we should not release new DWPT after close
- this.closed = true;
- perThreadPool.deactivateUnreleasedStates();
+ if (!closed) {
+ this.closed = true;
+ perThreadPool.deactivateUnreleasedStates();
+ }
}
/**
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java?rev=1352600&r1=1352599&r2=1352600&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java Thu Jun 21 16:28:57 2012
@@ -463,10 +463,6 @@ class DocumentsWriterPerThread {
pendingDeletes.docIDs.clear();
}
- if (infoStream.isEnabled("DWPT")) {
- infoStream.message("DWPT", "flush postings as segment " + flushState.segmentInfo.name + " numDocs=" + numDocsInRAM);
- }
-
if (aborting) {
if (infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", "flush: skip because aborting is set");
@@ -474,6 +470,10 @@ class DocumentsWriterPerThread {
return null;
}
+ if (infoStream.isEnabled("DWPT")) {
+ infoStream.message("DWPT", "flush postings as segment " + flushState.segmentInfo.name + " numDocs=" + numDocsInRAM);
+ }
+
boolean success = false;
try {
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java?rev=1352600&r1=1352599&r2=1352600&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java Thu Jun 21 16:28:57 2012
@@ -841,15 +841,19 @@ public class IndexWriter implements Clos
*/
public void close(boolean waitForMerges) throws CorruptIndexException, IOException {
- // Ensure that only one thread actually gets to do the closing:
- if (shouldClose()) {
- // If any methods have hit OutOfMemoryError, then abort
- // on close, in case the internal state of IndexWriter
- // or DocumentsWriter is corrupt
- if (hitOOM)
- rollbackInternal();
- else
- closeInternal(waitForMerges);
+ // Ensure that only one thread actually gets to do the
+ // closing, and make sure no commit is also in progress:
+ synchronized(commitLock) {
+ if (shouldClose()) {
+ // If any methods have hit OutOfMemoryError, then abort
+ // on close, in case the internal state of IndexWriter
+ // or DocumentsWriter is corrupt
+ if (hitOOM) {
+ rollbackInternal();
+ } else {
+ closeInternal(waitForMerges, !hitOOM);
+ }
+ }
}
}
@@ -868,12 +872,13 @@ public class IndexWriter implements Clos
// successfully) or another (fails to close)
doWait();
}
- } else
+ } else {
return false;
+ }
}
}
- private void closeInternal(boolean waitForMerges) throws CorruptIndexException, IOException {
+ private void closeInternal(boolean waitForMerges, boolean doFlush) throws CorruptIndexException, IOException {
try {
@@ -889,8 +894,10 @@ public class IndexWriter implements Clos
// Only allow a new merge to be triggered if we are
// going to wait for merges:
- if (!hitOOM) {
+ if (doFlush) {
flush(waitForMerges, true);
+ } else {
+ docWriter.abort(); // already closed
}
if (waitForMerges)
@@ -910,7 +917,7 @@ public class IndexWriter implements Clos
infoStream.message("IW", "now call final commit()");
}
- if (!hitOOM) {
+ if (doFlush) {
commitInternal(null);
}
@@ -1774,9 +1781,13 @@ public class IndexWriter implements Clos
public void rollback() throws IOException {
ensureOpen();
- // Ensure that only one thread actually gets to do the closing:
- if (shouldClose())
- rollbackInternal();
+ // Ensure that only one thread actually gets to do the
+ // closing, and make sure no commit is also in progress:
+ synchronized(commitLock) {
+ if (shouldClose()) {
+ rollbackInternal();
+ }
+ }
}
private void rollbackInternal() throws IOException {
@@ -1786,6 +1797,7 @@ public class IndexWriter implements Clos
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "rollback");
}
+
try {
synchronized(this) {
@@ -1804,7 +1816,8 @@ public class IndexWriter implements Clos
mergeScheduler.close();
bufferedDeletesStream.clear();
-
+ docWriter.close(); // mark it as closed first to prevent subsequent indexing actions/flushes
+ docWriter.abort();
synchronized(this) {
if (pendingCommit != null) {
@@ -1826,8 +1839,7 @@ public class IndexWriter implements Clos
if (infoStream.isEnabled("IW") ) {
infoStream.message("IW", "rollback: infos=" + segString(segmentInfos));
}
-
- docWriter.abort();
+
assert testPoint("rollback before checkpoint");
@@ -1854,7 +1866,7 @@ public class IndexWriter implements Clos
}
}
- closeInternal(false);
+ closeInternal(false, false);
}
/**
@@ -2482,99 +2494,102 @@ public class IndexWriter implements Clos
public final void prepareCommit(Map<String,String> commitUserData) throws CorruptIndexException, IOException {
ensureOpen(false);
- if (infoStream.isEnabled("IW")) {
- infoStream.message("IW", "prepareCommit: flush");
- infoStream.message("IW", " index before flush " + segString());
- }
+ synchronized(commitLock) {
+ if (infoStream.isEnabled("IW")) {
+ infoStream.message("IW", "prepareCommit: flush");
+ infoStream.message("IW", " index before flush " + segString());
+ }
- if (hitOOM) {
- throw new IllegalStateException("this writer hit an OutOfMemoryError; cannot commit");
- }
+ if (hitOOM) {
+ throw new IllegalStateException("this writer hit an OutOfMemoryError; cannot commit");
+ }
- if (pendingCommit != null) {
- throw new IllegalStateException("prepareCommit was already called with no corresponding call to commit");
- }
+ if (pendingCommit != null) {
+ throw new IllegalStateException("prepareCommit was already called with no corresponding call to commit");
+ }
- doBeforeFlush();
- assert testPoint("startDoFlush");
- SegmentInfos toCommit = null;
- boolean anySegmentsFlushed = false;
+ doBeforeFlush();
+ assert testPoint("startDoFlush");
+ SegmentInfos toCommit = null;
+ boolean anySegmentsFlushed = false;
- // This is copied from doFlush, except it's modified to
- // clone & incRef the flushed SegmentInfos inside the
- // sync block:
+ // This is copied from doFlush, except it's modified to
+ // clone & incRef the flushed SegmentInfos inside the
+ // sync block:
- try {
+ try {
- synchronized (fullFlushLock) {
- boolean flushSuccess = false;
- boolean success = false;
- try {
- anySegmentsFlushed = docWriter.flushAllThreads();
- if (!anySegmentsFlushed) {
- // prevent double increment since docWriter#doFlush increments the flushcount
- // if we flushed anything.
- flushCount.incrementAndGet();
- }
- flushSuccess = true;
+ synchronized (fullFlushLock) {
+ boolean flushSuccess = false;
+ boolean success = false;
+ try {
+ anySegmentsFlushed = docWriter.flushAllThreads();
+ if (!anySegmentsFlushed) {
+ // prevent double increment since docWriter#doFlush increments the flushcount
+ // if we flushed anything.
+ flushCount.incrementAndGet();
+ }
+ flushSuccess = true;
- synchronized(this) {
- maybeApplyDeletes(true);
+ synchronized(this) {
+ maybeApplyDeletes(true);
- readerPool.commit(segmentInfos);
+ readerPool.commit(segmentInfos);
- // Must clone the segmentInfos while we still
- // hold fullFlushLock and while sync'd so that
- // no partial changes (eg a delete w/o
- // corresponding add from an updateDocument) can
- // sneak into the commit point:
- toCommit = segmentInfos.clone();
-
- pendingCommitChangeCount = changeCount;
-
- // This protects the segmentInfos we are now going
- // to commit. This is important in case, eg, while
- // we are trying to sync all referenced files, a
- // merge completes which would otherwise have
- // removed the files we are now syncing.
- filesToCommit = toCommit.files(directory, false);
- deleter.incRef(filesToCommit);
- }
- success = true;
- } finally {
- if (!success) {
- if (infoStream.isEnabled("IW")) {
- infoStream.message("IW", "hit exception during prepareCommit");
+ // Must clone the segmentInfos while we still
+ // hold fullFlushLock and while sync'd so that
+ // no partial changes (eg a delete w/o
+ // corresponding add from an updateDocument) can
+ // sneak into the commit point:
+ toCommit = segmentInfos.clone();
+
+ pendingCommitChangeCount = changeCount;
+
+ // This protects the segmentInfos we are now going
+ // to commit. This is important in case, eg, while
+ // we are trying to sync all referenced files, a
+ // merge completes which would otherwise have
+ // removed the files we are now syncing.
+ filesToCommit = toCommit.files(directory, false);
+ deleter.incRef(filesToCommit);
+ }
+ success = true;
+ } finally {
+ if (!success) {
+ if (infoStream.isEnabled("IW")) {
+ infoStream.message("IW", "hit exception during prepareCommit");
+ }
}
+ // Done: finish the full flush!
+ docWriter.finishFullFlush(flushSuccess);
+ doAfterFlush();
}
- // Done: finish the full flush!
- docWriter.finishFullFlush(flushSuccess);
- doAfterFlush();
}
+ } catch (OutOfMemoryError oom) {
+ handleOOM(oom, "prepareCommit");
}
- } catch (OutOfMemoryError oom) {
- handleOOM(oom, "prepareCommit");
- }
- boolean success = false;
- try {
- if (anySegmentsFlushed) {
- maybeMerge();
- }
- success = true;
- } finally {
- if (!success) {
- synchronized (this) {
- deleter.decRef(filesToCommit);
- filesToCommit = null;
+ boolean success = false;
+ try {
+ if (anySegmentsFlushed) {
+ maybeMerge();
+ }
+ success = true;
+ } finally {
+ if (!success) {
+ synchronized (this) {
+ deleter.decRef(filesToCommit);
+ filesToCommit = null;
+ }
}
}
- }
- startCommit(toCommit, commitUserData);
+ startCommit(toCommit, commitUserData);
+ }
}
- // Used only by commit, below; lock order is commitLock -> IW
+ // Used only by commit and prepareCommit, below; lock
+ // order is commitLock -> IW
private final Object commitLock = new Object();
/**
@@ -2634,6 +2649,8 @@ public class IndexWriter implements Clos
}
synchronized(commitLock) {
+ ensureOpen(false);
+
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "commit: enter lock");
}
Modified: lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java?rev=1352600&r1=1352599&r2=1352600&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java (original)
+++ lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java Thu Jun 21 16:28:57 2012
@@ -19,6 +19,10 @@ package org.apache.lucene.index;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
@@ -31,6 +35,7 @@ import org.apache.lucene.store.Directory
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.LineFileDocs;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.ThreadInterruptedException;
import org.apache.lucene.util._TestUtil;
@@ -456,41 +461,132 @@ public class TestIndexWriterWithThreads
dir.close();
}
- static class DelayedIndexAndCloseRunnable extends Thread {
- private final Directory dir;
- boolean failed = false;
- Throwable failure = null;
- private final CountDownLatch startIndexing = new CountDownLatch(1);
- private CountDownLatch iwConstructed;
-
- public DelayedIndexAndCloseRunnable(Directory dir,
- CountDownLatch iwConstructed) {
- this.dir = dir;
- this.iwConstructed = iwConstructed;
- }
-
- public void startIndexing() {
- this.startIndexing.countDown();
- }
-
- @Override
- public void run() {
- try {
- Document doc = new Document();
- Field field = newTextField("field", "testData", Field.Store.YES);
- doc.add(field);
- IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(
- TEST_VERSION_CURRENT, new MockAnalyzer(random())));
- iwConstructed.countDown();
- startIndexing.await();
- writer.addDocument(doc);
- writer.close();
- } catch (Throwable e) {
- failed = true;
- failure = e;
- failure.printStackTrace(System.out);
- return;
- }
- }
- }
+ static class DelayedIndexAndCloseRunnable extends Thread {
+ private final Directory dir;
+ boolean failed = false;
+ Throwable failure = null;
+ private final CountDownLatch startIndexing = new CountDownLatch(1);
+ private CountDownLatch iwConstructed;
+
+ public DelayedIndexAndCloseRunnable(Directory dir,
+ CountDownLatch iwConstructed) {
+ this.dir = dir;
+ this.iwConstructed = iwConstructed;
+ }
+
+ public void startIndexing() {
+ this.startIndexing.countDown();
+ }
+
+ @Override
+ public void run() {
+ try {
+ Document doc = new Document();
+ Field field = newTextField("field", "testData", Field.Store.YES);
+ doc.add(field);
+ IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(
+ TEST_VERSION_CURRENT, new MockAnalyzer(random())));
+ iwConstructed.countDown();
+ startIndexing.await();
+ writer.addDocument(doc);
+ writer.close();
+ } catch (Throwable e) {
+ failed = true;
+ failure = e;
+ failure.printStackTrace(System.out);
+ return;
+ }
+ }
+ }
+
+ // LUCENE-4147
+ public void testRollbackAndCommitWithThreads() throws Exception {
+ final MockDirectoryWrapper d = newFSDirectory(_TestUtil.getTempDir("RollbackAndCommitWithThreads"));
+ d.setPreventDoubleWrite(false);
+
+ final int threadCount = _TestUtil.nextInt(random(), 2, 6);
+
+ final AtomicReference<IndexWriter> writerRef = new AtomicReference<IndexWriter>();
+ writerRef.set(new IndexWriter(d, newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random()))));
+ final LineFileDocs docs = new LineFileDocs(random());
+ final Thread[] threads = new Thread[threadCount];
+ final int iters = atLeast(1000);
+ final AtomicBoolean failed = new AtomicBoolean();
+ final Lock rollbackLock = new ReentrantLock();
+ final Lock commitLock = new ReentrantLock();
+ for(int threadID=0;threadID<threadCount;threadID++) {
+ threads[threadID] = new Thread() {
+ @Override
+ public void run() {
+ for(int iter=0;iter<iters && !failed.get();iter++) {
+ //final int x = random().nextInt(5);
+ final int x = random().nextInt(3);
+ try {
+ switch(x) {
+ case 0:
+ rollbackLock.lock();
+ if (VERBOSE) {
+ System.out.println("\nTEST: " + Thread.currentThread().getName() + ": now rollback");
+ }
+ try {
+ writerRef.get().rollback();
+ if (VERBOSE) {
+ System.out.println("TEST: " + Thread.currentThread().getName() + ": rollback done; now open new writer");
+ }
+ writerRef.set(new IndexWriter(d, newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random()))));
+ } finally {
+ rollbackLock.unlock();
+ }
+ break;
+ case 1:
+ commitLock.lock();
+ if (VERBOSE) {
+ System.out.println("\nTEST: " + Thread.currentThread().getName() + ": now commit");
+ }
+ try {
+ if (random().nextBoolean()) {
+ writerRef.get().prepareCommit();
+ }
+ writerRef.get().commit();
+ } catch (AlreadyClosedException ace) {
+ // ok
+ } catch (NullPointerException npe) {
+ // ok
+ } finally {
+ commitLock.unlock();
+ }
+ break;
+ case 2:
+ if (VERBOSE) {
+ System.out.println("\nTEST: " + Thread.currentThread().getName() + ": now add");
+ }
+ try {
+ writerRef.get().addDocument(docs.nextDoc());
+ } catch (AlreadyClosedException ace) {
+ // ok
+ } catch (NullPointerException npe) {
+ // ok
+ } catch (AssertionError ae) {
+ // ok
+ }
+ break;
+ }
+ } catch (Throwable t) {
+ failed.set(true);
+ throw new RuntimeException(t);
+ }
+ }
+ }
+ };
+ threads[threadID].start();
+ }
+
+ for(int threadID=0;threadID<threadCount;threadID++) {
+ threads[threadID].join();
+ }
+
+ assertTrue(!failed.get());
+ writerRef.get().close();
+ d.close();
+ }
}