You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by yo...@apache.org on 2023/03/01 11:01:40 UTC

[bookkeeper] branch master updated: Remove callback threadpool for sending add responses (#3825)

This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 3dcd8d5869 Remove callback threadpool for sending add responses (#3825)
3dcd8d5869 is described below

commit 3dcd8d58692a55a3c95184d4692d5d4f2e65c11b
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Mar 1 03:01:34 2023 -0800

    Remove callback threadpool for sending add responses (#3825)
    
    ### Motivation
    
    The option of triggering journal callbacks from a thread-pool instead of from the JournalForceWrite thread is not really helpful under any condition.
    
    The only thing this option does is introduce an extra context switch: the callback first jumps onto this thread-pool and triggers the response from there, even though basically no work is done in the thread-pool itself. It just hands off to the Netty IO-thread for the specific connection, and the serialization and the socket write are done from there.
    
    When using the thread-pool, one can see the effect of the extra context switch and the contention on the execution.
    
    Additionally, when 0 threads are configured, we are using the Guava direct executor service (`MoreExecutors.newDirectExecutorService()`), which has non-zero overhead in the form of a mutex that is contended between multiple journal threads.
---
 .../java/org/apache/bookkeeper/bookie/Journal.java | 28 ++++------------------
 .../bookkeeper/conf/ServerConfiguration.java       |  2 ++
 conf/bk_server.conf                                |  4 ----
 3 files changed, 6 insertions(+), 28 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 737893aade..a71d4fb2c8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -23,7 +23,6 @@ package org.apache.bookkeeper.bookie;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
-import com.google.common.util.concurrent.MoreExecutors;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.Unpooled;
@@ -43,8 +42,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
@@ -414,7 +411,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
                     if (qe != null) {
                         qe.setEnqueueCbThreadPooleQueueTime(MathUtils.nowInNano());
                         journalStats.getCbThreadPoolQueueSize().inc();
-                        cbThreadPool.execute(qe);
+                        qe.run();
                     }
                 }
 
@@ -713,14 +710,9 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
 
     private final String lastMarkFileName;
 
-    /**
-     * The thread pool used to handle callback.
-     */
-    private final ExecutorService cbThreadPool;
-
     private final Counter callbackTime;
     private final Counter journalTime;
-    private static String journalThreadName = "BookieJournal";
+    private static final String journalThreadName = "BookieJournal";
 
     // journal entry queue to commit
     final BlockingQueue<QueueEntry> queue;
@@ -778,14 +770,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
         this.journalAlignmentSize = conf.getJournalAlignmentSize();
         this.journalPageCacheFlushIntervalMSec = conf.getJournalPageCacheFlushIntervalMSec();
         this.journalReuseFiles = conf.getJournalReuseFiles();
-        if (conf.getNumJournalCallbackThreads() > 0) {
-            this.cbThreadPool = Executors.newFixedThreadPool(conf.getNumJournalCallbackThreads(),
-                                                        new CbThreadFactory());
-            this.callbackTime = journalStatsLogger.getThreadScopedCounter("callback-thread-time");
-        } else {
-            this.cbThreadPool = MoreExecutors.newDirectExecutorService();
-            this.callbackTime = journalStatsLogger.getThreadScopedCounter("callback-time");
-        }
+        this.callbackTime = journalStatsLogger.getThreadScopedCounter("callback-time");
 
         this.journalTime = journalStatsLogger.getThreadScopedCounter("journal-thread-time");
 
@@ -1202,7 +1187,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
                                     numEntriesToFlush--;
                                     entry.setEnqueueCbThreadPooleQueueTime(MathUtils.nowInNano());
                                     journalStats.getCbThreadPoolQueueSize().inc();
-                                    cbThreadPool.execute(entry);
+                                    entry.run();
                                 }
                             }
 
@@ -1336,11 +1321,6 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
             }
 
             forceWriteThread.shutdown();
-            cbThreadPool.shutdown();
-            if (!cbThreadPool.awaitTermination(5, TimeUnit.SECONDS)) {
-                LOG.warn("Couldn't shutdown journal callback thread gracefully. Forcing");
-            }
-            cbThreadPool.shutdownNow();
 
             running = false;
             this.interrupt();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index ca9b0ce46a..3fb8d18ef9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -2146,6 +2146,7 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
      *          number of threads to handle journal callbacks.
      * @return server configuration
      */
+    @Deprecated
     public ServerConfiguration setNumJournalCallbackThreads(int numThreads) {
         setProperty(NUM_JOURNAL_CALLBACK_THREADS, numThreads);
         return this;
@@ -2156,6 +2157,7 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
      *
      * @return the number of threads that handle journal callbacks.
      */
+    @Deprecated
     public int getNumJournalCallbackThreads() {
         return getInt(NUM_JOURNAL_CALLBACK_THREADS, 1);
     }
diff --git a/conf/bk_server.conf b/conf/bk_server.conf
index 5017175219..cef24eaad1 100755
--- a/conf/bk_server.conf
+++ b/conf/bk_server.conf
@@ -122,10 +122,6 @@ extraServerComponents=
 # The number of threads that should handle long poll requests.
 # numLongPollWorkerThreads=0
 
-# The number of threads used for handling journal callback. If a zero or negative number is provided,
-# the callbacks are executed directly at force write threads.
-# numJournalCallbackThreads=1
-
 # Number of threads that should be used for high priority requests
 # (i.e. recovery reads and adds, and fencing).
 # numHighPriorityWorkerThreads=8