You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jp...@apache.org on 2019/03/15 09:30:11 UTC

[lucene-solr] branch master updated: LUCENE-8688: Forced merges merge more than necessary.

This is an automated email from the ASF dual-hosted git repository.

jpountz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/master by this push:
     new 425f207  LUCENE-8688: Forced merges merge more than necessary.
425f207 is described below

commit 425f207f4098e4fcc1307729b1f2e17cfef3929f
Author: Adrien Grand <jp...@gmail.com>
AuthorDate: Fri Mar 15 10:27:27 2019 +0100

    LUCENE-8688: Forced merges merge more than necessary.
---
 lucene/CHANGES.txt                                 |  4 +
 .../org/apache/lucene/index/TieredMergePolicy.java | 57 +++++++++++--
 .../apache/lucene/index/TestTieredMergePolicy.java | 98 ++++++++++++++++++++++
 .../lucene/index/BaseMergePolicyTestCase.java      |  8 +-
 4 files changed, 161 insertions(+), 6 deletions(-)

diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index 203dbc8..79ceced 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -44,6 +44,10 @@ Bug fixes
 * LUCENE-8719: FixedShingleFilter can miss shingles at the end of a token stream if
   there are multiple paths with different lengths. (Alan Woodward)
 
+* LUCENE-8688: TieredMergePolicy#findForcedMerges now tries to create the
+  cheapest merges that allow the index to go down to `maxSegmentCount` segments
+  or less. (Armin Braun via Adrien Grand)
+
 Improvements
 
 * LUCENE-8673: Use radix partitioning when merging dimensional points instead
diff --git a/lucene/core/src/java/org/apache/lucene/index/TieredMergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/TieredMergePolicy.java
index e0729d0..73fb2ef 100644
--- a/lucene/core/src/java/org/apache/lucene/index/TieredMergePolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/TieredMergePolicy.java
@@ -622,9 +622,8 @@ public class TieredMergePolicy extends MergePolicy {
     };
   }
 
-
   @Override
-  public MergeSpecification findForcedMerges(SegmentInfos infos, int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, MergeContext mergeContext) throws IOException {
+  public MergeSpecification findForcedMerges(SegmentInfos infos, int maxSegmentCount, Map<SegmentCommitInfo, Boolean> segmentsToMerge, MergeContext mergeContext) throws IOException {
     if (verbose(mergeContext)) {
       message("findForcedMerges maxSegmentCount=" + maxSegmentCount + " infos=" + segString(mergeContext, infos) +
           " segmentsToMerge=" + segmentsToMerge, mergeContext);
@@ -639,6 +638,7 @@ public class TieredMergePolicy extends MergePolicy {
     // Trim the list down, remove if we're respecting max segment size and it's not original. Presumably it's been merged before and
     //   is close enough to the max segment size we shouldn't add it in again.
     Iterator<SegmentSizeAndDocs> iter = sortedSizeAndDocs.iterator();
+    boolean forceMergeRunning = false;
     while (iter.hasNext()) {
       SegmentSizeAndDocs segSizeDocs = iter.next();
       final Boolean isOriginal = segmentsToMerge.get(segSizeDocs.segInfo);
@@ -646,6 +646,7 @@ public class TieredMergePolicy extends MergePolicy {
         iter.remove();
       } else {
         if (merging.contains(segSizeDocs.segInfo)) {
+          forceMergeRunning = true;
           iter.remove();
         } else {
           totalMergeBytes += segSizeDocs.sizeInBytes;
@@ -707,6 +708,12 @@ public class TieredMergePolicy extends MergePolicy {
       message("eligible=" + sortedSizeAndDocs, mergeContext);
     }
 
+    final int startingSegmentCount = sortedSizeAndDocs.size();
+    final boolean finalMerge = startingSegmentCount < maxSegmentCount + maxMergeAtOnceExplicit - 1;
+    if (finalMerge && forceMergeRunning) {
+      return null;
+    }
+
     // This is the special case of merging down to one segment
     if (sortedSizeAndDocs.size() < maxMergeAtOnceExplicit && maxSegmentCount == 1 && totalMergeBytes < maxMergeBytes) {
       MergeSpecification spec = new MergeSpecification();
@@ -718,10 +725,50 @@ public class TieredMergePolicy extends MergePolicy {
       return spec;
     }
 
-    MergeSpecification spec = doFindMerges(sortedSizeAndDocs, maxMergeBytes, maxMergeAtOnceExplicit,
-        maxSegmentCount, 0, MERGE_TYPE.FORCE_MERGE, mergeContext, false);
+    MergeSpecification spec = null;
 
-    return spec;
+    int index = startingSegmentCount - 1;
+    int resultingSegments = startingSegmentCount;
+    while (true) {
+      List<SegmentCommitInfo> candidate = new ArrayList<>();
+      long currentCandidateBytes = 0L;
+      int mergesAllowed = maxMergeAtOnceExplicit;
+      while (index >= 0 && resultingSegments > maxSegmentCount && mergesAllowed > 0) {
+        final SegmentCommitInfo current = sortedSizeAndDocs.get(index).segInfo;
+        final int initialCandidateSize = candidate.size();
+        final long currentSegmentSize = current.sizeInBytes();
+          // We either add to the bin because there's space or because it is the smallest possible bin since
+          // decrementing the index will move us to even larger segments.
+        if (currentCandidateBytes + currentSegmentSize <= maxMergeBytes || initialCandidateSize < 2) {
+          candidate.add(current);
+          --index;
+          currentCandidateBytes += currentSegmentSize;
+          --mergesAllowed;
+          if (initialCandidateSize > 0) {
+            // Any merge that handles two or more segments reduces the resulting number of segments
+            // by the number of segments handled - 1
+            --resultingSegments;
+          }
+        } else {
+          break;
+        }
+      }
+      final int candidateSize = candidate.size();
+      // While a force merge is running, only merges that cover the maximum allowed number of segments or that create a segment close to the
+      // maximum allowed segment size are permitted
+      if (candidateSize > 1 && (forceMergeRunning == false || candidateSize == maxMergeAtOnceExplicit || candidateSize > 0.7 * maxMergeBytes)) {
+        final OneMerge merge = new OneMerge(candidate);
+        if (verbose(mergeContext)) {
+          message("add merge=" + segString(mergeContext, merge.segments), mergeContext);
+        }
+        if (spec == null) {
+          spec = new MergeSpecification();
+        }
+        spec.add(merge);
+      } else {
+        return spec;
+      }
+    }
   }
 
   @Override
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestTieredMergePolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestTieredMergePolicy.java
index bd11a58..a69174f 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestTieredMergePolicy.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestTieredMergePolicy.java
@@ -19,8 +19,11 @@ package org.apache.lucene.index;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.lucene.analysis.MockAnalyzer;
 import org.apache.lucene.document.Document;
@@ -360,6 +363,101 @@ public class TestTieredMergePolicy extends BaseMergePolicyTestCase {
     dir.close();
   }
 
+  // LUCENE-8688 reports that force merges merged more segments than necessary to respect maxSegmentCount as a result
+  // of LUCENE-7976 so we ensure that it only does the minimum number of merges here.
+  public void testForcedMergesUseLeastNumberOfMerges() throws Exception {
+    final TieredMergePolicy tmp = new TieredMergePolicy();
+    final double oneSegmentSize = 1.0D;
+    final double maxSegmentSize = 10 * oneSegmentSize;
+    tmp.setMaxMergedSegmentMB(maxSegmentSize);
+
+    SegmentInfos infos = new SegmentInfos(Version.LATEST.major);
+    for (int j = 0; j < 30; ++j) {
+      infos.add(makeSegmentCommitInfo("_" + j, 1000, 0, oneSegmentSize, IndexWriter.SOURCE_MERGE));
+    }
+
+    final int expectedCount = random().nextInt(10) + 3;
+    final MergeSpecification specification =
+        tmp.findForcedMerges(infos, expectedCount, segmentsToMerge(infos), new MockMergeContext(SegmentCommitInfo::getDelCount));
+    assertMaxSize(specification, maxSegmentSize);
+    final int resultingCount =
+        infos.size() + specification.merges.size() - specification.merges.stream().mapToInt(spec -> spec.segments.size()).sum();
+    assertEquals(expectedCount, resultingCount);
+
+    SegmentInfos manySegmentsInfos = new SegmentInfos(Version.LATEST.major);
+    final int manySegmentsCount = atLeast(100);
+    for (int j = 0; j < manySegmentsCount; ++j) {
+      manySegmentsInfos.add(makeSegmentCommitInfo("_" + j, 1000, 0, 0.1D, IndexWriter.SOURCE_MERGE));
+    }
+
+    final MergeSpecification specificationManySegments = tmp.findForcedMerges(
+        manySegmentsInfos, expectedCount, segmentsToMerge(manySegmentsInfos), new MockMergeContext(SegmentCommitInfo::getDelCount));
+    assertMaxSize(specificationManySegments, maxSegmentSize);
+    final int resultingCountManySegments = manySegmentsInfos.size() + specificationManySegments.merges.size()
+        - specificationManySegments.merges.stream().mapToInt(spec -> spec.segments.size()).sum();
+    assertTrue(resultingCountManySegments >= expectedCount);
+  }
+
+  // Make sure that TieredMergePolicy doesn't do the final merge while there are merges ongoing, but does do non-final
+  // merges while merges are ongoing.
+  public void testForcedMergeWithPending() throws Exception {
+    final TieredMergePolicy tmp = new TieredMergePolicy();
+    final double maxSegmentSize = 10.0D;
+    tmp.setMaxMergedSegmentMB(maxSegmentSize);
+
+    SegmentInfos infos = new SegmentInfos(Version.LATEST.major);
+    for (int j = 0; j < 30; ++j) {
+      infos.add(makeSegmentCommitInfo("_" + j, 1000, 0, 1.0D, IndexWriter.SOURCE_MERGE));
+    }
+    final MockMergeContext mergeContext = new MockMergeContext(SegmentCommitInfo::getDelCount);
+    mergeContext.setMergingSegments(Collections.singleton(infos.asList().get(0)));
+    final int expectedCount = random().nextInt(10) + 3;
+    final MergeSpecification specification = tmp.findForcedMerges(infos, expectedCount, segmentsToMerge(infos), mergeContext);
+    // Since we have fewer than 30 (the max merge count) segments more than the final size this would have been the final merge
+    // so we check that it was prevented.
+    assertNull(specification);
+
+    SegmentInfos manySegmentsInfos = new SegmentInfos(Version.LATEST.major);
+    final int manySegmentsCount = atLeast(500);
+    for (int j = 0; j < manySegmentsCount; ++j) {
+      manySegmentsInfos.add(makeSegmentCommitInfo("_" + j, 1000, 0, 0.1D, IndexWriter.SOURCE_MERGE));
+    }
+
+    // We set one merge to be ongoing. Since we have more than 30 (the max merge count) times the number of segments
+    // that we want to merge to, this is not the final merge and hence the returned specification must not be null.
+    mergeContext.setMergingSegments(Collections.singleton(manySegmentsInfos.asList().get(0)));
+    final MergeSpecification specificationManySegments =
+        tmp.findForcedMerges(manySegmentsInfos, expectedCount, segmentsToMerge(manySegmentsInfos), mergeContext);
+    assertMaxSize(specificationManySegments, maxSegmentSize);
+    for (OneMerge merge : specificationManySegments.merges) {
+      assertEquals("No merges of less than the max merge count are permitted while another merge is in progress",
+          merge.segments.size(), tmp.getMaxMergeAtOnceExplicit());
+    }
+    final int resultingCountManySegments = manySegmentsInfos.size() + specificationManySegments.merges.size()
+        - specificationManySegments.merges.stream().mapToInt(spec -> spec.segments.size()).sum();
+    assertTrue(resultingCountManySegments >= expectedCount);
+  }
+
+  private static Map<SegmentCommitInfo, Boolean> segmentsToMerge(SegmentInfos infos) {
+    final Map<SegmentCommitInfo, Boolean> segmentsToMerge = new HashMap<>();
+    for (SegmentCommitInfo info : infos) {
+      segmentsToMerge.put(info, Boolean.TRUE);
+    }
+    return segmentsToMerge;
+  }
+
+  private static void assertMaxSize(MergeSpecification specification, double maxSegmentSizeMb) {
+    for (OneMerge merge : specification.merges) {
+      assertTrue(merge.segments.stream().mapToLong(s -> {
+        try {
+          return s.sizeInBytes();
+        } catch (IOException e) {
+          throw new AssertionError(e);
+        }
+      }).sum() < 1024 * 1024 * maxSegmentSizeMb * 1.5);
+    }
+  }
+
   // Having a segment with very few documents in it can happen because of the random nature of the
   // docs added to the index. For instance, let's say it just happens that the last segment has 3 docs in it.
   // It can easily be merged with a close-to-max sized segment during a forceMerge and still respect the max segment
diff --git a/lucene/test-framework/src/java/org/apache/lucene/index/BaseMergePolicyTestCase.java b/lucene/test-framework/src/java/org/apache/lucene/index/BaseMergePolicyTestCase.java
index c922f02..b77c68d 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/index/BaseMergePolicyTestCase.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/index/BaseMergePolicyTestCase.java
@@ -162,6 +162,8 @@ public abstract class BaseMergePolicyTestCase extends LuceneTestCase {
       }
     };
 
+    private Set<SegmentCommitInfo> mergingSegments = Collections.emptySet();
+
     public MockMergeContext(ToIntFunction<SegmentCommitInfo> numDeletesFunc) {
       this.numDeletesFunc = numDeletesFunc;
     }
@@ -183,7 +185,11 @@ public abstract class BaseMergePolicyTestCase extends LuceneTestCase {
 
     @Override
     public Set<SegmentCommitInfo> getMergingSegments() {
-      return Collections.emptySet();
+      return mergingSegments;
+    }
+
+    public void setMergingSegments(Set<SegmentCommitInfo> mergingSegments) {
+      this.mergingSegments = mergingSegments;
     }
   }