You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by si...@apache.org on 2018/11/21 13:20:36 UTC

lucene-solr:master: LUCENE-8571: Don't block on FrozenBufferedUpdates#apply during IW#processEvents

Repository: lucene-solr
Updated Branches:
  refs/heads/master 08dd681f0 -> 5f8855ee0


LUCENE-8571: Don't block on FrozenBufferedUpdates#apply during IW#processEvents

While indexing, we try to apply frozen delete packets concurrently
on indexing threads if necessary. This is done in an opaque way via
IndexWriter#processEvents. Yet, when we commit or refresh, we have to
ensure we apply all frozen update packets before we return.
Today we execute the apply method in a blocking fashion, which is unnecessary
when we are in an IndexWriter#processEvents context: we block indexing
threads while they could just continue, since the packet is already being applied.
We also might wait in BufferedUpdatesStream when we apply all necessary updates,
where we could continue with other work instead of waiting.
This change also tries to apply first the packets that are not currently
being applied, in order to avoid blocking unnecessarily.


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/5f8855ee
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/5f8855ee
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/5f8855ee

Branch: refs/heads/master
Commit: 5f8855ee0bf57c8777775df8c10889eeee2e8d78
Parents: 08dd681
Author: Simon Willnauer <si...@apache.org>
Authored: Wed Nov 21 10:22:41 2018 +0100
Committer: Simon Willnauer <si...@apache.org>
Committed: Wed Nov 21 14:20:19 2018 +0100

----------------------------------------------------------------------
 .../lucene/index/BufferedUpdatesStream.java     |  11 +-
 .../lucene/index/FrozenBufferedUpdates.java     | 241 +++++++++++--------
 .../org/apache/lucene/index/IndexWriter.java    |   4 +-
 3 files changed, 147 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/5f8855ee/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java b/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java
index 91e590c..9a669e0 100644
--- a/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java
+++ b/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java
@@ -19,6 +19,7 @@ package org.apache.lucene.index;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
@@ -222,14 +223,22 @@ final class BufferedUpdatesStream implements Accountable {
       infoStream.message("BD", "waitApply: " + waitFor.size() + " packets: " + waitFor);
     }
 
+    ArrayList<FrozenBufferedUpdates> pendingPackets = new ArrayList<>();
     long totalDelCount = 0;
     for (FrozenBufferedUpdates packet : waitFor) {
       // Frozen packets are now resolved, concurrently, by the indexing threads that
       // create them, by adding a DocumentsWriter.ResolveUpdatesEvent to the events queue,
       // but if we get here and the packet is not yet resolved, we resolve it now ourselves:
-      packet.apply(writer);
+      if (packet.tryApply(writer) == false) {
+        // if somebody else is currently applying it - move on to the next one and force apply below
+        pendingPackets.add(packet);
+      }
       totalDelCount += packet.totalDelCount;
     }
+    for (FrozenBufferedUpdates packet : pendingPackets) {
+      // now block on all the packets that were concurrently applied to ensure they are due before we continue.
+      packet.forceApply(writer);
+    }
 
     if (infoStream.isEnabled("BD")) {
       infoStream.message("BD",

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/5f8855ee/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java b/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java
index 36834a3..bb84a79f1 100644
--- a/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java
+++ b/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java
@@ -30,6 +30,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.IntConsumer;
 
 import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate;
@@ -83,6 +84,7 @@ final class FrozenBufferedUpdates {
 
   /** Counts down once all deletes/updates have been applied */
   public final CountDownLatch applied = new CountDownLatch(1);
+  private final ReentrantLock applyLock = new ReentrantLock();
 
   /** How many total documents were deleted/updated. */
   public long totalDelCount;
@@ -214,149 +216,173 @@ final class FrozenBufferedUpdates {
 
   /** Translates a frozen packet of delete term/query, or doc values
    *  updates, into their actual docIDs in the index, and applies the change.  This is a heavy
-   *  operation and is done concurrently by incoming indexing threads. */
+   *  operation and is done concurrently by incoming indexing threads.
+   *  This method will return immediately without blocking if another thread is currently
+   *  applying the package. In order to ensure the packet has been applied, {@link #forceApply(IndexWriter)}
+   *  must be called.
+   *  */
   @SuppressWarnings("try")
-  public synchronized void apply(IndexWriter writer) throws IOException {
-    if (applied.getCount() == 0) {
-      // already done
-      return;
+  boolean tryApply(IndexWriter writer) throws IOException {
+    if (applyLock.tryLock()) {
+      try {
+        forceApply(writer);
+        return true;
+      } finally {
+        applyLock.unlock();
+      }
     }
+    return false;
+  }
 
-    long startNS = System.nanoTime();
+  /** Translates a frozen packet of delete term/query, or doc values
+   *  updates, into their actual docIDs in the index, and applies the change.  This is a heavy
+   *  operation and is done concurrently by incoming indexing threads.
+   *  */
+  void forceApply(IndexWriter writer) throws IOException {
+    applyLock.lock();
+    try {
+      if (applied.getCount() == 0) {
+        // already done
+        return;
+      }
+      long startNS = System.nanoTime();
 
-    assert any();
+      assert any();
 
-    Set<SegmentCommitInfo> seenSegments = new HashSet<>();
+      Set<SegmentCommitInfo> seenSegments = new HashSet<>();
 
-    int iter = 0;
-    int totalSegmentCount = 0;
-    long totalDelCount = 0;
+      int iter = 0;
+      int totalSegmentCount = 0;
+      long totalDelCount = 0;
 
-    boolean finished = false;
+      boolean finished = false;
 
-    // Optimistic concurrency: assume we are free to resolve the deletes against all current segments in the index, despite that
-    // concurrent merges are running.  Once we are done, we check to see if a merge completed while we were running.  If so, we must retry
-    // resolving against the newly merged segment(s).  Eventually no merge finishes while we were running and we are done.
-    while (true) {
-      String messagePrefix;
-      if (iter == 0) {
-        messagePrefix = "";
-      } else {
-        messagePrefix = "iter " + iter;
-      }
+      // Optimistic concurrency: assume we are free to resolve the deletes against all current segments in the index, despite that
+      // concurrent merges are running.  Once we are done, we check to see if a merge completed while we were running.  If so, we must retry
+      // resolving against the newly merged segment(s).  Eventually no merge finishes while we were running and we are done.
+      while (true) {
+        String messagePrefix;
+        if (iter == 0) {
+          messagePrefix = "";
+        } else {
+          messagePrefix = "iter " + iter;
+        }
 
-      long iterStartNS = System.nanoTime();
+        long iterStartNS = System.nanoTime();
 
-      long mergeGenStart = writer.mergeFinishedGen.get();
+        long mergeGenStart = writer.mergeFinishedGen.get();
 
-      Set<String> delFiles = new HashSet<>();
-      BufferedUpdatesStream.SegmentState[] segStates;
+        Set<String> delFiles = new HashSet<>();
+        BufferedUpdatesStream.SegmentState[] segStates;
 
-      synchronized (writer) {
-        List<SegmentCommitInfo> infos = getInfosToApply(writer);
-        if (infos == null) {
-          break;
-        }
+        synchronized (writer) {
+          List<SegmentCommitInfo> infos = getInfosToApply(writer);
+          if (infos == null) {
+            break;
+          }
 
-        for (SegmentCommitInfo info : infos) {
-          delFiles.addAll(info.files());
-        }
+          for (SegmentCommitInfo info : infos) {
+            delFiles.addAll(info.files());
+          }
 
-        // Must open while holding IW lock so that e.g. segments are not merged
-        // away, dropped from 100% deletions, etc., before we can open the readers
-        segStates = openSegmentStates(writer, infos, seenSegments, delGen());
+          // Must open while holding IW lock so that e.g. segments are not merged
+          // away, dropped from 100% deletions, etc., before we can open the readers
+          segStates = openSegmentStates(writer, infos, seenSegments, delGen());
 
-        if (segStates.length == 0) {
+          if (segStates.length == 0) {
+
+            if (infoStream.isEnabled("BD")) {
+              infoStream.message("BD", "packet matches no segments");
+            }
+            break;
+          }
 
           if (infoStream.isEnabled("BD")) {
-            infoStream.message("BD", "packet matches no segments");
+            infoStream.message("BD", String.format(Locale.ROOT,
+                messagePrefix + "now apply del packet (%s) to %d segments, mergeGen %d",
+                this, segStates.length, mergeGenStart));
           }
-          break;
+
+          totalSegmentCount += segStates.length;
+
+          // Important, else IFD may try to delete our files while we are still using them,
+          // if e.g. a merge finishes on some of the segments we are resolving on:
+          writer.deleter.incRef(delFiles);
+        }
+
+        AtomicBoolean success = new AtomicBoolean();
+        long delCount;
+        try (Closeable finalizer = () -> finishApply(writer, segStates, success.get(), delFiles)) {
+          // don't hold IW monitor lock here so threads are free concurrently resolve deletes/updates:
+          delCount = apply(segStates);
+          success.set(true);
         }
 
+        // Since we just resolved some more deletes/updates, now is a good time to write them:
+        writer.writeSomeDocValuesUpdates();
+
+        // It's OK to add this here, even if the while loop retries, because delCount only includes newly
+        // deleted documents, on the segments we didn't already do in previous iterations:
+        totalDelCount += delCount;
+
         if (infoStream.isEnabled("BD")) {
           infoStream.message("BD", String.format(Locale.ROOT,
-                                                 messagePrefix + "now apply del packet (%s) to %d segments, mergeGen %d",
-                                                 this, segStates.length, mergeGenStart));
+              messagePrefix + "done inner apply del packet (%s) to %d segments; %d new deletes/updates; took %.3f sec",
+              this, segStates.length, delCount, (System.nanoTime() - iterStartNS) / 1000000000.));
         }
+        if (privateSegment != null) {
+          // No need to retry for a segment-private packet: the merge that folds in our private segment already waits for all deletes to
+          // be applied before it kicks off, so this private segment must already not be in the set of merging segments
 
-        totalSegmentCount += segStates.length;
+          break;
+        }
 
-        // Important, else IFD may try to delete our files while we are still using them,
-        // if e.g. a merge finishes on some of the segments we are resolving on:
-        writer.deleter.incRef(delFiles);
-      }
+        // Must sync on writer here so that IW.mergeCommit is not running concurrently, so that if we exit, we know mergeCommit will succeed
+        // in pulling all our delGens into a merge:
+        synchronized (writer) {
+          long mergeGenCur = writer.mergeFinishedGen.get();
 
-      AtomicBoolean success = new AtomicBoolean();
-      long delCount;
-      try (Closeable finalizer = () -> finishApply(writer, segStates, success.get(), delFiles)) {
-        // don't hold IW monitor lock here so threads are free concurrently resolve deletes/updates:
-        delCount = apply(segStates);
-        success.set(true);
-      }
+          if (mergeGenCur == mergeGenStart) {
 
-      // Since we just resolved some more deletes/updates, now is a good time to write them:
-      writer.writeSomeDocValuesUpdates();
+            // Must do this while still holding IW lock else a merge could finish and skip carrying over our updates:
 
-      // It's OK to add this here, even if the while loop retries, because delCount only includes newly
-      // deleted documents, on the segments we didn't already do in previous iterations:
-      totalDelCount += delCount;
+            // Record that this packet is finished:
+            writer.finished(this);
 
-      if (infoStream.isEnabled("BD")) {
-        infoStream.message("BD", String.format(Locale.ROOT,
-                                               messagePrefix + "done inner apply del packet (%s) to %d segments; %d new deletes/updates; took %.3f sec",
-                                               this, segStates.length, delCount, (System.nanoTime() - iterStartNS) / 1000000000.));
-      }
-      if (privateSegment != null) {
-        // No need to retry for a segment-private packet: the merge that folds in our private segment already waits for all deletes to
-        // be applied before it kicks off, so this private segment must already not be in the set of merging segments
-
-        break;
-      }
+            finished = true;
 
-      // Must sync on writer here so that IW.mergeCommit is not running concurrently, so that if we exit, we know mergeCommit will succeed
-      // in pulling all our delGens into a merge:
-      synchronized (writer) {
-        long mergeGenCur = writer.mergeFinishedGen.get();
+            // No merge finished while we were applying, so we are done!
+            break;
+          }
+        }
 
-        if (mergeGenCur == mergeGenStart) {
+        if (infoStream.isEnabled("BD")) {
+          infoStream.message("BD", messagePrefix + "concurrent merges finished; move to next iter");
+        }
 
-          // Must do this while still holding IW lock else a merge could finish and skip carrying over our updates:
-          
-          // Record that this packet is finished:
-          writer.finished(this);
+        // A merge completed while we were running.  In this case, that merge may have picked up some of the updates we did, but not
+        // necessarily all of them, so we cycle again, re-applying all our updates to the newly merged segment.
 
-          finished = true;
+        iter++;
+      }
 
-          // No merge finished while we were applying, so we are done!
-          break;
-        }
+      if (finished == false) {
+        // Record that this packet is finished:
+        writer.finished(this);
       }
 
       if (infoStream.isEnabled("BD")) {
-        infoStream.message("BD", messagePrefix + "concurrent merges finished; move to next iter");
+        String message = String.format(Locale.ROOT,
+            "done apply del packet (%s) to %d segments; %d new deletes/updates; took %.3f sec",
+            this, totalSegmentCount, totalDelCount, (System.nanoTime() - startNS) / 1000000000.);
+        if (iter > 0) {
+          message += "; " + (iter + 1) + " iters due to concurrent merges";
+        }
+        message += "; " + writer.getPendingUpdatesCount() + " packets remain";
+        infoStream.message("BD", message);
       }
-        
-      // A merge completed while we were running.  In this case, that merge may have picked up some of the updates we did, but not
-      // necessarily all of them, so we cycle again, re-applying all our updates to the newly merged segment.
-
-      iter++;
-    }
-
-    if (finished == false) {
-      // Record that this packet is finished:
-      writer.finished(this);
-    }
-        
-    if (infoStream.isEnabled("BD")) {
-      String message = String.format(Locale.ROOT,
-                                     "done apply del packet (%s) to %d segments; %d new deletes/updates; took %.3f sec",
-                                     this, totalSegmentCount, totalDelCount, (System.nanoTime() - startNS) / 1000000000.);
-      if (iter > 0) {
-        message += "; " + (iter+1) + " iters due to concurrent merges";
-      }
-      message += "; " + writer.getPendingUpdatesCount() + " packets remain";
-      infoStream.message("BD", message);
+    } finally {
+      applyLock.unlock();
     }
   }
 
@@ -411,6 +437,7 @@ final class FrozenBufferedUpdates {
 
   private void finishApply(IndexWriter writer, BufferedUpdatesStream.SegmentState[] segStates,
                            boolean success, Set<String> delFiles) throws IOException {
+    assert applyLock.isHeldByCurrentThread();
     synchronized (writer) {
 
       BufferedUpdatesStream.ApplyDeletesResult result;
@@ -441,8 +468,8 @@ final class FrozenBufferedUpdates {
 
   /** Applies pending delete-by-term, delete-by-query and doc values updates to all segments in the index, returning
    *  the number of new deleted or updated documents. */
-  private synchronized long apply(BufferedUpdatesStream.SegmentState[] segStates) throws IOException {
-
+  private long apply(BufferedUpdatesStream.SegmentState[] segStates) throws IOException {
+    assert applyLock.isHeldByCurrentThread();
     if (delGen == -1) {
       // we were not yet pushed
       throw new IllegalArgumentException("gen is not yet set; call BufferedUpdatesStream.push first");

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/5f8855ee/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 80d11c1..028554b 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -2607,7 +2607,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
     // Do this as an event so it applies higher in the stack when we are not holding DocumentsWriterFlushQueue.purgeLock:
     eventQueue.add(w -> {
       try {
-        packet.apply(w);
+        // we call tryApply here since we don't want to block if a refresh or a flush is already applying the
+        // packet. The flush will retry this packet anyway to ensure all of them are applied
+        packet.tryApply(w);
       } catch (Throwable t) {
         try {
           w.onTragicEvent(t, "applyUpdatesPacket");