You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by si...@apache.org on 2020/04/22 12:36:04 UTC

[lucene-solr] branch branch_8x updated: LUCENE-9337: Ensure CMS updates its thread accounting data structures consistently (#1443)

This is an automated email from the ASF dual-hosted git repository.

simonw pushed a commit to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/branch_8x by this push:
     new 13ae8ae  LUCENE-9337: Ensure CMS updates its thread accounting data structures consistently (#1443)
13ae8ae is described below

commit 13ae8ae42d4d8c1323c0deaff156355fb36bbc50
Author: Simon Willnauer <si...@apache.org>
AuthorDate: Wed Apr 22 14:30:14 2020 +0200

    LUCENE-9337: Ensure CMS updates its thread accounting data structures consistently (#1443)
    
    CMS today releases its lock after finishing a merge before it re-acquires it to update
    the thread accounting data structures. This causes threading issues where concurrently
    finishing threads fail to pick up pending merges, causing potential thread starvation
    on forceMerge calls.
---
 lucene/CHANGES.txt                                 |  6 ++
 .../lucene/index/ConcurrentMergeScheduler.java     | 45 +++++------
 .../java/org/apache/lucene/index/IndexWriter.java  |  8 +-
 .../lucene/index/TestConcurrentMergeScheduler.java | 86 ++++++++++++++++++++++
 4 files changed, 120 insertions(+), 25 deletions(-)

diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index 2c7c9f5c..1fb2e69 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -73,6 +73,12 @@ Bug Fixes
 
 * LUCENE-9309: Wait for #addIndexes merges when aborting merges. (Simon Willnauer)
 
+* LUCENE-9337: Ensure CMS updates its thread accounting data structures consistently.
+  CMS today releases its lock after finishing a merge before it re-acquires it to update
+  the thread accounting data structures. This causes threading issues where concurrently
+  finishing threads fail to pick up pending merges, causing potential thread starvation on
+  forceMerge calls. (Simon Willnauer)
+
 Other
 ---------------------
 
diff --git a/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java b/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
index f20ae31..0324cd3 100644
--- a/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
+++ b/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
@@ -18,6 +18,7 @@ package org.apache.lucene.index;
 
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
@@ -633,6 +634,27 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
     return thread;
   }
 
+  synchronized void runOnMergeFinished(IndexWriter writer) {
+    // the merge call as well as the merge thread handling in the finally
+    // block must be sync'd on CMS otherwise stalling decisions might cause
+    // us to miss pending merges
+    assert mergeThreads.contains(Thread.currentThread()) : "caller is not a merge thread";
+    // Let CMS run new merges if necessary:
+    try {
+      merge(writer, MergeTrigger.MERGE_FINISHED, true);
+    } catch (AlreadyClosedException ace) {
+      // OK
+    } catch (IOException ioe) {
+      throw new UncheckedIOException(ioe);
+    } finally {
+      removeMergeThread();
+      updateMergeThreads();
+      // In case we had stalled indexing, we can now wake up
+      // and possibly unstall:
+      notifyAll();
+    }
+  }
+
   /** Runs a merge thread to execute a single merge, then exits. */
   protected class MergeThread extends Thread implements Comparable<MergeThread> {
     final IndexWriter writer;
@@ -664,18 +686,8 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
         if (verbose()) {
           message("  merge thread: done");
         }
-
-        // Let CMS run new merges if necessary:
-        try {
-          merge(writer, MergeTrigger.MERGE_FINISHED, true);
-        } catch (AlreadyClosedException ace) {
-          // OK
-        } catch (IOException ioe) {
-          throw new RuntimeException(ioe);
-        }
-
+        runOnMergeFinished(writer);
       } catch (Throwable exc) {
-
         if (exc instanceof MergePolicy.MergeAbortedException) {
           // OK to ignore
         } else if (suppressExceptions == false) {
@@ -683,17 +695,6 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
           // testing.
           handleMergeException(writer.getDirectory(), exc);
         }
-
-      } finally {
-        synchronized(ConcurrentMergeScheduler.this) {
-          removeMergeThread();
-
-          updateMergeThreads();
-
-          // In case we had stalled indexing, we can now wake up
-          // and possibly unstall:
-          ConcurrentMergeScheduler.this.notifyAll();
-        }
       }
     }
   }
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index b216fca..eef3e14 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -2007,7 +2007,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
     if (doWait) {
       synchronized(this) {
         while(true) {
-
           if (tragedy.get() != null) {
             throw new IllegalStateException("this writer hit an unrecoverable error; cannot complete forceMerge", tragedy.get());
           }
@@ -2024,10 +2023,12 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
             }
           }
 
-          if (maxNumSegmentsMergesPending())
+          if (maxNumSegmentsMergesPending()) {
+            testPoint("forceMergeBeforeWait");
             doWait();
-          else
+          } else {
             break;
+          }
         }
       }
 
@@ -4394,6 +4395,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
    *  but without holding synchronized lock on IndexWriter
    *  instance */
   private int mergeMiddle(MergePolicy.OneMerge merge, MergePolicy mergePolicy) throws IOException {
+    testPoint("mergeMiddleStart");
     merge.checkAborted();
 
     Directory mergeDirectory = config.getMergeScheduler().wrapForMerge(merge, directory);
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java b/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java
index f820fa9..f3a91ef 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.carrotsearch.randomizedtesting.generators.RandomStrings;
 import org.apache.lucene.analysis.MockAnalyzer;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
@@ -32,6 +33,7 @@ import org.apache.lucene.index.IndexWriterConfig.OpenMode;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.MockDirectoryWrapper;
+import org.apache.lucene.util.InfoStream;
 import org.apache.lucene.util.LuceneTestCase;
 import org.apache.lucene.util.TestUtil;
 
@@ -657,4 +659,88 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
 
     assertFalse(failed.get());
   }
+
+  /*
+   * This test tries to produce 2 merges running concurrently with 2 segments per merge. While these
+   * merges run, we kick off a forceMerge that puts a pending merge in the queue and then blocks in doWait().
+   * While it waits, we reduce maxMergeCount to 1. If concurrency in CMS is not right, the forceMerge will wait forever
+   * since none of the currently running merges picks up the pending merge. Without the fix, this test fails every time.
+   */
+  public void testChangeMaxMergeCountyWhileForceMerge() throws IOException, InterruptedException {
+    int numIters = TEST_NIGHTLY ? 100 : 10;
+    for (int iters = 0; iters < numIters; iters++) {
+      LogDocMergePolicy mp = new LogDocMergePolicy();
+      mp.setMergeFactor(2);
+      CountDownLatch forceMergeWaits = new CountDownLatch(1);
+      CountDownLatch mergeThreadsStartAfterWait = new CountDownLatch(1);
+      CountDownLatch mergeThreadsArrived = new CountDownLatch(2);
+      InfoStream stream = new InfoStream() {
+        @Override
+        public void message(String component, String message) {
+          if ("TP".equals(component) && "mergeMiddleStart".equals(message)) {
+            mergeThreadsArrived.countDown();
+            try {
+              mergeThreadsStartAfterWait.await();
+            } catch (InterruptedException e) {
+              throw new AssertionError(e);
+            }
+          } else if ("TP".equals(component) && "forceMergeBeforeWait".equals(message)) {
+            forceMergeWaits.countDown();
+          }
+        }
+
+        @Override
+        public boolean isEnabled(String component) {
+          return "TP".equals(component);
+        }
+
+        @Override
+        public void close() {
+        }
+      };
+      try (Directory dir = newDirectory();
+           IndexWriter writer = new IndexWriter(dir,
+               new IndexWriterConfig().setMergeScheduler(new ConcurrentMergeScheduler())
+                   .setMergePolicy(mp).setInfoStream(stream)) {
+             @Override
+             protected boolean isEnableTestPoints() {
+               return true;
+             }
+           }) {
+        Thread t = new Thread(() -> {
+          try {
+            writer.forceMerge(1);
+          } catch (IOException e) {
+            throw new AssertionError(e);
+          }
+        });
+        ConcurrentMergeScheduler cms = (ConcurrentMergeScheduler) writer.getConfig().getMergeScheduler();
+        cms.setMaxMergesAndThreads(2, 2);
+        try {
+          for (int i = 0; i < 4; i++) {
+            Document document = new Document();
+            document.add(new TextField("foo", "the quick brown fox jumps over the lazy dog", Field.Store.YES));
+            document.add(new TextField("bar", RandomStrings.randomRealisticUnicodeOfLength(random(), 20), Field.Store.YES));
+            writer.addDocument(document);
+            writer.flush();
+          }
+          assertEquals(writer.cloneSegmentInfos().toString(), 4, writer.getSegmentCount());
+          mergeThreadsArrived.await();
+          t.start();
+          forceMergeWaits.await();
+          cms.setMaxMergesAndThreads(1, 1);
+        } finally {
+          mergeThreadsStartAfterWait.countDown();
+        }
+
+        while (t.isAlive()) {
+          t.join(10);
+          if (cms.mergeThreadCount() == 0 && writer.hasPendingMerges()) {
+            fail("writer has pending merges but no CMS threads are running");
+          }
+        }
+        assertEquals(1, writer.getSegmentCount());
+      }
+    }
+  }
 }