You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jp...@apache.org on 2023/03/15 10:39:47 UTC

[lucene] branch main updated: Reduce contention in DocumentsWriterFlushControl. (#12198)

This is an automated email from the ASF dual-hosted git repository.

jpountz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/lucene.git


The following commit(s) were added to refs/heads/main by this push:
     new d407edf4b87 Reduce contention in DocumentsWriterFlushControl. (#12198)
d407edf4b87 is described below

commit d407edf4b87227735b3ba2d8a81af1994a02d8e9
Author: Adrien Grand <jp...@gmail.com>
AuthorDate: Wed Mar 15 11:39:40 2023 +0100

    Reduce contention in DocumentsWriterFlushControl. (#12198)
    
    lucene-util's `IndexGeoNames` benchmark suffers from heavy lock contention when
    running with many indexing threads (20 in my case). The main offender is
    `DocumentsWriterFlushControl#doAfterDocument`, which runs after every index
    operation to update doc and RAM accounting.
    
    This change reduces contention by only updating RAM accounting once the amount
    of RAM consumed by a single DWPT that has not been committed yet reaches a
    threshold of 0.1% of the total RAM buffer size, capped at 16kB (if the RAM
    buffer is disabled, the per-thread hard limit is used as the reference
    instead). Batching is skipped when flushing by document count is enabled.
    This effectively batches updates to RAM accounting, similarly to what happens
    when using `IndexWriter#addDocuments` to index multiple documents at once.
    Since updates to RAM accounting may be batched, `FlushPolicy` can no longer
    distinguish between inserts, updates and deletes, so all three methods were
    merged into a single `onChange` method.
    
    With this change, `IndexGeoNames` goes from ~22s to ~19s and the main offender
    for contention is now `DocumentsWriterPerThreadPool#getAndLock`.
    
    Co-authored-by: Simon Willnauer <si...@apache.org>
---
 lucene/CHANGES.txt                                 |  2 +
 .../org/apache/lucene/index/DocumentsWriter.java   |  3 +-
 .../lucene/index/DocumentsWriterFlushControl.java  | 39 ++++++++--
 .../lucene/index/FlushByRamOrCountsPolicy.java     | 88 ++++++++++------------
 .../java/org/apache/lucene/index/FlushPolicy.java  | 29 +------
 .../lucene/index/TestFlushByRamOrCountsPolicy.java | 32 +-------
 6 files changed, 80 insertions(+), 113 deletions(-)

diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index 62b91852e5d..bbadfc3332e 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -165,6 +165,8 @@ Optimizations
 
 * GITHUB#12179: Better PostingsEnum reuse in MultiTermQueryConstantScoreBlendedWrapper. (Greg Miller)
 
+* GITHUB#12198: Reduced contention when indexing with many threads. (Adrien Grand)
+
 Bug Fixes
 ---------------------
 
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
index 8ddab3640ef..625c87bb9cd 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
@@ -435,8 +435,7 @@ final class DocumentsWriter implements Closeable, Accountable {
           flushControl.doOnAbort(dwpt);
         }
       }
-      final boolean isUpdate = delNode != null && delNode.isDelete();
-      flushingDWPT = flushControl.doAfterDocument(dwpt, isUpdate);
+      flushingDWPT = flushControl.doAfterDocument(dwpt);
     } finally {
       if (dwpt.isFlushPending() || dwpt.isAborted()) {
         dwpt.unlock();
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
index 35c17ad16cb..64acae6ec7b 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
@@ -185,8 +185,33 @@ final class DocumentsWriterFlushControl implements Accountable, Closeable {
     return true;
   }
 
-  DocumentsWriterPerThread doAfterDocument(DocumentsWriterPerThread perThread, boolean isUpdate) {
+  /**
+   * Return the smallest number of bytes that we would like to make sure to not miss from the global
+   * RAM accounting.
+   */
+  private long ramBufferGranularity() {
+    double ramBufferSizeMB = config.getRAMBufferSizeMB();
+    if (ramBufferSizeMB == IndexWriterConfig.DISABLE_AUTO_FLUSH) {
+      ramBufferSizeMB = config.getRAMPerThreadHardLimitMB();
+    }
+    // No more than ~0.1% of the RAM buffer size.
+    long granularity = (long) (ramBufferSizeMB * 1024.d);
+    // Or 16kB, so that with e.g. 64 active DWPTs, we'd never be missing more than 64*16kB = 1MB in
+    // the global RAM buffer accounting.
+    granularity = Math.min(granularity, 16 * 1024L);
+    return granularity;
+  }
+
+  DocumentsWriterPerThread doAfterDocument(DocumentsWriterPerThread perThread) {
     final long delta = perThread.getCommitLastBytesUsedDelta();
+    // in order to prevent contention in the case of many threads indexing small documents
+    // we skip ram accounting unless the DWPT accumulated enough ram to be worthwhile
+    if (config.getMaxBufferedDocs() == IndexWriterConfig.DISABLE_AUTO_FLUSH
+        && delta < ramBufferGranularity()) {
+      // Skip accounting for now, we'll come back to it later when the delta is bigger
+      return null;
+    }
+
     synchronized (this) {
       // we need to commit this under lock but calculate it outside of the lock to minimize the time
       // this lock is held
@@ -208,11 +233,7 @@ final class DocumentsWriterFlushControl implements Accountable, Closeable {
         } else {
           activeBytes += delta;
           assert updatePeaks(delta);
-          if (isUpdate) {
-            flushPolicy.onUpdate(this, perThread);
-          } else {
-            flushPolicy.onInsert(this, perThread);
-          }
+          flushPolicy.onChange(this, perThread);
           if (!perThread.isFlushPending() && perThread.ramBytesUsed() > hardMaxBytesPerDWPT) {
             // Safety check to prevent a single DWPT exceeding its RAM limit. This
             // is super important since we can not address more than 2048 MB per DWPT
@@ -462,7 +483,7 @@ final class DocumentsWriterFlushControl implements Accountable, Closeable {
 
   synchronized void doOnDelete() {
     // pass null this is a global delete no update
-    flushPolicy.onDelete(this, null);
+    flushPolicy.onChange(this, null);
   }
 
   /**
@@ -724,7 +745,9 @@ final class DocumentsWriterFlushControl implements Accountable, Closeable {
 
   synchronized DocumentsWriterPerThread findLargestNonPendingWriter() {
     DocumentsWriterPerThread maxRamUsingWriter = null;
-    long maxRamSoFar = 0;
+    // Note: should be initialized to -1 since some DWPTs might return 0 if their RAM usage has not
+    // been committed yet.
+    long maxRamSoFar = -1;
     int count = 0;
     for (DocumentsWriterPerThread next : perThreadPool) {
       if (next.isFlushPending() == false && next.getNumDocsInRAM() > 0) {
diff --git a/lucene/core/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java b/lucene/core/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java
index afa96a52eec..bd09d59b69c 100644
--- a/lucene/core/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java
@@ -21,22 +21,8 @@ package org.apache.lucene.index;
  * document count depending on the IndexWriter's {@link IndexWriterConfig}. It also applies pending
  * deletes based on the number of buffered delete terms.
  *
- * <ul>
- *   <li>{@link #onDelete(DocumentsWriterFlushControl, DocumentsWriterPerThread)} - applies pending
- *       delete operations based on the global number of buffered delete terms if the consumed
- *       memory is greater than {@link IndexWriterConfig#getRAMBufferSizeMB()}.
- *   <li>{@link #onInsert(DocumentsWriterFlushControl, DocumentsWriterPerThread)} - flushes either
- *       on the number of documents per {@link DocumentsWriterPerThread} ( {@link
- *       DocumentsWriterPerThread#getNumDocsInRAM()}) or on the global active memory consumption in
- *       the current indexing session iff {@link IndexWriterConfig#getMaxBufferedDocs()} or {@link
- *       IndexWriterConfig#getRAMBufferSizeMB()} is enabled respectively
- *   <li>{@link #onUpdate(DocumentsWriterFlushControl, DocumentsWriterPerThread)} - calls {@link
- *       #onInsert(DocumentsWriterFlushControl, DocumentsWriterPerThread)} and {@link
- *       #onDelete(DocumentsWriterFlushControl, DocumentsWriterPerThread)} in order
- * </ul>
- *
- * All {@link IndexWriterConfig} settings are used to mark {@link DocumentsWriterPerThread} as flush
- * pending during indexing with respect to their live updates.
+ * <p>All {@link IndexWriterConfig} settings are used to mark {@link DocumentsWriterPerThread} as
+ * flush pending during indexing with respect to their live updates.
  *
  * <p>If {@link IndexWriterConfig#setRAMBufferSizeMB(double)} is enabled, the largest ram consuming
  * {@link DocumentsWriterPerThread} will be marked as pending iff the global active RAM consumption
@@ -45,46 +31,54 @@ package org.apache.lucene.index;
 class FlushByRamOrCountsPolicy extends FlushPolicy {
 
   @Override
-  public void onDelete(DocumentsWriterFlushControl control, DocumentsWriterPerThread perThread) {
-    if ((flushOnRAM()
-        && control.getDeleteBytesUsed() > 1024 * 1024 * indexWriterConfig.getRAMBufferSizeMB())) {
-      control.setApplyAllDeletes();
-      if (infoStream.isEnabled("FP")) {
-        infoStream.message(
-            "FP",
-            "force apply deletes bytesUsed="
-                + control.getDeleteBytesUsed()
-                + " vs ramBufferMB="
-                + indexWriterConfig.getRAMBufferSizeMB());
-      }
-    }
-  }
-
-  @Override
-  public void onInsert(DocumentsWriterFlushControl control, DocumentsWriterPerThread perThread) {
-    if (flushOnDocCount()
+  public void onChange(DocumentsWriterFlushControl control, DocumentsWriterPerThread perThread) {
+    if (perThread != null
+        && flushOnDocCount()
         && perThread.getNumDocsInRAM() >= indexWriterConfig.getMaxBufferedDocs()) {
       // Flush this state by num docs
       control.setFlushPending(perThread);
     } else if (flushOnRAM()) { // flush by RAM
       final long limit = (long) (indexWriterConfig.getRAMBufferSizeMB() * 1024.d * 1024.d);
-      final long totalRam = control.activeBytes() + control.getDeleteBytesUsed();
-      if (totalRam >= limit) {
-        if (infoStream.isEnabled("FP")) {
-          infoStream.message(
-              "FP",
-              "trigger flush: activeBytes="
-                  + control.activeBytes()
-                  + " deleteBytes="
-                  + control.getDeleteBytesUsed()
-                  + " vs limit="
-                  + limit);
-        }
-        markLargestWriterPending(control, perThread);
+      final long activeRam = control.activeBytes();
+      final long deletesRam = control.getDeleteBytesUsed();
+      if (deletesRam >= limit && activeRam >= limit && perThread != null) {
+        flushDeletes(control);
+        flushActiveBytes(control, perThread);
+      } else if (deletesRam >= limit) {
+        flushDeletes(control);
+      } else if (activeRam + deletesRam >= limit && perThread != null) {
+        flushActiveBytes(control, perThread);
       }
     }
   }
 
+  private void flushDeletes(DocumentsWriterFlushControl control) {
+    control.setApplyAllDeletes();
+    if (infoStream.isEnabled("FP")) {
+      infoStream.message(
+          "FP",
+          "force apply deletes bytesUsed="
+              + control.getDeleteBytesUsed()
+              + " vs ramBufferMB="
+              + indexWriterConfig.getRAMBufferSizeMB());
+    }
+  }
+
+  private void flushActiveBytes(
+      DocumentsWriterFlushControl control, DocumentsWriterPerThread perThread) {
+    if (infoStream.isEnabled("FP")) {
+      infoStream.message(
+          "FP",
+          "trigger flush: activeBytes="
+              + control.activeBytes()
+              + " deleteBytes="
+              + control.getDeleteBytesUsed()
+              + " vs ramBufferMB="
+              + indexWriterConfig.getRAMBufferSizeMB());
+    }
+    markLargestWriterPending(control, perThread);
+  }
+
   /** Marks the most ram consuming active {@link DocumentsWriterPerThread} flush pending */
   protected void markLargestWriterPending(
       DocumentsWriterFlushControl control, DocumentsWriterPerThread perThread) {
diff --git a/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java b/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java
index 61ee494e22a..d2abc28dfa2 100644
--- a/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java
+++ b/lucene/core/src/java/org/apache/lucene/index/FlushPolicy.java
@@ -47,37 +47,14 @@ abstract class FlushPolicy {
   protected InfoStream infoStream;
 
   /**
-   * Called for each delete term. If this is a delete triggered due to an update the given {@link
-   * DocumentsWriterPerThread} is non-null.
+   * Called for each delete, insert or update. For pure deletes, the given {@link
+   * DocumentsWriterPerThread} may be {@code null}.
    *
    * <p>Note: This method is called synchronized on the given {@link DocumentsWriterFlushControl}
    * and it is guaranteed that the calling thread holds the lock on the given {@link
    * DocumentsWriterPerThread}
    */
-  public abstract void onDelete(
-      DocumentsWriterFlushControl control, DocumentsWriterPerThread perThread);
-
-  /**
-   * Called for each document update on the given {@link DocumentsWriterPerThread}'s {@link
-   * DocumentsWriterPerThread}.
-   *
-   * <p>Note: This method is called synchronized on the given {@link DocumentsWriterFlushControl}
-   * and it is guaranteed that the calling thread holds the lock on the given {@link
-   * DocumentsWriterPerThread}
-   */
-  public void onUpdate(DocumentsWriterFlushControl control, DocumentsWriterPerThread perThread) {
-    onInsert(control, perThread);
-    onDelete(control, perThread);
-  }
-
-  /**
-   * Called for each document addition on the given {@link DocumentsWriterPerThread}s {@link
-   * DocumentsWriterPerThread}.
-   *
-   * <p>Note: This method is synchronized by the given {@link DocumentsWriterFlushControl} and it is
-   * guaranteed that the calling thread holds the lock on the given {@link DocumentsWriterPerThread}
-   */
-  public abstract void onInsert(
+  public abstract void onChange(
       DocumentsWriterFlushControl control, DocumentsWriterPerThread perThread);
 
   /** Called by DocumentsWriter to initialize the FlushPolicy */
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java
index 7d2b7f0e151..aca1c5e4898 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java
@@ -323,35 +323,7 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
     boolean hasMarkedPending = false;
 
     @Override
-    public void onDelete(DocumentsWriterFlushControl control, DocumentsWriterPerThread perThread) {
-      final ArrayList<DocumentsWriterPerThread> pending = new ArrayList<>();
-      final ArrayList<DocumentsWriterPerThread> notPending = new ArrayList<>();
-      findPending(control, pending, notPending);
-      final boolean flushCurrent = perThread.isFlushPending();
-      final DocumentsWriterPerThread toFlush;
-      if (perThread.isFlushPending()) {
-        toFlush = perThread;
-      } else {
-        toFlush = null;
-      }
-      super.onDelete(control, perThread);
-      if (toFlush != null) {
-        if (flushCurrent) {
-          assertTrue(pending.remove(toFlush));
-        } else {
-          assertTrue(notPending.remove(toFlush));
-        }
-        assertTrue(toFlush.isFlushPending());
-        hasMarkedPending = true;
-      }
-
-      for (DocumentsWriterPerThread dwpt : notPending) {
-        assertFalse(dwpt.isFlushPending());
-      }
-    }
-
-    @Override
-    public void onInsert(DocumentsWriterFlushControl control, DocumentsWriterPerThread dwpt) {
+    public void onChange(DocumentsWriterFlushControl control, DocumentsWriterPerThread dwpt) {
       final ArrayList<DocumentsWriterPerThread> pending = new ArrayList<>();
       final ArrayList<DocumentsWriterPerThread> notPending = new ArrayList<>();
       findPending(control, pending, notPending);
@@ -370,7 +342,7 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
       } else {
         toFlush = null;
       }
-      super.onInsert(control, dwpt);
+      super.onChange(control, dwpt);
       if (toFlush != null) {
         if (flushCurrent) {
           assertTrue(pending.remove(toFlush));