You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@lucene.apache.org by si...@apache.org on 2020/04/22 12:30:30 UTC
[lucene-solr] branch master updated: LUCENE-9337: Ensure CMS
updates its thread accounting data structures consistently (#1443)
This is an automated email from the ASF dual-hosted git repository.
simonw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/master by this push:
new 2b6ae53 LUCENE-9337: Ensure CMS updates its thread accounting data structures consistently (#1443)
2b6ae53 is described below
commit 2b6ae53cd9f060758a6c92c7a61bfb91d878fac2
Author: Simon Willnauer <si...@apache.org>
AuthorDate: Wed Apr 22 14:30:14 2020 +0200
LUCENE-9337: Ensure CMS updates its thread accounting data structures consistently (#1443)
CMS currently releases its lock after finishing a merge and then re-acquires it to update
the thread accounting data structures. This causes a race in which concurrently
finishing merge threads can all fail to pick up pending merges, potentially starving
(hanging) forceMerge calls.
---
lucene/CHANGES.txt | 6 ++
.../lucene/index/ConcurrentMergeScheduler.java | 45 +++++------
.../java/org/apache/lucene/index/IndexWriter.java | 8 +-
.../lucene/index/TestConcurrentMergeScheduler.java | 86 ++++++++++++++++++++++
4 files changed, 120 insertions(+), 25 deletions(-)
diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index 2055766..a9301eb 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -185,6 +185,12 @@ Bug Fixes
* LUCENE-9309: Wait for #addIndexes merges when aborting merges. (Simon Willnauer)
+* LUCENE-9337: Ensure CMS updates its thread accounting data structures consistently.
+ CMS used to release its lock after finishing a merge and then re-acquire it to update
+ the thread accounting data structures. This caused a race where concurrently finishing
+ merge threads failed to pick up pending merges, potentially starving forceMerge calls.
+ (Simon Willnauer)
+
Other
---------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java b/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
index f20ae31..0324cd3 100644
--- a/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
+++ b/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
@@ -18,6 +18,7 @@ package org.apache.lucene.index;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
@@ -633,6 +634,27 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
return thread;
}
+ synchronized void runOnMergeFinished(IndexWriter writer) {
+ // Must hold the CMS lock across both the MERGE_FINISHED merge call and the thread
+ // accounting in the finally block; otherwise stalling decisions might miss pending merges.
+ // NOTE(review): the caller skips this when its merge throws, so this cleanup won't run then.
+ assert mergeThreads.contains(Thread.currentThread()) : "caller is not a merge thread";
+ // Let CMS run new merges if necessary (the caller is still registered as a merge thread):
+ try {
+ merge(writer, MergeTrigger.MERGE_FINISHED, true);
+ } catch (AlreadyClosedException ace) {
+ // OK: the writer has already been closed
+ } catch (IOException ioe) {
+ throw new UncheckedIOException(ioe);
+ } finally {
+ removeMergeThread();
+ updateMergeThreads();
+ // In case indexing threads stalled waiting on this CMS, wake them up so they
+ // can re-check whether they may proceed:
+ notifyAll();
+ }
+ }
+
/** Runs a merge thread to execute a single merge, then exits. */
protected class MergeThread extends Thread implements Comparable<MergeThread> {
final IndexWriter writer;
@@ -664,18 +686,8 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
if (verbose()) {
message(" merge thread: done");
}
-
- // Let CMS run new merges if necessary:
- try {
- merge(writer, MergeTrigger.MERGE_FINISHED, true);
- } catch (AlreadyClosedException ace) {
- // OK
- } catch (IOException ioe) {
- throw new RuntimeException(ioe);
- }
-
+ runOnMergeFinished(writer);
} catch (Throwable exc) {
-
if (exc instanceof MergePolicy.MergeAbortedException) {
// OK to ignore
} else if (suppressExceptions == false) {
@@ -683,17 +695,6 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
// testing.
handleMergeException(writer.getDirectory(), exc);
}
-
- } finally {
- synchronized(ConcurrentMergeScheduler.this) {
- removeMergeThread();
-
- updateMergeThreads();
-
- // In case we had stalled indexing, we can now wake up
- // and possibly unstall:
- ConcurrentMergeScheduler.this.notifyAll();
- }
}
}
}
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index ad002b8..d7cedda 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -1990,7 +1990,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
if (doWait) {
synchronized(this) {
while(true) {
-
if (tragedy.get() != null) {
throw new IllegalStateException("this writer hit an unrecoverable error; cannot complete forceMerge", tragedy.get());
}
@@ -2007,10 +2006,12 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
}
}
- if (maxNumSegmentsMergesPending())
+ if (maxNumSegmentsMergesPending()) {
+ testPoint("forceMergeBeforeWait");
doWait();
- else
+ } else {
break;
+ }
}
}
@@ -4377,6 +4378,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
* but without holding synchronized lock on IndexWriter
* instance */
private int mergeMiddle(MergePolicy.OneMerge merge, MergePolicy mergePolicy) throws IOException {
+ testPoint("mergeMiddleStart");
merge.checkAborted();
Directory mergeDirectory = config.getMergeScheduler().wrapForMerge(merge, directory);
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java b/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java
index f820fa9..f3a91ef 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import com.carrotsearch.randomizedtesting.generators.RandomStrings;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
@@ -32,6 +33,7 @@ import org.apache.lucene.index.IndexWriterConfig.OpenMode;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MockDirectoryWrapper;
+import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.TestUtil;
@@ -657,4 +659,88 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
assertFalse(failed.get());
}
+
+ /*
+ * Starts 2 concurrent merges (2 segments each) and blocks both at the "mergeMiddleStart" test point. A forceMerge(1)
+ * then queues a pending merge and waits ("forceMergeBeforeWait"), after which maxMergeCount is lowered to 1 and the
+ * merges are released. If CMS updates its thread accounting non-atomically, neither finishing merge thread picks up
+ * the pending merge and forceMerge waits forever; before the LUCENE-9337 fix this reproduced on every run.
+ */
+ public void testChangeMaxMergeCountyWhileForceMerge() throws IOException, InterruptedException {
+ int numIters = TEST_NIGHTLY ? 100 : 10;
+ for (int iters = 0; iters < numIters; iters++) {
+ LogDocMergePolicy mp = new LogDocMergePolicy();
+ mp.setMergeFactor(2);
+ CountDownLatch forceMergeWaits = new CountDownLatch(1);
+ CountDownLatch mergeThreadsStartAfterWait = new CountDownLatch(1);
+ CountDownLatch mergeThreadsArrived = new CountDownLatch(2);
+ InfoStream stream = new InfoStream() {
+ @Override
+ public void message(String component, String message) {
+ if ("TP".equals(component) && "mergeMiddleStart".equals(message)) {
+ mergeThreadsArrived.countDown();
+ try {
+ mergeThreadsStartAfterWait.await();
+ } catch (InterruptedException e) {
+ throw new AssertionError(e);
+ }
+ } else if ("TP".equals(component) && "forceMergeBeforeWait".equals(message)) {
+ forceMergeWaits.countDown();
+ }
+ }
+
+ @Override
+ public boolean isEnabled(String component) {
+ return "TP".equals(component);
+ }
+
+ @Override
+ public void close() {
+ }
+ };
+ try (Directory dir = newDirectory();
+ IndexWriter writer = new IndexWriter(dir,
+ new IndexWriterConfig().setMergeScheduler(new ConcurrentMergeScheduler())
+ .setMergePolicy(mp).setInfoStream(stream)) {
+ @Override
+ protected boolean isEnableTestPoints() {
+ return true;
+ }
+ }) {
+ Thread t = new Thread(() -> {
+ try {
+ writer.forceMerge(1);
+ } catch (IOException e) {
+ throw new AssertionError(e);
+ }
+ });
+ ConcurrentMergeScheduler cms = (ConcurrentMergeScheduler) writer.getConfig().getMergeScheduler();
+ cms.setMaxMergesAndThreads(2, 2);
+ try {
+ for (int i = 0; i < 4; i++) {
+ Document document = new Document();
+ document.add(new TextField("foo", "the quick brown fox jumps over the lazy dog", Field.Store.YES));
+ document.add(new TextField("bar", RandomStrings.randomRealisticUnicodeOfLength(random(), 20), Field.Store.YES));
+ writer.addDocument(document);
+ writer.flush();
+ }
+ assertEquals(writer.cloneSegmentInfos().toString(), 4, writer.getSegmentCount());
+ mergeThreadsArrived.await();
+ t.start();
+ forceMergeWaits.await();
+ cms.setMaxMergesAndThreads(1, 1);
+ } finally {
+ mergeThreadsStartAfterWait.countDown();
+ }
+
+ while (t.isAlive()) {
+ t.join(10);
+ if (cms.mergeThreadCount() == 0 && writer.hasPendingMerges()) {
+ fail("writer has pending merges but no CMS threads are running");
+ }
+ }
+ assertEquals(1, writer.getSegmentCount());
+ }
+ }
+ }
}