You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by si...@apache.org on 2020/06/17 20:53:38 UTC
[lucene-solr] branch branch_8x updated: LUCENE-8962: Allow waiting
for all merges in a merge spec (#1585)
This is an automated email from the ASF dual-hosted git repository.
simonw pushed a commit to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/branch_8x by this push:
new d65dcb4 LUCENE-8962: Allow waiting for all merges in a merge spec (#1585)
d65dcb4 is described below
commit d65dcb43728dd6bb64393226e24576525328cecc
Author: Simon Willnauer <si...@apache.org>
AuthorDate: Wed Jun 17 22:48:12 2020 +0200
LUCENE-8962: Allow waiting for all merges in a merge spec (#1585)
This change adds infrastructure to allow straightforward waiting
on one or more merges or an entire merge specification. This is
a basis for LUCENE-8962.
---
.../java/org/apache/lucene/index/IndexWriter.java | 20 ++-
.../java/org/apache/lucene/index/MergePolicy.java | 73 ++++++++--
.../lucene/index/TestDemoParallelLeafReader.java | 2 +-
.../org/apache/lucene/index/TestIndexWriter.java | 2 +-
.../org/apache/lucene/index/TestMergePolicy.java | 158 +++++++++++++++++++++
5 files changed, 232 insertions(+), 23 deletions(-)
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index c392564..f52426f 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -2146,12 +2146,12 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
private final void maybeMerge(MergePolicy mergePolicy, MergeTrigger trigger, int maxNumSegments) throws IOException {
ensureOpen(false);
+ // updatePendingMerges now returns the merge specification whose merges it registered, or null
+ // if nothing was registered; a non-null result means there is new work for the merge scheduler.
- if (updatePendingMerges(mergePolicy, trigger, maxNumSegments)) {
+ if (updatePendingMerges(mergePolicy, trigger, maxNumSegments) != null) {
mergeScheduler.merge(mergeSource, trigger);
}
}
- private synchronized boolean updatePendingMerges(MergePolicy mergePolicy, MergeTrigger trigger, int maxNumSegments)
+ private synchronized MergePolicy.MergeSpecification updatePendingMerges(MergePolicy mergePolicy, MergeTrigger trigger, int maxNumSegments)
throws IOException {
// In case infoStream was disabled on init, but then enabled at some
@@ -2161,22 +2161,21 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
assert maxNumSegments == UNBOUNDED_MAX_MERGE_SEGMENTS || maxNumSegments > 0;
assert trigger != null;
if (stopMerges) {
- return false;
+ return null;
}
// Do not start new merges if disaster struck
if (tragedy.get() != null) {
- return false;
+ return null;
}
- boolean newMergesFound = false;
+
final MergePolicy.MergeSpecification spec;
if (maxNumSegments != UNBOUNDED_MAX_MERGE_SEGMENTS) {
assert trigger == MergeTrigger.EXPLICIT || trigger == MergeTrigger.MERGE_FINISHED :
"Expected EXPLICT or MERGE_FINISHED as trigger even with maxNumSegments set but was: " + trigger.name();
spec = mergePolicy.findForcedMerges(segmentInfos, maxNumSegments, Collections.unmodifiableMap(segmentsToMerge), this);
- newMergesFound = spec != null;
- if (newMergesFound) {
+ if (spec != null) {
final int numMerges = spec.merges.size();
for(int i=0;i<numMerges;i++) {
final MergePolicy.OneMerge merge = spec.merges.get(i);
@@ -2186,14 +2185,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
} else {
spec = mergePolicy.findMerges(trigger, segmentInfos, this);
}
- newMergesFound = spec != null;
- if (newMergesFound) {
+ if (spec != null) {
final int numMerges = spec.merges.size();
for(int i=0;i<numMerges;i++) {
registerMerge(spec.merges.get(i));
}
}
- return newMergesFound;
+ return spec;
}
/** Expert: to be used by a {@link MergePolicy} to avoid
@@ -4306,7 +4304,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
@SuppressWarnings("try")
private synchronized void closeMergeReaders(MergePolicy.OneMerge merge, boolean suppressExceptions) throws IOException {
final boolean drop = suppressExceptions == false;
- try (Closeable finalizer = merge::mergeFinished) {
+ try (Closeable finalizer = () -> merge.mergeFinished(suppressExceptions == false)) {
IOUtils.applyToAll(merge.readers, sr -> {
final ReadersAndUpdates rld = getPooledInstance(sr.getOriginalSegmentInfo(), false);
// We still hold a ref so it should not have been removed:
diff --git a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
index ec51a68..a94e595 100644
--- a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
@@ -23,7 +23,12 @@ import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@@ -37,6 +42,7 @@ import org.apache.lucene.store.MergeInfo;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.IOSupplier;
import org.apache.lucene.util.InfoStream;
+import org.apache.lucene.util.ThreadInterruptedException;
/**
* <p>Expert: a MergePolicy determines the sequence of
@@ -76,7 +82,7 @@ public abstract class MergePolicy {
* @lucene.experimental */
public static class OneMergeProgress {
/** Reason for pausing the merge thread. */
- public static enum PauseReason {
+ public enum PauseReason {
/** Stopped (because of throughput rate set to 0, typically). */
STOPPED,
/** Temporarily paused because of exceeded throughput rate. */
@@ -196,6 +202,7 @@ public abstract class MergePolicy {
*
* @lucene.experimental */
public static class OneMerge {
+ private final CompletableFuture<Boolean> mergeCompleted = new CompletableFuture<>();
SegmentCommitInfo info; // used by IndexWriter
boolean registerDone; // used by IndexWriter
long mergeGen; // used by IndexWriter
@@ -222,7 +229,7 @@ public abstract class MergePolicy {
volatile long mergeStartNS = -1;
/** Total number of documents in segments to be merged, not accounting for deletions. */
- public final int totalMaxDoc;
+ final int totalMaxDoc;
Throwable error;
/** Sole constructor.
@@ -234,12 +241,7 @@ public abstract class MergePolicy {
}
// clone the list, as the in list may be based off original SegmentInfos and may be modified
this.segments = new ArrayList<>(segments);
- int count = 0;
- for(SegmentCommitInfo info : segments) {
- count += info.info.maxDoc();
- }
- totalMaxDoc = count;
-
+ totalMaxDoc = segments.stream().mapToInt(i -> i.info.maxDoc()).sum();
mergeProgress = new OneMergeProgress();
}
@@ -251,8 +253,12 @@ public abstract class MergePolicy {
mergeProgress.setMergeThread(Thread.currentThread());
}
- /** Called by {@link IndexWriter} after the merge is done and all readers have been closed. */
- public void mergeFinished() throws IOException {
+ /** Called by {@link IndexWriter} after the merge is done and all readers have been closed.
+ * <p>Records the outcome of this merge, which releases threads blocked in {@code await} and makes
+ * {@code isDone()} return true. Subclasses that override this method must call
+ * {@code super.mergeFinished(success)}; otherwise the merge is never marked as finished and
+ * waiters only return once their timeout elapses.
+ * @param success true iff the merge finished successfully, i.e. was committed
+ * @throws IllegalStateException if this merge has already been marked as finished */
+ public void mergeFinished(boolean success) throws IOException {
+ if (mergeCompleted.complete(success) == false) {
+ throw new IllegalStateException("merge has already finished");
+ }
}
/** Wrap the reader in order to add/remove information to the merged segment. */
@@ -362,6 +368,37 @@ public abstract class MergePolicy {
public OneMergeProgress getMergeProgress() {
return mergeProgress;
}
+
+    /**
+     * Blocks the calling thread until this merge has finished (successfully or not)
+     * or the given timeout elapses, whichever happens first.
+     *
+     * @param timeout the maximum time to wait
+     * @param timeUnit the unit of {@code timeout}
+     * @return true if the merge finished within the specified timeout, false otherwise
+     */
+    boolean await(long timeout, TimeUnit timeUnit) {
+      boolean finishedInTime;
+      try {
+        mergeCompleted.get(timeout, timeUnit);
+        finishedInTime = true;
+      } catch (TimeoutException | ExecutionException e) {
+        finishedInTime = false;
+      } catch (InterruptedException e) {
+        throw new ThreadInterruptedException(e);
+      }
+      return finishedInTime;
+    }
+
+ /**
+ * Returns true if the merge has finished or false if it's still running or
+ * has not been started. This method will not block.
+ */
+ boolean isDone() {
+ return mergeCompleted.isDone();
+ }
+
+ /**
+ * Returns true iff the merge completed successfully or false if the merge succeeded with a failure.
+ * This method will not block and return an empty Optional if the merge has not finished yet
+ */
+ Optional<Boolean> hasCompletedSuccessfully() {
+ return Optional.ofNullable(mergeCompleted.getNow(null));
+ }
}
/**
@@ -399,6 +436,22 @@ public abstract class MergePolicy {
}
return b.toString();
}
+
+    /**
+     * Waits if necessary for at most the given time for all merges in this specification
+     * to finish. A merge counts as finished once it has been marked finished via
+     * {@link OneMerge#mergeFinished(boolean)}, regardless of whether it succeeded or failed;
+     * use {@code OneMerge#hasCompletedSuccessfully()} to inspect individual outcomes.
+     *
+     * @param timeout the maximum time to wait
+     * @param unit the time unit of the {@code timeout} argument
+     * @return true if all merges finished within the timeout, false otherwise
+     * @throws ThreadInterruptedException if the calling thread is interrupted while waiting
+     */
+    boolean await(long timeout, TimeUnit unit) {
+      // A single future that completes once every per-merge future has completed;
+      // an empty specification yields an already-completed future.
+      CompletableFuture<Void> allMerges = CompletableFuture.allOf(merges.stream()
+          .map(m -> m.mergeCompleted)
+          .toArray(CompletableFuture<?>[]::new));
+      try {
+        allMerges.get(timeout, unit);
+        return true;
+      } catch (InterruptedException e) {
+        throw new ThreadInterruptedException(e);
+      } catch (ExecutionException | TimeoutException e) {
+        // ExecutionException would require a merge future completed exceptionally, which
+        // mergeFinished never does; TimeoutException means not every merge finished in time.
+        return false;
+      }
+    }
}
/** Exception thrown if there are any problems while executing a merge. */
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestDemoParallelLeafReader.java b/lucene/core/src/test/org/apache/lucene/index/TestDemoParallelLeafReader.java
index 4b82800..7fdad3b 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestDemoParallelLeafReader.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestDemoParallelLeafReader.java
@@ -538,7 +538,7 @@ public class TestDemoParallelLeafReader extends LuceneTestCase {
}
@Override
- public void mergeFinished() throws IOException {
+ public void mergeFinished(boolean success) throws IOException {
Throwable th = null;
for (ParallelLeafReader r : parallelReaders) {
try {
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
index 6911c86..c4331c7 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
@@ -4182,7 +4182,7 @@ public class TestIndexWriter extends LuceneTestCase {
SetOnce<Boolean> onlyFinishOnce = new SetOnce<>();
return new MergePolicy.OneMerge(merge.segments) {
@Override
- public void mergeFinished() {
+ public void mergeFinished(boolean success) {
onlyFinishOnce.set(true);
}
};
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestMergePolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestMergePolicy.java
new file mode 100644
index 0000000..c252da0
--- /dev/null
+++ b/lucene/core/src/test/org/apache/lucene/index/TestMergePolicy.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.index;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.StringHelper;
+import org.apache.lucene.util.TestUtil;
+import org.apache.lucene.util.Version;
+
+/**
+ * Tests that {@link MergePolicy.OneMerge} and {@link MergePolicy.MergeSpecification} correctly
+ * track and report merge completion.
+ */
+public class TestMergePolicy extends LuceneTestCase {
+
+  public void testWaitForOneMerge() throws IOException, InterruptedException {
+    try (Directory dir = newDirectory()) {
+      MergePolicy.MergeSpecification ms = createRandomMergeSpecification(dir, 1 + random().nextInt(10));
+      for (MergePolicy.OneMerge m : ms.merges) {
+        assertFalse(m.isDone());
+        assertFalse(m.hasCompletedSuccessfully().isPresent());
+      }
+      Thread t = new Thread(() -> {
+        try {
+          for (MergePolicy.OneMerge m : ms.merges) {
+            m.mergeFinished(true);
+          }
+        } catch (IOException e) {
+          throw new AssertionError(e);
+        }
+      });
+      t.start();
+      assertTrue(ms.await(100, TimeUnit.HOURS));
+      for (MergePolicy.OneMerge m : ms.merges) {
+        assertTrue(m.isDone());
+        assertTrue(m.hasCompletedSuccessfully().get());
+      }
+      t.join();
+    }
+  }
+
+  public void testAwaitSingleMerge() throws IOException, InterruptedException {
+    try (Directory dir = newDirectory()) {
+      MergePolicy.OneMerge oneMerge = createRandomMergeSpecification(dir, 1).merges.get(0);
+      // nothing has finished the merge yet, so a short wait must time out
+      assertFalse(oneMerge.await(10, TimeUnit.MILLISECONDS));
+      Thread t = new Thread(() -> {
+        try {
+          oneMerge.mergeFinished(true);
+        } catch (IOException e) {
+          throw new AssertionError(e);
+        }
+      });
+      t.start();
+      assertTrue(oneMerge.await(100, TimeUnit.HOURS));
+      assertTrue(oneMerge.isDone());
+      assertTrue(oneMerge.hasCompletedSuccessfully().get());
+      t.join();
+    }
+  }
+
+  public void testFailedMergeCountsAsFinished() throws IOException {
+    try (Directory dir = newDirectory()) {
+      MergePolicy.MergeSpecification ms = createRandomMergeSpecification(dir, 2);
+      ms.merges.get(0).mergeFinished(false);
+      ms.merges.get(1).mergeFinished(true);
+      // await only reports whether all merges finished, not whether they succeeded
+      assertTrue(ms.await(10, TimeUnit.SECONDS));
+      assertFalse(ms.merges.get(0).hasCompletedSuccessfully().get());
+      assertTrue(ms.merges.get(1).hasCompletedSuccessfully().get());
+    }
+  }
+
+  public void testTimeout() throws IOException, InterruptedException {
+    try (Directory dir = newDirectory()) {
+      MergePolicy.MergeSpecification ms = createRandomMergeSpecification(dir, 3);
+      for (MergePolicy.OneMerge m : ms.merges) {
+        assertFalse(m.hasCompletedSuccessfully().isPresent());
+      }
+      // only the first merge is ever finished, so waiting on the whole spec must time out
+      Thread t = new Thread(() -> {
+        try {
+          ms.merges.get(0).mergeFinished(true);
+        } catch (IOException e) {
+          throw new AssertionError(e);
+        }
+      });
+      t.start();
+      assertFalse(ms.await(10, TimeUnit.MILLISECONDS));
+      assertFalse(ms.merges.get(1).hasCompletedSuccessfully().isPresent());
+      t.join();
+      assertTrue(ms.merges.get(0).hasCompletedSuccessfully().get());
+      assertFalse(ms.merges.get(1).isDone());
+      assertFalse(ms.merges.get(2).isDone());
+    }
+  }
+
+  public void testTimeoutLargeNumberOfMerges() throws IOException, InterruptedException {
+    try (Directory dir = newDirectory()) {
+      MergePolicy.MergeSpecification ms = createRandomMergeSpecification(dir, 10000);
+      for (MergePolicy.OneMerge m : ms.merges) {
+        assertFalse(m.hasCompletedSuccessfully().isPresent());
+      }
+      AtomicInteger i = new AtomicInteger(0);
+      AtomicBoolean stop = new AtomicBoolean(false);
+      // never finish the last merge: this guarantees the await below times out even if this
+      // thread gets far ahead, and it can never index past the end of the merge list
+      final int maxToFinish = ms.merges.size() - 1;
+      Thread t = new Thread(() -> {
+        while (stop.get() == false && i.get() < maxToFinish) {
+          try {
+            ms.merges.get(i.getAndIncrement()).mergeFinished(true);
+            Thread.sleep(1);
+          } catch (IOException | InterruptedException e) {
+            throw new AssertionError(e);
+          }
+        }
+      });
+      t.start();
+      assertFalse(ms.await(10, TimeUnit.MILLISECONDS));
+      stop.set(true);
+      t.join();
+      final int numFinished = i.get();
+      for (int j = 0; j < ms.merges.size(); j++) {
+        if (j < numFinished) {
+          assertTrue(ms.merges.get(j).hasCompletedSuccessfully().get());
+        } else {
+          assertFalse(ms.merges.get(j).hasCompletedSuccessfully().isPresent());
+        }
+      }
+    }
+  }
+
+  public void testFinishTwice() throws IOException {
+    try (Directory dir = newDirectory()) {
+      MergePolicy.MergeSpecification spec = createRandomMergeSpecification(dir, 1);
+      MergePolicy.OneMerge oneMerge = spec.merges.get(0);
+      oneMerge.mergeFinished(true);
+      expectThrows(IllegalStateException.class, () -> oneMerge.mergeFinished(false));
+    }
+  }
+
+  public void testTotalMaxDoc() throws IOException {
+    try (Directory dir = newDirectory()) {
+      MergePolicy.MergeSpecification spec = createRandomMergeSpecification(dir, 1);
+      int docs = 0;
+      MergePolicy.OneMerge oneMerge = spec.merges.get(0);
+      for (SegmentCommitInfo info : oneMerge.segments) {
+        docs += info.info.maxDoc();
+      }
+      assertEquals(docs, oneMerge.totalMaxDoc);
+    }
+  }
+
+  /**
+   * Creates a merge specification holding {@code numMerges} merges, each over a single
+   * synthetic segment with a random name and a random maxDoc.
+   */
+  private static MergePolicy.MergeSpecification createRandomMergeSpecification(Directory dir, int numMerges) {
+    MergePolicy.MergeSpecification ms = new MergePolicy.MergeSpecification();
+    for (int ii = 0; ii < numMerges; ++ii) {
+      final SegmentInfo si = new SegmentInfo(
+          dir, // dir
+          Version.LATEST, // version
+          Version.LATEST, // min version
+          TestUtil.randomSimpleString(random()), // name
+          random().nextInt(1000), // maxDoc
+          random().nextBoolean(), // isCompoundFile
+          null, // codec
+          Collections.emptyMap(), // diagnostics
+          TestUtil.randomSimpleString(// id
+              random(),
+              StringHelper.ID_LENGTH,
+              StringHelper.ID_LENGTH).getBytes(StandardCharsets.US_ASCII),
+          Collections.emptyMap(), // attributes
+          null /* indexSort */);
+      final List<SegmentCommitInfo> segments = new LinkedList<>();
+      segments.add(new SegmentCommitInfo(si, 0, 0, 0, 0, 0, StringHelper.randomId()));
+      ms.add(new MergePolicy.OneMerge(segments));
+    }
+    return ms;
+  }
+}