You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by si...@apache.org on 2020/08/14 14:09:40 UTC

[lucene-solr] branch branch_8x updated: Ensure DWPTPool never release any new DWPT after it's closed (#1751)

This is an automated email from the ASF dual-hosted git repository.

simonw pushed a commit to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/branch_8x by this push:
     new 6748251  Ensure DWPTPool never release any new DWPT after it's closed (#1751)
6748251 is described below

commit 6748251c7f33e317d8ce979f08f32486d0bf8ef2
Author: Simon Willnauer <si...@apache.org>
AuthorDate: Fri Aug 14 16:04:58 2020 +0200

    Ensure DWPTPool never release any new DWPT after it's closed (#1751)
    
    The DWPTPool should not release new DWPTs after it's closed. Yet, if the pool
    is in a state where it's preventing new writers from being created in order to swap
    the delete queue, it might get closed, and in that case we fail to throw an AlreadyClosedException
    and instead release a new writer, which violates the condition that the pool is empty after it's
    closed and all remaining DWPTs have been aborted.
---
 .../lucene/index/DocumentsWriterPerThreadPool.java | 14 +++-
 .../index/TestDocumentsWriterPerThreadPool.java    | 96 ++++++++++++++++++++++
 .../lucene/index/TestIndexWriterWithThreads.java   |  1 -
 3 files changed, 107 insertions(+), 4 deletions(-)

diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
index 4510ad6..77a0a9e 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
@@ -97,6 +97,10 @@ final class DocumentsWriterPerThreadPool implements Iterable<DocumentsWriterPerT
         throw new ThreadInterruptedException(ie);
       }
     }
+    // we must check if we are closed since this might happen while we are waiting for the writer permit
+    // and if we miss that we might release a new DWPT even though the pool is closed. While that wouldn't be the
+    // end of the world, it violates the contract that we don't release any new DWPT after this pool is closed
+    ensureOpen();
     DocumentsWriterPerThread dwpt = dwptFactory.get();
     dwpt.lock(); // lock so nobody else will get this DWPT
     dwpts.add(dwpt);
@@ -109,9 +113,7 @@ final class DocumentsWriterPerThreadPool implements Iterable<DocumentsWriterPerT
   /** This method is used by DocumentsWriter/FlushControl to obtain a DWPT to do an indexing operation (add/updateDocument). */
   DocumentsWriterPerThread getAndLock() throws IOException {
     synchronized (this) {
-      if (closed) {
-        throw new AlreadyClosedException("DWPTPool is already closed");
-      }
+      ensureOpen();
       // Important that we are LIFO here! This way if number of concurrent indexing threads was once high,
       // but has now reduced, we only use a limited number of DWPTs. This also guarantees that if we have suddenly
       // a single thread indexing
@@ -128,6 +130,12 @@ final class DocumentsWriterPerThreadPool implements Iterable<DocumentsWriterPerT
     }
   }
 
+  private void ensureOpen() {
+    if (closed) {
+      throw new AlreadyClosedException("DWPTPool is already closed");
+    }
+  }
+
   void marksAsFreeAndUnlock(DocumentsWriterPerThread state) {
     synchronized (this) {
       assert dwpts.contains(state) : "we tried to add a DWPT back to the pool but the pool doesn't know aobut this DWPT";
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterPerThreadPool.java b/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterPerThreadPool.java
new file mode 100644
index 0000000..dc31750
--- /dev/null
+++ b/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterPerThreadPool.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.index;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.Version;
+
+public class TestDocumentsWriterPerThreadPool extends LuceneTestCase {
+
+  public void testLockReleaseAndClose() throws IOException {
+    try (Directory directory = newDirectory()) {
+      DocumentsWriterPerThreadPool pool = new DocumentsWriterPerThreadPool(() ->
+          new DocumentsWriterPerThread(Version.LATEST.major, "", directory, directory,
+              newIndexWriterConfig(), new DocumentsWriterDeleteQueue(null), null, new AtomicLong(), false));
+
+      DocumentsWriterPerThread first = pool.getAndLock();
+      assertEquals(1, pool.size());
+      DocumentsWriterPerThread second = pool.getAndLock();
+      assertEquals(2, pool.size());
+      pool.marksAsFreeAndUnlock(first);
+      assertEquals(2, pool.size());
+      DocumentsWriterPerThread third = pool.getAndLock();
+      assertSame(first, third);
+      assertEquals(2, pool.size());
+      pool.checkout(third);
+      assertEquals(1, pool.size());
+
+      pool.close();
+      assertEquals(1, pool.size());
+      pool.marksAsFreeAndUnlock(second);
+      assertEquals(1, pool.size());
+      for (DocumentsWriterPerThread lastPerThead : pool.filterAndLock(x -> true)) {
+        pool.checkout(lastPerThead);
+        lastPerThead.unlock();
+      }
+      assertEquals(0, pool.size());
+    }
+  }
+
+  public void testCloseWhileNewWritersLocked() throws IOException, InterruptedException {
+    try (Directory directory = newDirectory()) {
+      DocumentsWriterPerThreadPool pool = new DocumentsWriterPerThreadPool(() ->
+          new DocumentsWriterPerThread(Version.LATEST.major, "", directory, directory,
+              newIndexWriterConfig(), new DocumentsWriterDeleteQueue(null), null, new AtomicLong(), false));
+
+      DocumentsWriterPerThread first = pool.getAndLock();
+      pool.lockNewWriters();
+      CountDownLatch latch = new CountDownLatch(1);
+      Thread t = new Thread(() -> {
+        try {
+          latch.countDown();
+          pool.getAndLock();
+          fail();
+        } catch (AlreadyClosedException e) {
+          // fine
+        } catch (IOException e) {
+          throw new AssertionError(e);
+        }
+      });
+      t.start();
+      latch.await();
+      while (t.getState().equals(Thread.State.WAITING) == false) {
+        Thread.yield();
+      }
+      first.unlock();
+      pool.close();
+      pool.unlockNewWriters();
+      for (DocumentsWriterPerThread perThread : pool.filterAndLock(x -> true)) {
+        assertTrue(pool.checkout(perThread));
+        perThread.unlock();
+      }
+      assertEquals(0, pool.size());
+    }
+  }
+}
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java
index 4ff6686..c4f379e 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterWithThreads.java
@@ -59,7 +59,6 @@ public class TestIndexWriterWithThreads extends LuceneTestCase {
     private final CyclicBarrier syncStart;
     boolean diskFull;
     Throwable error;
-    AlreadyClosedException ace;
     IndexWriter writer;
     boolean noErrors;
     volatile int addCount;