You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jp...@apache.org on 2019/04/05 08:52:46 UTC
[lucene-solr] branch branch_7_7 updated: LUCENE-8688: Forced merges
merge more than necessary.
This is an automated email from the ASF dual-hosted git repository.
jpountz pushed a commit to branch branch_7_7
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/branch_7_7 by this push:
new 66188bd LUCENE-8688: Forced merges merge more than necessary.
66188bd is described below
commit 66188bd0293a78000fb356cb23bdcc5797f6fd73
Author: Adrien Grand <jp...@gmail.com>
AuthorDate: Fri Mar 15 10:27:27 2019 +0100
LUCENE-8688: Forced merges merge more than necessary.
---
lucene/CHANGES.txt | 4 +
.../org/apache/lucene/index/TieredMergePolicy.java | 57 +++++++++++--
.../apache/lucene/index/TestTieredMergePolicy.java | 98 ++++++++++++++++++++++
.../lucene/index/BaseMergePolicyTestCase.java | 8 +-
4 files changed, 161 insertions(+), 6 deletions(-)
diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index b5241be..8b15707 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -15,6 +15,10 @@ Bug fixes
where our best effort in carrying on generations in the IndexWriter since pending
deletions are swallowed by the FilterDirectory. (Henning Andersen, Simon Willnauer)
+* LUCENE-8688: TieredMergePolicy#findForcedMerges now tries to create the
+ cheapest merges that allow the index to go down to `maxSegmentCount` segments
+ or less. (Armin Braun via Adrien Grand)
+
======================= Lucene 7.7.1 =======================
(No changes)
diff --git a/lucene/core/src/java/org/apache/lucene/index/TieredMergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/TieredMergePolicy.java
index e0729d0..73fb2ef 100644
--- a/lucene/core/src/java/org/apache/lucene/index/TieredMergePolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/TieredMergePolicy.java
@@ -622,9 +622,8 @@ public class TieredMergePolicy extends MergePolicy {
};
}
-
@Override
- public MergeSpecification findForcedMerges(SegmentInfos infos, int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, MergeContext mergeContext) throws IOException {
+ public MergeSpecification findForcedMerges(SegmentInfos infos, int maxSegmentCount, Map<SegmentCommitInfo, Boolean> segmentsToMerge, MergeContext mergeContext) throws IOException {
if (verbose(mergeContext)) {
message("findForcedMerges maxSegmentCount=" + maxSegmentCount + " infos=" + segString(mergeContext, infos) +
" segmentsToMerge=" + segmentsToMerge, mergeContext);
@@ -639,6 +638,7 @@ public class TieredMergePolicy extends MergePolicy {
// Trim the list down, remove if we're respecting max segment size and it's not original. Presumably it's been merged before and
// is close enough to the max segment size we shouldn't add it in again.
Iterator<SegmentSizeAndDocs> iter = sortedSizeAndDocs.iterator();
+ boolean forceMergeRunning = false;
while (iter.hasNext()) {
SegmentSizeAndDocs segSizeDocs = iter.next();
final Boolean isOriginal = segmentsToMerge.get(segSizeDocs.segInfo);
@@ -646,6 +646,7 @@ public class TieredMergePolicy extends MergePolicy {
iter.remove();
} else {
if (merging.contains(segSizeDocs.segInfo)) {
+ forceMergeRunning = true;
iter.remove();
} else {
totalMergeBytes += segSizeDocs.sizeInBytes;
@@ -707,6 +708,12 @@ public class TieredMergePolicy extends MergePolicy {
message("eligible=" + sortedSizeAndDocs, mergeContext);
}
+ final int startingSegmentCount = sortedSizeAndDocs.size();
+ final boolean finalMerge = startingSegmentCount < maxSegmentCount + maxMergeAtOnceExplicit - 1;
+ if (finalMerge && forceMergeRunning) {
+ return null;
+ }
+
// This is the special case of merging down to one segment
if (sortedSizeAndDocs.size() < maxMergeAtOnceExplicit && maxSegmentCount == 1 && totalMergeBytes < maxMergeBytes) {
MergeSpecification spec = new MergeSpecification();
@@ -718,10 +725,50 @@ public class TieredMergePolicy extends MergePolicy {
return spec;
}
- MergeSpecification spec = doFindMerges(sortedSizeAndDocs, maxMergeBytes, maxMergeAtOnceExplicit,
- maxSegmentCount, 0, MERGE_TYPE.FORCE_MERGE, mergeContext, false);
+ MergeSpecification spec = null;
- return spec;
+ int index = startingSegmentCount - 1;
+ int resultingSegments = startingSegmentCount;
+ while (true) {
+ List<SegmentCommitInfo> candidate = new ArrayList<>();
+ long currentCandidateBytes = 0L;
+ int mergesAllowed = maxMergeAtOnceExplicit;
+ while (index >= 0 && resultingSegments > maxSegmentCount && mergesAllowed > 0) {
+ final SegmentCommitInfo current = sortedSizeAndDocs.get(index).segInfo;
+ final int initialCandidateSize = candidate.size();
+ final long currentSegmentSize = current.sizeInBytes();
+ // We either add to the bin because there's space or because it is the smallest possible bin since
+ // decrementing the index will move us to even larger segments.
+ if (currentCandidateBytes + currentSegmentSize <= maxMergeBytes || initialCandidateSize < 2) {
+ candidate.add(current);
+ --index;
+ currentCandidateBytes += currentSegmentSize;
+ --mergesAllowed;
+ if (initialCandidateSize > 0) {
+ // Any merge that handles two or more segments reduces the resulting number of segments
+ // by the number of segments handled - 1
+ --resultingSegments;
+ }
+ } else {
+ break;
+ }
+ }
+ final int candidateSize = candidate.size();
+ // While a force merge is running, only merges that cover the maximum allowed number of segments or that create a segment close to the
+ // maximum allowed segment size are permitted
+ if (candidateSize > 1 && (forceMergeRunning == false || candidateSize == maxMergeAtOnceExplicit || candidateSize > 0.7 * maxMergeBytes)) {
+ final OneMerge merge = new OneMerge(candidate);
+ if (verbose(mergeContext)) {
+ message("add merge=" + segString(mergeContext, merge.segments), mergeContext);
+ }
+ if (spec == null) {
+ spec = new MergeSpecification();
+ }
+ spec.add(merge);
+ } else {
+ return spec;
+ }
+ }
}
@Override
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestTieredMergePolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestTieredMergePolicy.java
index bd11a58..a69174f 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestTieredMergePolicy.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestTieredMergePolicy.java
@@ -19,8 +19,11 @@ package org.apache.lucene.index;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
@@ -360,6 +363,101 @@ public class TestTieredMergePolicy extends BaseMergePolicyTestCase {
dir.close();
}
+ // LUCENE-8688 reports that force merges merged more segments than necessary to respect maxSegmentCount as a result
+ // of LUCENE-7976, so we ensure that it only does the minimum number of merges here.
+ public void testForcedMergesUseLeastNumberOfMerges() throws Exception {
+ final TieredMergePolicy tmp = new TieredMergePolicy();
+ final double oneSegmentSize = 1.0D;
+ final double maxSegmentSize = 10 * oneSegmentSize;
+ tmp.setMaxMergedSegmentMB(maxSegmentSize);
+
+ SegmentInfos infos = new SegmentInfos(Version.LATEST.major);
+ for (int j = 0; j < 30; ++j) {
+ infos.add(makeSegmentCommitInfo("_" + j, 1000, 0, oneSegmentSize, IndexWriter.SOURCE_MERGE));
+ }
+
+ final int expectedCount = random().nextInt(10) + 3;
+ final MergeSpecification specification =
+ tmp.findForcedMerges(infos, expectedCount, segmentsToMerge(infos), new MockMergeContext(SegmentCommitInfo::getDelCount));
+ assertMaxSize(specification, maxSegmentSize);
+ final int resultingCount =
+ infos.size() + specification.merges.size() - specification.merges.stream().mapToInt(spec -> spec.segments.size()).sum();
+ assertEquals(expectedCount, resultingCount);
+
+ SegmentInfos manySegmentsInfos = new SegmentInfos(Version.LATEST.major);
+ final int manySegmentsCount = atLeast(100);
+ for (int j = 0; j < manySegmentsCount; ++j) {
+ manySegmentsInfos.add(makeSegmentCommitInfo("_" + j, 1000, 0, 0.1D, IndexWriter.SOURCE_MERGE));
+ }
+
+ final MergeSpecification specificationManySegments = tmp.findForcedMerges(
+ manySegmentsInfos, expectedCount, segmentsToMerge(manySegmentsInfos), new MockMergeContext(SegmentCommitInfo::getDelCount));
+ assertMaxSize(specificationManySegments, maxSegmentSize);
+ final int resultingCountManySegments = manySegmentsInfos.size() + specificationManySegments.merges.size()
+ - specificationManySegments.merges.stream().mapToInt(spec -> spec.segments.size()).sum();
+ assertTrue(resultingCountManySegments >= expectedCount);
+ }
+
+ // Make sure that TieredMergePolicy doesn't do the final merge while there are merges ongoing, but does do non-final
+ // merges while merges are ongoing.
+ public void testForcedMergeWithPending() throws Exception {
+ final TieredMergePolicy tmp = new TieredMergePolicy();
+ final double maxSegmentSize = 10.0D;
+ tmp.setMaxMergedSegmentMB(maxSegmentSize);
+
+ SegmentInfos infos = new SegmentInfos(Version.LATEST.major);
+ for (int j = 0; j < 30; ++j) {
+ infos.add(makeSegmentCommitInfo("_" + j, 1000, 0, 1.0D, IndexWriter.SOURCE_MERGE));
+ }
+ final MockMergeContext mergeContext = new MockMergeContext(SegmentCommitInfo::getDelCount);
+ mergeContext.setMergingSegments(Collections.singleton(infos.asList().get(0)));
+ final int expectedCount = random().nextInt(10) + 3;
+ final MergeSpecification specification = tmp.findForcedMerges(infos, expectedCount, segmentsToMerge(infos), mergeContext);
+ // Since we have fewer than 30 (the max merge count) segments more than the final size, this would have been the final merge,
+ // so we check that it was prevented.
+ assertNull(specification);
+
+ SegmentInfos manySegmentsInfos = new SegmentInfos(Version.LATEST.major);
+ final int manySegmentsCount = atLeast(500);
+ for (int j = 0; j < manySegmentsCount; ++j) {
+ manySegmentsInfos.add(makeSegmentCommitInfo("_" + j, 1000, 0, 0.1D, IndexWriter.SOURCE_MERGE));
+ }
+
+ // We set one merge to be ongoing. Since we have more than 30 (the max merge count) times the number of segments
+ // that we want to merge down to, this is not the final merge and hence the returned specification must not be null.
+ mergeContext.setMergingSegments(Collections.singleton(manySegmentsInfos.asList().get(0)));
+ final MergeSpecification specificationManySegments =
+ tmp.findForcedMerges(manySegmentsInfos, expectedCount, segmentsToMerge(manySegmentsInfos), mergeContext);
+ assertMaxSize(specificationManySegments, maxSegmentSize);
+ for (OneMerge merge : specificationManySegments.merges) {
+ assertEquals("No merges of less than the max merge count are permitted while another merge is in progress",
+ merge.segments.size(), tmp.getMaxMergeAtOnceExplicit());
+ }
+ final int resultingCountManySegments = manySegmentsInfos.size() + specificationManySegments.merges.size()
+ - specificationManySegments.merges.stream().mapToInt(spec -> spec.segments.size()).sum();
+ assertTrue(resultingCountManySegments >= expectedCount);
+ }
+
+ private static Map<SegmentCommitInfo, Boolean> segmentsToMerge(SegmentInfos infos) {
+ final Map<SegmentCommitInfo, Boolean> segmentsToMerge = new HashMap<>();
+ for (SegmentCommitInfo info : infos) {
+ segmentsToMerge.put(info, Boolean.TRUE);
+ }
+ return segmentsToMerge;
+ }
+
+ private static void assertMaxSize(MergeSpecification specification, double maxSegmentSizeMb) {
+ for (OneMerge merge : specification.merges) {
+ assertTrue(merge.segments.stream().mapToLong(s -> {
+ try {
+ return s.sizeInBytes();
+ } catch (IOException e) {
+ throw new AssertionError(e);
+ }
+ }).sum() < 1024 * 1024 * maxSegmentSizeMb * 1.5);
+ }
+ }
+
// Having a segment with very few documents in it can happen because of the random nature of the
// docs added to the index. For instance, let's say it just happens that the last segment has 3 docs in it.
// It can easily be merged with a close-to-max sized segment during a forceMerge and still respect the max segment
diff --git a/lucene/test-framework/src/java/org/apache/lucene/index/BaseMergePolicyTestCase.java b/lucene/test-framework/src/java/org/apache/lucene/index/BaseMergePolicyTestCase.java
index c922f02..b77c68d 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/index/BaseMergePolicyTestCase.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/index/BaseMergePolicyTestCase.java
@@ -162,6 +162,8 @@ public abstract class BaseMergePolicyTestCase extends LuceneTestCase {
}
};
+ private Set<SegmentCommitInfo> mergingSegments = Collections.emptySet();
+
public MockMergeContext(ToIntFunction<SegmentCommitInfo> numDeletesFunc) {
this.numDeletesFunc = numDeletesFunc;
}
@@ -183,7 +185,11 @@ public abstract class BaseMergePolicyTestCase extends LuceneTestCase {
@Override
public Set<SegmentCommitInfo> getMergingSegments() {
- return Collections.emptySet();
+ return mergingSegments;
+ }
+
+ public void setMergingSegments(Set<SegmentCommitInfo> mergingSegments) {
+ this.mergingSegments = mergingSegments;
}
}