Posted to commits@lucene.apache.org by mi...@apache.org on 2012/06/21 18:28:58 UTC

svn commit: r1352600 - in /lucene/dev/trunk/lucene: ./ core/src/java/org/apache/lucene/index/ core/src/test/org/apache/lucene/index/

Author: mikemccand
Date: Thu Jun 21 16:28:57 2012
New Revision: 1352600

URL: http://svn.apache.org/viewvc?rev=1352600&view=rev
Log:
LUCENE-4147: fix thread safety issues when IndexWriter's rollback & commit are called simultaneously
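
The race is easiest to picture as two threads sharing one IndexWriter, one thread committing while the other rolls back, which is exactly what the new TestIndexWriterWithThreads test below hammers on. Here is a minimal standalone sketch of that scenario (the Directory, analyzer and Version constant are illustrative choices, not taken from this patch; with the fix in place, the committer that loses the race simply sees AlreadyClosedException):

import java.io.IOException;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util.Version;

public class CommitVsRollbackRace {
  public static void main(String[] args) throws Exception {
    Directory dir = new RAMDirectory();
    final IndexWriter writer = new IndexWriter(dir,
        new IndexWriterConfig(Version.LUCENE_40, new StandardAnalyzer(Version.LUCENE_40)));

    // Thread A: commit the shared writer.
    Thread committer = new Thread() {
      @Override
      public void run() {
        try {
          writer.commit();
        } catch (AlreadyClosedException ace) {
          // Expected when the other thread wins the race and rolls back first.
        } catch (IOException ioe) {
          throw new RuntimeException(ioe);
        }
      }
    };

    // Thread B: roll the same writer back, discarding uncommitted changes.
    Thread roller = new Thread() {
      @Override
      public void run() {
        try {
          writer.rollback();
        } catch (IOException ioe) {
          throw new RuntimeException(ioe);
        }
      }
    };

    committer.start();
    roller.start();
    committer.join();
    roller.join();
    dir.close();
  }
}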

Modified:
    lucene/dev/trunk/lucene/CHANGES.txt
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
    lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java

Modified: lucene/dev/trunk/lucene/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/CHANGES.txt?rev=1352600&r1=1352599&r2=1352600&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/CHANGES.txt (original)
+++ lucene/dev/trunk/lucene/CHANGES.txt Thu Jun 21 16:28:57 2012
@@ -1062,6 +1062,9 @@ Bug fixes
 * LUCENE-4114: Fix int overflow bugs in BYTES_FIXED_STRAIGHT and
   BYTES_FIXED_DEREF doc values implementations (Walt Elder via Mike McCandless).
 
+* LUCENE-4147: Fixed thread safety issues when rollback() and commit()
+  are called simultaneously.  (Simon Willnauer, Mike McCandless)
+
 Documentation
 
 * LUCENE-3958: Javadocs corrections for IndexWriter.

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=1352600&r1=1352599&r2=1352600&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java Thu Jun 21 16:28:57 2012
@@ -202,7 +202,6 @@ final class DocumentsWriter {
    *  discarding any docs added since last flush. */
   synchronized void abort() throws IOException {
     boolean success = false;
-
     synchronized (this) {
       deleteQueue.clear();
     }
@@ -233,6 +232,7 @@ final class DocumentsWriter {
           perThread.unlock();
         }
       }
+      flushControl.waitForFlush();
       success = true;
     } finally {
       if (infoStream.isEnabled("DW")) {

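The added flushControl.waitForFlush() call makes abort() block until any flush that is already in flight has drained, instead of racing past it and reporting success too early. DocumentsWriterFlushControl itself is not shown in this patch; the following is only a sketch of the wait-for-flush idiom it provides, and all field and method names other than waitForFlush() are hypothetical:

// Hypothetical sketch of a wait-for-flush guard; not the real
// DocumentsWriterFlushControl implementation.
final class FlushGuard {
  private int flushingWriters; // flushes currently running

  synchronized void flushStarted() {
    flushingWriters++;
  }

  synchronized void flushFinished() {
    flushingWriters--;
    if (flushingWriters == 0) {
      notifyAll(); // wake threads blocked in waitForFlush()
    }
  }

  // Blocks until no flush is in flight, which is what the new
  // call in DocumentsWriter.abort() relies on.
  synchronized void waitForFlush() {
    while (flushingWriters != 0) {
      try {
        wait();
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(ie);
      }
    }
  }
}
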
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java?rev=1352600&r1=1352599&r2=1352600&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java Thu Jun 21 16:28:57 2012
@@ -343,8 +343,10 @@ final class DocumentsWriterFlushControl 
 
   synchronized void setClosed() {
     // set by DW to signal that we should not release new DWPT after close
-    this.closed = true;
-    perThreadPool.deactivateUnreleasedStates();
+    if (!closed) {
+      this.closed = true;
+      perThreadPool.deactivateUnreleasedStates();
+    }
   }
 
   /**

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java?rev=1352600&r1=1352599&r2=1352600&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java Thu Jun 21 16:28:57 2012
@@ -463,10 +463,6 @@ class DocumentsWriterPerThread {
       pendingDeletes.docIDs.clear();
     }
 
-    if (infoStream.isEnabled("DWPT")) {
-      infoStream.message("DWPT", "flush postings as segment " + flushState.segmentInfo.name + " numDocs=" + numDocsInRAM);
-    }
-
     if (aborting) {
       if (infoStream.isEnabled("DWPT")) {
         infoStream.message("DWPT", "flush: skip because aborting is set");
@@ -474,6 +470,10 @@ class DocumentsWriterPerThread {
       return null;
     }
 
+    if (infoStream.isEnabled("DWPT")) {
+      infoStream.message("DWPT", "flush postings as segment " + flushState.segmentInfo.name + " numDocs=" + numDocsInRAM);
+    }
+
     boolean success = false;
 
     try {

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java?rev=1352600&r1=1352599&r2=1352600&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java Thu Jun 21 16:28:57 2012
@@ -841,15 +841,19 @@ public class IndexWriter implements Clos
    */
   public void close(boolean waitForMerges) throws CorruptIndexException, IOException {
 
-    // Ensure that only one thread actually gets to do the closing:
-    if (shouldClose()) {
-      // If any methods have hit OutOfMemoryError, then abort
-      // on close, in case the internal state of IndexWriter
-      // or DocumentsWriter is corrupt
-      if (hitOOM)
-        rollbackInternal();
-      else
-        closeInternal(waitForMerges);
+    // Ensure that only one thread actually gets to do the
+    // closing, and make sure no commit is also in progress:
+    synchronized(commitLock) {
+      if (shouldClose()) {
+        // If any methods have hit OutOfMemoryError, then abort
+        // on close, in case the internal state of IndexWriter
+        // or DocumentsWriter is corrupt
+        if (hitOOM) {
+          rollbackInternal();
+        } else {
+          closeInternal(waitForMerges, !hitOOM);
+        }
+      }
     }
   }
 
@@ -868,12 +872,13 @@ public class IndexWriter implements Clos
           // successfully) or another (fails to close)
           doWait();
         }
-      } else
+      } else {
         return false;
+      }
     }
   }
 
-  private void closeInternal(boolean waitForMerges) throws CorruptIndexException, IOException {
+  private void closeInternal(boolean waitForMerges, boolean doFlush) throws CorruptIndexException, IOException {
 
     try {
 
@@ -889,8 +894,10 @@ public class IndexWriter implements Clos
 
       // Only allow a new merge to be triggered if we are
       // going to wait for merges:
-      if (!hitOOM) {
+      if (doFlush) {
         flush(waitForMerges, true);
+      } else {
+        docWriter.abort(); // already closed
       }
 
       if (waitForMerges)
@@ -910,7 +917,7 @@ public class IndexWriter implements Clos
         infoStream.message("IW", "now call final commit()");
       }
 
-      if (!hitOOM) {
+      if (doFlush) {
         commitInternal(null);
       }
 
@@ -1774,9 +1781,13 @@ public class IndexWriter implements Clos
   public void rollback() throws IOException {
     ensureOpen();
 
-    // Ensure that only one thread actually gets to do the closing:
-    if (shouldClose())
-      rollbackInternal();
+    // Ensure that only one thread actually gets to do the
+    // closing, and make sure no commit is also in progress:
+    synchronized(commitLock) {
+      if (shouldClose()) {
+        rollbackInternal();
+      }
+    }
   }
 
   private void rollbackInternal() throws IOException {
@@ -1786,6 +1797,7 @@ public class IndexWriter implements Clos
     if (infoStream.isEnabled("IW")) {
       infoStream.message("IW", "rollback");
     }
+    
 
     try {
       synchronized(this) {
@@ -1804,7 +1816,8 @@ public class IndexWriter implements Clos
       mergeScheduler.close();
 
       bufferedDeletesStream.clear();
-
+      docWriter.close(); // mark it as closed first to prevent subsequent indexing actions/flushes 
+      docWriter.abort();
       synchronized(this) {
 
         if (pendingCommit != null) {
@@ -1826,8 +1839,7 @@ public class IndexWriter implements Clos
         if (infoStream.isEnabled("IW") ) {
           infoStream.message("IW", "rollback: infos=" + segString(segmentInfos));
         }
-
-        docWriter.abort();
+        
 
         assert testPoint("rollback before checkpoint");
 
@@ -1854,7 +1866,7 @@ public class IndexWriter implements Clos
       }
     }
 
-    closeInternal(false);
+    closeInternal(false, false);
   }
 
   /**
@@ -2482,99 +2494,102 @@ public class IndexWriter implements Clos
   public final void prepareCommit(Map<String,String> commitUserData) throws CorruptIndexException, IOException {
     ensureOpen(false);
 
-    if (infoStream.isEnabled("IW")) {
-      infoStream.message("IW", "prepareCommit: flush");
-      infoStream.message("IW", "  index before flush " + segString());
-    }
+    synchronized(commitLock) {
+      if (infoStream.isEnabled("IW")) {
+        infoStream.message("IW", "prepareCommit: flush");
+        infoStream.message("IW", "  index before flush " + segString());
+      }
 
-    if (hitOOM) {
-      throw new IllegalStateException("this writer hit an OutOfMemoryError; cannot commit");
-    }
+      if (hitOOM) {
+        throw new IllegalStateException("this writer hit an OutOfMemoryError; cannot commit");
+      }
 
-    if (pendingCommit != null) {
-      throw new IllegalStateException("prepareCommit was already called with no corresponding call to commit");
-    }
+      if (pendingCommit != null) {
+        throw new IllegalStateException("prepareCommit was already called with no corresponding call to commit");
+      }
 
-    doBeforeFlush();
-    assert testPoint("startDoFlush");
-    SegmentInfos toCommit = null;
-    boolean anySegmentsFlushed = false;
+      doBeforeFlush();
+      assert testPoint("startDoFlush");
+      SegmentInfos toCommit = null;
+      boolean anySegmentsFlushed = false;
 
-    // This is copied from doFlush, except it's modified to
-    // clone & incRef the flushed SegmentInfos inside the
-    // sync block:
+      // This is copied from doFlush, except it's modified to
+      // clone & incRef the flushed SegmentInfos inside the
+      // sync block:
 
-    try {
+      try {
 
-      synchronized (fullFlushLock) {
-        boolean flushSuccess = false;
-        boolean success = false;
-        try {
-          anySegmentsFlushed = docWriter.flushAllThreads();
-          if (!anySegmentsFlushed) {
-            // prevent double increment since docWriter#doFlush increments the flushcount
-            // if we flushed anything.
-            flushCount.incrementAndGet();
-          }
-          flushSuccess = true;
+        synchronized (fullFlushLock) {
+          boolean flushSuccess = false;
+          boolean success = false;
+          try {
+            anySegmentsFlushed = docWriter.flushAllThreads();
+            if (!anySegmentsFlushed) {
+              // prevent double increment since docWriter#doFlush increments the flushcount
+              // if we flushed anything.
+              flushCount.incrementAndGet();
+            }
+            flushSuccess = true;
 
-          synchronized(this) {
-            maybeApplyDeletes(true);
+            synchronized(this) {
+              maybeApplyDeletes(true);
 
-            readerPool.commit(segmentInfos);
+              readerPool.commit(segmentInfos);
 
-            // Must clone the segmentInfos while we still
-            // hold fullFlushLock and while sync'd so that
-            // no partial changes (eg a delete w/o
-            // corresponding add from an updateDocument) can
-            // sneak into the commit point:
-            toCommit = segmentInfos.clone();
-
-            pendingCommitChangeCount = changeCount;
-
-            // This protects the segmentInfos we are now going
-            // to commit.  This is important in case, eg, while
-            // we are trying to sync all referenced files, a
-            // merge completes which would otherwise have
-            // removed the files we are now syncing.    
-            filesToCommit = toCommit.files(directory, false);
-            deleter.incRef(filesToCommit);
-          }
-          success = true;
-        } finally {
-          if (!success) {
-            if (infoStream.isEnabled("IW")) {
-              infoStream.message("IW", "hit exception during prepareCommit");
+              // Must clone the segmentInfos while we still
+              // hold fullFlushLock and while sync'd so that
+              // no partial changes (eg a delete w/o
+              // corresponding add from an updateDocument) can
+              // sneak into the commit point:
+              toCommit = segmentInfos.clone();
+
+              pendingCommitChangeCount = changeCount;
+
+              // This protects the segmentInfos we are now going
+              // to commit.  This is important in case, eg, while
+              // we are trying to sync all referenced files, a
+              // merge completes which would otherwise have
+              // removed the files we are now syncing.    
+              filesToCommit = toCommit.files(directory, false);
+              deleter.incRef(filesToCommit);
+            }
+            success = true;
+          } finally {
+            if (!success) {
+              if (infoStream.isEnabled("IW")) {
+                infoStream.message("IW", "hit exception during prepareCommit");
+              }
             }
+            // Done: finish the full flush!
+            docWriter.finishFullFlush(flushSuccess);
+            doAfterFlush();
           }
-          // Done: finish the full flush!
-          docWriter.finishFullFlush(flushSuccess);
-          doAfterFlush();
         }
+      } catch (OutOfMemoryError oom) {
+        handleOOM(oom, "prepareCommit");
       }
-    } catch (OutOfMemoryError oom) {
-      handleOOM(oom, "prepareCommit");
-    }
  
-    boolean success = false;
-    try {
-      if (anySegmentsFlushed) {
-        maybeMerge();
-      }
-      success = true;
-    } finally {
-      if (!success) {
-        synchronized (this) {
-          deleter.decRef(filesToCommit);
-          filesToCommit = null;
+      boolean success = false;
+      try {
+        if (anySegmentsFlushed) {
+          maybeMerge();
+        }
+        success = true;
+      } finally {
+        if (!success) {
+          synchronized (this) {
+            deleter.decRef(filesToCommit);
+            filesToCommit = null;
+          }
         }
       }
-    }
 
-    startCommit(toCommit, commitUserData);
+      startCommit(toCommit, commitUserData);
+    }
   }
 
-  // Used only by commit, below; lock order is commitLock -> IW
+  // Used only by commit and prepareCommit, below; lock
+  // order is commitLock -> IW
   private final Object commitLock = new Object();
 
   /**
@@ -2634,6 +2649,8 @@ public class IndexWriter implements Clos
     }
 
     synchronized(commitLock) {
+      ensureOpen(false);
+
       if (infoStream.isEnabled("IW")) {
         infoStream.message("IW", "commit: enter lock");
       }

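The IndexWriter hunks above route close(), rollback(), prepareCommit() and commit() through the same commitLock, and the updated comment documents the lock order commitLock -> IW (the writer's own monitor). Below is a minimal sketch of that ordering discipline, with hypothetical class and method names, showing why every path takes commitLock before synchronizing on the writer:

// Hypothetical sketch of the commitLock -> IW lock order; not the
// real IndexWriter code.
final class WriterLockOrder {
  private final Object commitLock = new Object();

  void commitLike() {
    synchronized (commitLock) {   // first: commitLock
      synchronized (this) {       // second: the writer's monitor ("IW")
        // publish the commit point
      }
    }
  }

  void rollbackLike() {
    synchronized (commitLock) {   // same outer lock as commitLike()
      synchronized (this) {
        // discard uncommitted state and close
      }
    }
  }
}

Because every path acquires the two locks in the same order, a rollback can never interleave with the middle of a commit, and no pair of threads can deadlock on this lock pair.
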
Modified: lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java?rev=1352600&r1=1352599&r2=1352600&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java (original)
+++ lucene/dev/trunk/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java Thu Jun 21 16:28:57 2012
@@ -19,6 +19,10 @@ package org.apache.lucene.index;
 
 import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.lucene.analysis.MockAnalyzer;
 import org.apache.lucene.document.Document;
@@ -31,6 +35,7 @@ import org.apache.lucene.store.Directory
 import org.apache.lucene.store.MockDirectoryWrapper;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.LineFileDocs;
 import org.apache.lucene.util.LuceneTestCase;
 import org.apache.lucene.util.ThreadInterruptedException;
 import org.apache.lucene.util._TestUtil;
@@ -456,41 +461,132 @@ public class TestIndexWriterWithThreads 
      dir.close();
   }
   
-   static class DelayedIndexAndCloseRunnable extends Thread {
-     private final Directory dir;
-     boolean failed = false;
-     Throwable failure = null;
-     private final CountDownLatch startIndexing = new CountDownLatch(1);
-     private CountDownLatch iwConstructed;
-
-     public DelayedIndexAndCloseRunnable(Directory dir,
-         CountDownLatch iwConstructed) {
-       this.dir = dir;
-       this.iwConstructed = iwConstructed;
-     }
-
-     public void startIndexing() {
-       this.startIndexing.countDown();
-     }
-
-     @Override
-     public void run() {
-       try {
-         Document doc = new Document();
-         Field field = newTextField("field", "testData", Field.Store.YES);
-         doc.add(field);
-         IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(
-             TEST_VERSION_CURRENT, new MockAnalyzer(random())));
-         iwConstructed.countDown();
-         startIndexing.await();
-         writer.addDocument(doc);
-         writer.close();
-       } catch (Throwable e) {
-         failed = true;
-         failure = e;
-         failure.printStackTrace(System.out);
-         return;
-       }
-     }
-   }
+  static class DelayedIndexAndCloseRunnable extends Thread {
+    private final Directory dir;
+    boolean failed = false;
+    Throwable failure = null;
+    private final CountDownLatch startIndexing = new CountDownLatch(1);
+    private CountDownLatch iwConstructed;
+
+    public DelayedIndexAndCloseRunnable(Directory dir,
+                                        CountDownLatch iwConstructed) {
+      this.dir = dir;
+      this.iwConstructed = iwConstructed;
+    }
+
+    public void startIndexing() {
+      this.startIndexing.countDown();
+    }
+
+    @Override
+    public void run() {
+      try {
+        Document doc = new Document();
+        Field field = newTextField("field", "testData", Field.Store.YES);
+        doc.add(field);
+        IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(
+                                                                       TEST_VERSION_CURRENT, new MockAnalyzer(random())));
+        iwConstructed.countDown();
+        startIndexing.await();
+        writer.addDocument(doc);
+        writer.close();
+      } catch (Throwable e) {
+        failed = true;
+        failure = e;
+        failure.printStackTrace(System.out);
+        return;
+      }
+    }
+  }
+
+  // LUCENE-4147
+  public void testRollbackAndCommitWithThreads() throws Exception {
+    final MockDirectoryWrapper d = newFSDirectory(_TestUtil.getTempDir("RollbackAndCommitWithThreads"));
+    d.setPreventDoubleWrite(false);
+
+    final int threadCount = _TestUtil.nextInt(random(), 2, 6);
+
+    final AtomicReference<IndexWriter> writerRef = new AtomicReference<IndexWriter>();
+    writerRef.set(new IndexWriter(d, newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random()))));
+    final LineFileDocs docs = new LineFileDocs(random());
+    final Thread[] threads = new Thread[threadCount];
+    final int iters = atLeast(1000);
+    final AtomicBoolean failed = new AtomicBoolean();
+    final Lock rollbackLock = new ReentrantLock();
+    final Lock commitLock = new ReentrantLock();
+    for(int threadID=0;threadID<threadCount;threadID++) {
+      threads[threadID] = new Thread() {
+          @Override
+          public void run() {
+            for(int iter=0;iter<iters && !failed.get();iter++) {
+              //final int x = random().nextInt(5);
+              final int x = random().nextInt(3);
+              try {
+                switch(x) {
+                case 0:
+                  rollbackLock.lock();
+                  if (VERBOSE) {
+                    System.out.println("\nTEST: " + Thread.currentThread().getName() + ": now rollback");
+                  }
+                  try {
+                    writerRef.get().rollback();
+                    if (VERBOSE) {
+                      System.out.println("TEST: " + Thread.currentThread().getName() + ": rollback done; now open new writer");
+                    }
+                    writerRef.set(new IndexWriter(d, newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random()))));
+                  } finally {
+                    rollbackLock.unlock();
+                  }
+                  break;
+                case 1:
+                  commitLock.lock();
+                  if (VERBOSE) {
+                    System.out.println("\nTEST: " + Thread.currentThread().getName() + ": now commit");
+                  }
+                  try {
+                    if (random().nextBoolean()) {
+                      writerRef.get().prepareCommit();
+                    }
+                    writerRef.get().commit();
+                  } catch (AlreadyClosedException ace) {
+                    // ok
+                  } catch (NullPointerException npe) {
+                    // ok
+                  } finally {
+                    commitLock.unlock();
+                  }
+                  break;
+                case 2:
+                  if (VERBOSE) {
+                    System.out.println("\nTEST: " + Thread.currentThread().getName() + ": now add");
+                  }
+                  try {
+                    writerRef.get().addDocument(docs.nextDoc());
+                  } catch (AlreadyClosedException ace) {
+                    // ok
+                  } catch (NullPointerException npe) {
+                    // ok
+                  } catch (AssertionError ae) {
+                    // ok
+                  }
+                  break;
+                }
+              } catch (Throwable t) {
+                failed.set(true);
+                throw new RuntimeException(t);
+              }
+            }
+          }
+        };
+      threads[threadID].start();
+    }
+
+    for(int threadID=0;threadID<threadCount;threadID++) {
+      threads[threadID].join();
+    }
+
+    assertTrue(!failed.get());
+    writerRef.get().close();
+    d.close();
+  }
 }