You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by si...@apache.org on 2020/06/17 20:48:28 UTC

[lucene-solr] branch master updated: LUCENE-8962: Allow waiting for all merges in a merge spec (#1585)

This is an automated email from the ASF dual-hosted git repository.

simonw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/master by this push:
     new 59efe22  LUCENE-8962: Allow waiting for all merges in a merge spec (#1585)
59efe22 is described below

commit 59efe22ac29c95f9ba85b7214fcf5e30cc979222
Author: Simon Willnauer <si...@apache.org>
AuthorDate: Wed Jun 17 22:48:12 2020 +0200

    LUCENE-8962: Allow waiting for all merges in a merge spec (#1585)
    
    This change adds infrastructure to allow straightforward waiting
    on one or more merges or an entire merge specification. This is
    a basis for LUCENE-8962.
---
 .../java/org/apache/lucene/index/IndexWriter.java  |  20 ++-
 .../java/org/apache/lucene/index/MergePolicy.java  |  75 ++++++++--
 .../lucene/index/TestDemoParallelLeafReader.java   |   2 +-
 .../org/apache/lucene/index/TestIndexWriter.java   |   2 +-
 .../org/apache/lucene/index/TestMergePolicy.java   | 158 +++++++++++++++++++++
 5 files changed, 233 insertions(+), 24 deletions(-)

diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index f827f27..eefe04a 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -2129,12 +2129,12 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
 
   private final void maybeMerge(MergePolicy mergePolicy, MergeTrigger trigger, int maxNumSegments) throws IOException {
     ensureOpen(false);
-    if (updatePendingMerges(mergePolicy, trigger, maxNumSegments)) {
+    if (updatePendingMerges(mergePolicy, trigger, maxNumSegments) != null) {
       mergeScheduler.merge(mergeSource, trigger);
     }
   }
 
-  private synchronized boolean updatePendingMerges(MergePolicy mergePolicy, MergeTrigger trigger, int maxNumSegments)
+  private synchronized MergePolicy.MergeSpecification updatePendingMerges(MergePolicy mergePolicy, MergeTrigger trigger, int maxNumSegments)
     throws IOException {
 
     // In case infoStream was disabled on init, but then enabled at some
@@ -2144,22 +2144,21 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
     assert maxNumSegments == UNBOUNDED_MAX_MERGE_SEGMENTS || maxNumSegments > 0;
     assert trigger != null;
     if (stopMerges) {
-      return false;
+      return null;
     }
 
     // Do not start new merges if disaster struck
     if (tragedy.get() != null) {
-      return false;
+      return null;
     }
-    boolean newMergesFound = false;
+
     final MergePolicy.MergeSpecification spec;
     if (maxNumSegments != UNBOUNDED_MAX_MERGE_SEGMENTS) {
       assert trigger == MergeTrigger.EXPLICIT || trigger == MergeTrigger.MERGE_FINISHED :
       "Expected EXPLICT or MERGE_FINISHED as trigger even with maxNumSegments set but was: " + trigger.name();
 
       spec = mergePolicy.findForcedMerges(segmentInfos, maxNumSegments, Collections.unmodifiableMap(segmentsToMerge), this);
-      newMergesFound = spec != null;
-      if (newMergesFound) {
+      if (spec != null) {
         final int numMerges = spec.merges.size();
         for(int i=0;i<numMerges;i++) {
           final MergePolicy.OneMerge merge = spec.merges.get(i);
@@ -2169,14 +2168,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
     } else {
       spec = mergePolicy.findMerges(trigger, segmentInfos, this);
     }
-    newMergesFound = spec != null;
-    if (newMergesFound) {
+    if (spec != null) {
       final int numMerges = spec.merges.size();
       for(int i=0;i<numMerges;i++) {
         registerMerge(spec.merges.get(i));
       }
     }
-    return newMergesFound;
+    return spec;
   }
 
   /** Expert: to be used by a {@link MergePolicy} to avoid
@@ -4289,7 +4287,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
   @SuppressWarnings("try")
   private synchronized void closeMergeReaders(MergePolicy.OneMerge merge, boolean suppressExceptions) throws IOException {
     final boolean drop = suppressExceptions == false;
-    try (Closeable finalizer = merge::mergeFinished) {
+    try (Closeable finalizer = () -> merge.mergeFinished(suppressExceptions == false)) {
       IOUtils.applyToAll(merge.readers, sr -> {
         final ReadersAndUpdates rld = getPooledInstance(sr.getOriginalSegmentInfo(), false);
         // We still hold a ref so it should not have been removed:
diff --git a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
index 3ac3914..ca324a2 100644
--- a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
@@ -23,7 +23,12 @@ import java.util.EnumMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
@@ -37,6 +42,7 @@ import org.apache.lucene.store.MergeInfo;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.IOSupplier;
 import org.apache.lucene.util.InfoStream;
+import org.apache.lucene.util.ThreadInterruptedException;
 
 /**
  * <p>Expert: a MergePolicy determines the sequence of
@@ -76,7 +82,7 @@ public abstract class MergePolicy {
    * @lucene.experimental */
   public static class OneMergeProgress {
     /** Reason for pausing the merge thread. */
-    public static enum PauseReason {
+    public enum PauseReason {
       /** Stopped (because of throughput rate set to 0, typically). */
       STOPPED,
       /** Temporarily paused because of exceeded throughput rate. */
@@ -196,6 +202,7 @@ public abstract class MergePolicy {
    *
    * @lucene.experimental */
   public static class OneMerge {
+    private final CompletableFuture<Boolean> mergeCompleted = new CompletableFuture<>();
     SegmentCommitInfo info;         // used by IndexWriter
     boolean registerDone;           // used by IndexWriter
     long mergeGen;                  // used by IndexWriter
@@ -222,7 +229,7 @@ public abstract class MergePolicy {
     volatile long mergeStartNS = -1;
 
     /** Total number of documents in segments to be merged, not accounting for deletions. */
-    public final int totalMaxDoc;
+    final int totalMaxDoc;
     Throwable error;
 
     /** Sole constructor.
@@ -233,13 +240,8 @@ public abstract class MergePolicy {
         throw new RuntimeException("segments must include at least one segment");
       }
       // clone the list, as the in list may be based off original SegmentInfos and may be modified
-      this.segments = new ArrayList<>(segments);
-      int count = 0;
-      for(SegmentCommitInfo info : segments) {
-        count += info.info.maxDoc();
-      }
-      totalMaxDoc = count;
-
+      this.segments = List.copyOf(segments);
+      totalMaxDoc = segments.stream().mapToInt(i -> i.info.maxDoc()).sum();
       mergeProgress = new OneMergeProgress();
     }
 
@@ -251,8 +253,12 @@ public abstract class MergePolicy {
       mergeProgress.setMergeThread(Thread.currentThread());
     }
     
-    /** Called by {@link IndexWriter} after the merge is done and all readers have been closed. */
-    public void mergeFinished() throws IOException {
+    /** Called by {@link IndexWriter} after the merge is done and all readers have been closed; overrides must call super.
+     * @param success true iff the merge finished successfully, i.e. was committed */
+    public void mergeFinished(boolean success) throws IOException {
+      if (mergeCompleted.complete(success) == false) {
+        throw new IllegalStateException("merge has already finished");
+      }
     }
 
     /** Wrap the reader in order to add/remove information to the merged segment. */
@@ -362,6 +368,37 @@ public abstract class MergePolicy {
     public OneMergeProgress getMergeProgress() {
       return mergeProgress;
     }
+
+    /**
+     * Waits for this merge to be completed, successfully or not.
+     * @return true if the merge finished within the specified timeout, regardless of its outcome
+     */
+    boolean await(long timeout, TimeUnit timeUnit) {
+      try {
+        mergeCompleted.get(timeout, timeUnit);
+        return true;
+      } catch (InterruptedException e) {
+        throw new ThreadInterruptedException(e);
+      } catch (ExecutionException | TimeoutException e) {
+        return false;
+      }
+    }
+
+    /**
+     * Returns true if the merge has finished or false if it's still running or
+     * has not been started. This method will not block.
+     */
+    boolean isDone() {
+      return mergeCompleted.isDone();
+    }
+
+    /**
+     * Returns {@code Optional.of(true)} if the merge completed successfully, {@code Optional.of(false)} if it completed
+     * with a failure, or an empty Optional if it has not finished yet. This method does not block.
+     */
+    Optional<Boolean> hasCompletedSuccessfully() {
+      return Optional.ofNullable(mergeCompleted.getNow(null));
+    }
   }
 
   /**
@@ -399,6 +436,22 @@ public abstract class MergePolicy {
       }
       return b.toString();
     }
+
+    /**
+     * Waits if necessary for at most the given time for all merges to finish; returns true if they all finished in time.
+     */
+    boolean await(long timeout, TimeUnit unit) {
+      try {
+        CompletableFuture<Void> future = CompletableFuture.allOf(merges.stream()
+            .map(m -> m.mergeCompleted).collect(Collectors.toList()).toArray(CompletableFuture<?>[]::new));
+        future.get(timeout, unit);
+        return true;
+      } catch (InterruptedException e) {
+        throw new ThreadInterruptedException(e);
+      } catch (ExecutionException | TimeoutException e) {
+        return false;
+      }
+    }
   }
 
   /** Exception thrown if there are any problems while executing a merge. */
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestDemoParallelLeafReader.java b/lucene/core/src/test/org/apache/lucene/index/TestDemoParallelLeafReader.java
index 4b82800..7fdad3b 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestDemoParallelLeafReader.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestDemoParallelLeafReader.java
@@ -538,7 +538,7 @@ public class TestDemoParallelLeafReader extends LuceneTestCase {
         }
 
         @Override
-        public void mergeFinished() throws IOException {
+        public void mergeFinished(boolean success) throws IOException {
           Throwable th = null;
           for (ParallelLeafReader r : parallelReaders) {
             try {
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
index f3ed96d..f8dcae7 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java
@@ -4181,7 +4181,7 @@ public class TestIndexWriter extends LuceneTestCase {
             SetOnce<Boolean> onlyFinishOnce = new SetOnce<>();
             return new MergePolicy.OneMerge(merge.segments) {
               @Override
-              public void mergeFinished() {
+              public void mergeFinished(boolean success) {
                 onlyFinishOnce.set(true);
               }
             };
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestMergePolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestMergePolicy.java
new file mode 100644
index 0000000..c252da0
--- /dev/null
+++ b/lucene/core/src/test/org/apache/lucene/index/TestMergePolicy.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.index;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.StringHelper;
+import org.apache.lucene.util.TestUtil;
+import org.apache.lucene.util.Version;
+
+/**
+ * Tests the completion tracking of {@link MergePolicy.OneMerge} and
+ * {@link MergePolicy.MergeSpecification}: {@code mergeFinished}, {@code await}
+ * and {@code hasCompletedSuccessfully}.
+ */
+public class TestMergePolicy extends LuceneTestCase {
+
+  /** All merges are finished by a background thread; awaiting the whole spec must succeed. */
+  public void testWaitForOneMerge() throws IOException, InterruptedException {
+    try (Directory dir = newDirectory()) {
+      MergePolicy.MergeSpecification ms = createRandomMergeSpecification(dir, 1 + random().nextInt(10));
+      for (MergePolicy.OneMerge m : ms.merges) {
+        assertFalse(m.hasCompletedSuccessfully().isPresent());
+      }
+      Thread t = new Thread(() -> {
+        try {
+          for (MergePolicy.OneMerge m : ms.merges) {
+            m.mergeFinished(true);
+          }
+        } catch (IOException e) {
+          throw new AssertionError(e);
+        }
+      });
+      t.start();
+      try {
+        assertTrue(ms.await(100, TimeUnit.HOURS));
+        for (MergePolicy.OneMerge m : ms.merges) {
+          assertTrue(m.hasCompletedSuccessfully().get());
+        }
+      } finally {
+        t.join();
+      }
+    }
+  }
+
+  /** Only the first of three merges finishes, so a short await on the spec must time out. */
+  public void testTimeout() throws IOException, InterruptedException {
+    try (Directory dir = newDirectory()) {
+      MergePolicy.MergeSpecification ms = createRandomMergeSpecification(dir, 3);
+      for (MergePolicy.OneMerge m : ms.merges) {
+        assertFalse(m.hasCompletedSuccessfully().isPresent());
+      }
+      Thread t = new Thread(() -> {
+        try {
+          ms.merges.get(0).mergeFinished(true);
+        } catch (IOException e) {
+          throw new AssertionError(e);
+        }
+      });
+      t.start();
+      try {
+        assertFalse(ms.await(10, TimeUnit.MILLISECONDS));
+        assertFalse(ms.merges.get(1).hasCompletedSuccessfully().isPresent());
+      } finally {
+        t.join();
+      }
+    }
+  }
+
+  /**
+   * A background thread finishes merges one by one while a short await times out; afterwards
+   * exactly the merges the thread reached must report success and the rest must be unfinished.
+   */
+  public void testTimeoutLargeNumberOfMerges() throws IOException, InterruptedException {
+    try (Directory dir = newDirectory()) {
+      MergePolicy.MergeSpecification ms = createRandomMergeSpecification(dir, 10000);
+      for (MergePolicy.OneMerge m : ms.merges) {
+        assertFalse(m.hasCompletedSuccessfully().isPresent());
+      }
+      AtomicInteger i = new AtomicInteger(0);
+      AtomicBoolean stop = new AtomicBoolean(false);
+      Thread t = new Thread(() -> {
+        // also bounded by the number of merges so a stalled main thread cannot push us past the end
+        while (stop.get() == false && i.get() < ms.merges.size()) {
+          try {
+            ms.merges.get(i.getAndIncrement()).mergeFinished(true);
+            Thread.sleep(1);
+          } catch (IOException | InterruptedException e) {
+            throw new AssertionError(e);
+          }
+        }
+      });
+      t.start();
+      try {
+        assertFalse(ms.await(10, TimeUnit.MILLISECONDS));
+      } finally {
+        // stop and join the worker even if the assertion fails, so it cannot outlive the test
+        stop.set(true);
+        t.join();
+      }
+      for (int j = 0; j < ms.merges.size(); j++) {
+        if (j < i.get()) {
+          assertTrue(ms.merges.get(j).hasCompletedSuccessfully().get());
+        } else {
+          assertFalse(ms.merges.get(j).hasCompletedSuccessfully().isPresent());
+        }
+      }
+    }
+  }
+
+  /** A merge can only be finished once; a second {@code mergeFinished} call must be rejected. */
+  public void testFinishTwice() throws IOException {
+    try (Directory dir = newDirectory()) {
+      MergePolicy.MergeSpecification spec = createRandomMergeSpecification(dir, 1);
+      MergePolicy.OneMerge oneMerge = spec.merges.get(0);
+      oneMerge.mergeFinished(true);
+      expectThrows(IllegalStateException.class, () -> oneMerge.mergeFinished(false));
+    }
+  }
+
+  /** {@code totalMaxDoc} must equal the sum of maxDoc over the merge's segments. */
+  public void testTotalMaxDoc() throws IOException {
+    try (Directory dir = newDirectory()) {
+      MergePolicy.MergeSpecification spec = createRandomMergeSpecification(dir, 1);
+      int docs = 0;
+      MergePolicy.OneMerge oneMerge = spec.merges.get(0);
+      for (SegmentCommitInfo info : oneMerge.segments) {
+        docs += info.info.maxDoc();
+      }
+      assertEquals(docs, oneMerge.totalMaxDoc);
+    }
+  }
+
+  /**
+   * Creates a spec of {@code numMerges} merges, each over one segment (in {@code dir}) with a
+   * random name, maxDoc, compound-file flag and id.
+   */
+  private static MergePolicy.MergeSpecification createRandomMergeSpecification(Directory dir, int numMerges) {
+    MergePolicy.MergeSpecification ms = new MergePolicy.MergeSpecification();
+    for (int ii = 0; ii < numMerges; ++ii) {
+      final SegmentInfo si = new SegmentInfo(
+          dir, // dir
+          Version.LATEST, // version
+          Version.LATEST, // min version
+          TestUtil.randomSimpleString(random()), // name
+          random().nextInt(1000), // maxDoc
+          random().nextBoolean(), // isCompoundFile
+          null, // codec
+          Collections.emptyMap(), // diagnostics
+          TestUtil.randomSimpleString(// id
+              random(),
+              StringHelper.ID_LENGTH,
+              StringHelper.ID_LENGTH).getBytes(StandardCharsets.US_ASCII),
+          Collections.emptyMap(), // attributes
+          null /* indexSort */);
+      final List<SegmentCommitInfo> segments =
+          List.of(new SegmentCommitInfo(si, 0, 0, 0, 0, 0, StringHelper.randomId()));
+      ms.add(new MergePolicy.OneMerge(segments));
+    }
+    return ms;
+  }
+}