Posted to commits@lucene.apache.org by mi...@apache.org on 2015/02/25 10:49:48 UTC

svn commit: r1662195 - in /lucene/dev/branches/lucene_solr_4_10/lucene: ./ core/src/java/org/apache/lucene/index/ core/src/test/org/apache/lucene/index/ test-framework/src/java/org/apache/lucene/index/

Author: mikemccand
Date: Wed Feb 25 09:49:47 2015
New Revision: 1662195

URL: http://svn.apache.org/r1662195
Log:
LUCENE-6214: don't deadlock when tragedy strikes during getReader and another thread is committing
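
For context, a minimal sketch (illustrative only, not the committed test) of the two
call paths that can collide: one thread adds documents and commits in a loop while
another keeps reopening a near-real-time reader from the same IndexWriter.  If an
unrecoverable (tragic) exception strikes inside getReader while the other thread is
committing, the pre-fix code could deadlock.  The sketch assumes the 4.10-branch
constructors (IndexWriterConfig(Version, Analyzer), DirectoryReader.open(writer, true));
the class name is made up, and it does not inject the fake OutOfMemoryError itself; the
committed TestTragicIndexWriterDeadlock below uses MockDirectoryWrapper.failOn for that.

  import org.apache.lucene.analysis.standard.StandardAnalyzer;
  import org.apache.lucene.document.Document;
  import org.apache.lucene.index.DirectoryReader;
  import org.apache.lucene.index.IndexWriter;
  import org.apache.lucene.index.IndexWriterConfig;
  import org.apache.lucene.store.RAMDirectory;
  import org.apache.lucene.util.Version;

  public class NrtCommitRaceSketch {
    public static void main(String[] args) throws Exception {
      RAMDirectory dir = new RAMDirectory();
      final IndexWriter w = new IndexWriter(dir,
          new IndexWriterConfig(Version.LATEST, new StandardAnalyzer(Version.LATEST)));

      // Thread A: adds a document and commits, over and over.
      Thread commitThread = new Thread() {
          @Override
          public void run() {
            try {
              for (int i = 0; i < 1000; i++) {
                w.addDocument(new Document());
                w.commit();
              }
            } catch (Throwable t) {
              // the writer may have been closed by a tragic event on the other thread
            }
          }
        };

      // Thread B: keeps reopening a near-real-time reader from the same writer.
      // A tragic exception thrown inside getReader() on this path is what the fix
      // now handles without deadlocking against the committing thread.
      Thread nrtThread = new Thread() {
          @Override
          public void run() {
            try {
              DirectoryReader r = DirectoryReader.open(w, true);
              for (int i = 0; i < 1000; i++) {
                DirectoryReader r2 = DirectoryReader.openIfChanged(r);
                if (r2 != null) {
                  r.close();
                  r = r2;
                }
              }
              r.close();
            } catch (Throwable t) {
              // reopen fails once the writer has hit a tragic event
            }
          }
        };

      commitThread.start();
      nrtThread.start();
      commitThread.join();
      nrtThread.join();
      w.close();
      dir.close();
    }
  }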

Added:
    lucene/dev/branches/lucene_solr_4_10/lucene/core/src/test/org/apache/lucene/index/TestTragicIndexWriterDeadlock.java   (with props)
    lucene/dev/branches/lucene_solr_4_10/lucene/test-framework/src/java/org/apache/lucene/index/SuppressingConcurrentMergeScheduler.java   (with props)
Modified:
    lucene/dev/branches/lucene_solr_4_10/lucene/CHANGES.txt
    lucene/dev/branches/lucene_solr_4_10/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
    lucene/dev/branches/lucene_solr_4_10/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java

Modified: lucene/dev/branches/lucene_solr_4_10/lucene/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_10/lucene/CHANGES.txt?rev=1662195&r1=1662194&r2=1662195&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_10/lucene/CHANGES.txt (original)
+++ lucene/dev/branches/lucene_solr_4_10/lucene/CHANGES.txt Wed Feb 25 09:49:47 2015
@@ -39,6 +39,11 @@ Bug fixes
   FileNotFoundException when writing doc values updates at the same
   time that a merge kicks off.  (Mike McCandless)
 
+* LUCENE-6214: Fixed IndexWriter deadlock when one thread is
+  committing while another opens a near-real-time reader and an
+  unrecoverable (tragic) exception is hit.  (Simon Willnauer, Mike
+  McCandless)
+
 API Changes
 
 * LUCENE-6212: Deprecate IndexWriter APIs that accept per-document Analyzer.

Modified: lucene/dev/branches/lucene_solr_4_10/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_10/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=1662195&r1=1662194&r2=1662195&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_10/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/dev/branches/lucene_solr_4_10/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java Wed Feb 25 09:49:47 2015
@@ -639,7 +639,8 @@ final class DocumentsWriter implements C
     return anythingFlushed;
   }
   
-  final void finishFullFlush(boolean success) {
+  final void finishFullFlush(IndexWriter indexWriter, boolean success) {
+    assert indexWriter.holdsFullFlushLock();
     try {
       if (infoStream.isEnabled("DW")) {
         infoStream.message("DW", Thread.currentThread().getName() + " finishFullFlush success=" + success);

Modified: lucene/dev/branches/lucene_solr_4_10/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_10/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java?rev=1662195&r1=1662194&r2=1662195&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_10/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/dev/branches/lucene_solr_4_10/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java Wed Feb 25 09:49:47 2015
@@ -404,8 +404,8 @@ public class IndexWriter implements Clos
      */
     boolean success2 = false;
     try {
+      boolean success = false;
       synchronized (fullFlushLock) {
-        boolean success = false;
         try {
           anySegmentFlushed = docWriter.flushAllThreads(this);
           if (!anySegmentFlushed) {
@@ -413,7 +413,6 @@ public class IndexWriter implements Clos
             // if we flushed anything.
             flushCount.incrementAndGet();
           }
-          success = true;
           // Prevent segmentInfos from changing while opening the
           // reader; in theory we could instead do similar retry logic,
           // just like we do when loading segments_N
@@ -424,22 +423,18 @@ public class IndexWriter implements Clos
               infoStream.message("IW", "return reader version=" + r.getVersion() + " reader=" + r);
             }
           }
-        } catch (OutOfMemoryError oom) {
-          tragicEvent(oom, "getReader");
-          // never reached but javac disagrees:
-          return null;
+          success = true;
         } finally {
-          if (!success) {
+          // Done: finish the full flush!
+          docWriter.finishFullFlush(this, success);
+          if (success) {
+            processEvents(false, true);
+            doAfterFlush();
+          } else {
             if (infoStream.isEnabled("IW")) {
               infoStream.message("IW", "hit exception during NRT reader");
             }
           }
-          if (tragedy == null) {
-            // Done: finish the full flush! (unless we hit OOM or something)
-            docWriter.finishFullFlush(success);
-            processEvents(false, true);
-            doAfterFlush();
-          }
         }
       }
       if (anySegmentFlushed) {
@@ -449,6 +444,10 @@ public class IndexWriter implements Clos
         infoStream.message("IW", "getReader took " + (System.currentTimeMillis() - tStart) + " msec");
       }
       success2 = true;
+    } catch (OutOfMemoryError tragedy) {
+      tragicEvent(tragedy, "getReader");
+      // never reached but javac disagrees:
+      return null;
     } finally {
       if (!success2) {
         IOUtils.closeWhileHandlingException(r);
@@ -630,6 +629,9 @@ public class IndexWriter implements Clos
      */
     public synchronized ReadersAndUpdates get(SegmentCommitInfo info, boolean create) {
 
+      // Make sure no new readers can be opened if another thread just closed us:
+      ensureOpen(false);
+
       assert info.info.dir == directory: "info.dir=" + info.info.dir + " vs " + directory;
 
       ReadersAndUpdates rld = readerMap.get(info);
@@ -2228,17 +2230,17 @@ public class IndexWriter implements Clos
     /* hold the full flush lock to prevent concurrency commits / NRT reopens to
      * get in our way and do unnecessary work. -- if we don't lock this here we might
      * get in trouble if */
-    synchronized (fullFlushLock) { 
-      /*
-       * We first abort and trash everything we have in-memory
-       * and keep the thread-states locked, the lockAndAbortAll operation
-       * also guarantees "point in time semantics" ie. the checkpoint that we need in terms
-       * of logical happens-before relationship in the DW. So we do
-       * abort all in memory structures 
-       * We also drop global field numbering before during abort to make
-       * sure it's just like a fresh index.
-       */
-      try {
+    /*
+     * We first abort and trash everything we have in-memory
+     * and keep the thread-states locked, the lockAndAbortAll operation
+     * also guarantees "point in time semantics" ie. the checkpoint that we need in terms
+     * of logical happens-before relationship in the DW. So we do
+     * abort all in memory structures 
+     * We also drop global field numbering before during abort to make
+     * sure it's just like a fresh index.
+     */
+    try {
+      synchronized (fullFlushLock) { 
         docWriter.lockAndAbortAll(this);
         processEvents(false, true);
         synchronized (this) {
@@ -2263,6 +2265,7 @@ public class IndexWriter implements Clos
             globalFieldNumberMap.clear();
             success = true;
           } finally {
+            docWriter.unlockAllAfterAbortAll(this);
             if (!success) {
               if (infoStream.isEnabled("IW")) {
                 infoStream.message("IW", "hit exception during deleteAll");
@@ -2270,11 +2273,9 @@ public class IndexWriter implements Clos
             }
           }
         }
-      } catch (OutOfMemoryError oom) {
-        tragicEvent(oom, "deleteAll");
-      } finally {
-        docWriter.unlockAllAfterAbortAll(this);
       }
+    } catch (OutOfMemoryError oom) {
+      tragicEvent(oom, "deleteAll");
     }
   }
 
@@ -2997,7 +2998,7 @@ public class IndexWriter implements Clos
               }
             }
             // Done: finish the full flush!
-            docWriter.finishFullFlush(flushSuccess);
+            docWriter.finishFullFlush(this, flushSuccess);
             doAfterFlush();
           }
         }
@@ -3238,27 +3239,31 @@ public class IndexWriter implements Clos
         infoStream.message("IW", "  start flush: applyAllDeletes=" + applyAllDeletes);
         infoStream.message("IW", "  index before flush " + segString());
       }
-      final boolean anySegmentFlushed;
+      final boolean anyChanges;
       
       synchronized (fullFlushLock) {
-      boolean flushSuccess = false;
+        boolean flushSuccess = false;
         try {
-          anySegmentFlushed = docWriter.flushAllThreads(this);
+          anyChanges = docWriter.flushAllThreads(this);
+          if (!anyChanges) {
+            // flushCount is incremented in flushAllThreads
+            flushCount.incrementAndGet();
+          }
           flushSuccess = true;
         } finally {
-          docWriter.finishFullFlush(flushSuccess);
+          docWriter.finishFullFlush(this, flushSuccess);
           processEvents(false, true);
         }
       }
       synchronized(this) {
         maybeApplyDeletes(applyAllDeletes);
         doAfterFlush();
-        if (!anySegmentFlushed) {
+        if (!anyChanges) {
           // flushCount is incremented in flushAllThreads
           flushCount.incrementAndGet();
         }
         success = true;
-        return anySegmentFlushed;
+        return anyChanges;
       }
     } catch (OutOfMemoryError oom) {
       tragicEvent(oom, "doFlush");

Added: lucene/dev/branches/lucene_solr_4_10/lucene/core/src/test/org/apache/lucene/index/TestTragicIndexWriterDeadlock.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_10/lucene/core/src/test/org/apache/lucene/index/TestTragicIndexWriterDeadlock.java?rev=1662195&view=auto
==============================================================================
--- lucene/dev/branches/lucene_solr_4_10/lucene/core/src/test/org/apache/lucene/index/TestTragicIndexWriterDeadlock.java (added)
+++ lucene/dev/branches/lucene_solr_4_10/lucene/core/src/test/org/apache/lucene/index/TestTragicIndexWriterDeadlock.java Wed Feb 25 09:49:47 2015
@@ -0,0 +1,104 @@
+package org.apache.lucene.index;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.lucene.analysis.MockAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.store.MockDirectoryWrapper;
+import org.apache.lucene.util.LuceneTestCase;
+
+public class TestTragicIndexWriterDeadlock extends LuceneTestCase {
+
+  public void testDeadlockExcNRTReaderCommit() throws Exception {
+    MockDirectoryWrapper dir = newMockDirectory();
+    IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random()));
+    if (iwc.getMergeScheduler() instanceof ConcurrentMergeScheduler) {
+      iwc.setMergeScheduler(new SuppressingConcurrentMergeScheduler() {
+          @Override
+          protected boolean isOK(Throwable th) {
+            return true;
+          }
+        });
+    }
+    final IndexWriter w = new IndexWriter(dir, iwc);
+    final CountDownLatch startingGun = new CountDownLatch(1);
+    final AtomicBoolean done = new AtomicBoolean();
+    Thread commitThread = new Thread() {
+        @Override
+        public void run() {
+          try {
+            startingGun.await();
+            while (done.get() == false) {
+              w.addDocument(new Document());
+              w.commit();
+            }
+          } catch (Throwable t) {
+            done.set(true);
+            //System.out.println("commit exc:");
+            //t.printStackTrace(System.out);
+          }
+        }
+      };
+    commitThread.start();
+    final DirectoryReader r0 = DirectoryReader.open(w, true);
+    final Thread nrtThread = new Thread() {
+        @Override
+        public void run() {
+          DirectoryReader r = r0;
+          try {
+            try {
+              startingGun.await();
+              while (done.get() == false) {
+                DirectoryReader oldReader = r;                  
+                DirectoryReader r2 = DirectoryReader.openIfChanged(oldReader);
+                if (r2 != null) {
+                  r = r2;
+                  oldReader.decRef();       
+                }
+              }
+            } finally {
+              r.close();
+            }
+          } catch (Throwable t) {
+            done.set(true);
+            //System.out.println("nrt exc:");
+            //t.printStackTrace(System.out);
+          }
+        }
+      };
+    nrtThread.start();
+    final AtomicBoolean keepFailing = new AtomicBoolean(true);
+    dir.failOn(new MockDirectoryWrapper.Failure() {
+        @Override
+        public void eval(MockDirectoryWrapper dir) {
+          if (Thread.currentThread() == nrtThread && keepFailing.get() && random().nextDouble() < 0.1) {
+            throw new OutOfMemoryError("fake OOME");
+          }
+        }
+      });
+    startingGun.countDown();
+    commitThread.join();
+    nrtThread.join();
+    keepFailing.set(false);
+    w.close();
+    dir.close();
+  }
+}

Added: lucene/dev/branches/lucene_solr_4_10/lucene/test-framework/src/java/org/apache/lucene/index/SuppressingConcurrentMergeScheduler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_10/lucene/test-framework/src/java/org/apache/lucene/index/SuppressingConcurrentMergeScheduler.java?rev=1662195&view=auto
==============================================================================
--- lucene/dev/branches/lucene_solr_4_10/lucene/test-framework/src/java/org/apache/lucene/index/SuppressingConcurrentMergeScheduler.java (added)
+++ lucene/dev/branches/lucene_solr_4_10/lucene/test-framework/src/java/org/apache/lucene/index/SuppressingConcurrentMergeScheduler.java Wed Feb 25 09:49:47 2015
@@ -0,0 +1,38 @@
+package org.apache.lucene.index;
+
+import org.apache.lucene.store.Directory;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** A {@link ConcurrentMergeScheduler} that ignores AlreadyClosedException. */
+public abstract class SuppressingConcurrentMergeScheduler extends ConcurrentMergeScheduler {
+  @Override
+  protected void handleMergeException(Throwable exc) {
+    while (true) {
+      if (isOK(exc)) {
+        return;
+      }
+      exc = exc.getCause();
+      if (exc == null) {
+        super.handleMergeException(exc);
+      }
+    }
+  }
+
+  protected abstract boolean isOK(Throwable t);
+}