You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by mm...@apache.org on 2019/01/07 23:41:57 UTC

[bookkeeper] branch master updated: Allow to configure BK for low latency busy-wait settings

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new e426547  Allow to configure BK for low latency busy-wait settings
e426547 is described below

commit e4265470881431fb7a78c8ab714c237239169e12
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Jan 7 15:41:52 2019 -0800

    Allow to configure BK for low latency busy-wait settings
    
    ### Motivation
    
    Added `enableBusyWait` configuration option to turn on CPU Affinity and busy wait on sockets and queues.
    
    
    
    Reviewers: Sijie Guo <si...@apache.org>
    
    This closes #1812 from merlimat/configure-busy-poll
---
 bookkeeper-common/pom.xml                          |  5 ++
 .../bookkeeper/common/util/OrderedExecutor.java    | 44 +++++++++--
 .../bookkeeper/common/util/OrderedScheduler.java   |  3 +-
 .../java/org/apache/bookkeeper/bookie/Journal.java | 35 ++++++++-
 .../org/apache/bookkeeper/client/BookKeeper.java   | 32 ++------
 .../bookkeeper/conf/AbstractConfiguration.java     | 38 ++++++++++
 .../bookkeeper/conf/ServerConfiguration.java       | 22 ++++++
 .../apache/bookkeeper/proto/BookieNettyServer.java | 58 ++++++++++-----
 .../org/apache/bookkeeper/util/EventLoopUtil.java  | 85 ++++++++++++++++++++++
 conf/bk_server.conf                                |  9 +++
 shaded/bookkeeper-server-shaded/pom.xml            |  1 +
 shaded/distributedlog-core-shaded/pom.xml          |  1 +
 site/_data/config/bk_server.yaml                   | 10 +++
 13 files changed, 289 insertions(+), 54 deletions(-)

diff --git a/bookkeeper-common/pom.xml b/bookkeeper-common/pom.xml
index 8946335..d2825a0 100644
--- a/bookkeeper-common/pom.xml
+++ b/bookkeeper-common/pom.xml
@@ -31,6 +31,11 @@
       <version>${project.parent.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>cpu-affinity</artifactId>
+      <version>${project.parent.version}</version>
+    </dependency>
+    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedExecutor.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedExecutor.java
index 220e1af..a645d64 100644
--- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedExecutor.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedExecutor.java
@@ -31,6 +31,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -44,6 +45,8 @@ import java.util.stream.Collectors;
 
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.bookkeeper.common.collections.BlockingMpscQueue;
+import org.apache.bookkeeper.common.util.affinity.CpuAffinity;
 import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
@@ -68,6 +71,7 @@ import org.slf4j.MDC;
 @Slf4j
 public class OrderedExecutor implements ExecutorService {
     public static final int NO_TASK_LIMIT = -1;
+    private static final int DEFAULT_MAX_ARRAY_QUEUE_SIZE = 10_000;
     protected static final long WARN_TIME_MICRO_SEC_DEFAULT = TimeUnit.SECONDS.toMicros(1);
 
     final String name;
@@ -80,7 +84,7 @@ public class OrderedExecutor implements ExecutorService {
     final boolean preserveMdcForTaskExecution;
     final long warnTimeMicroSec;
     final int maxTasksInQueue;
-
+    final boolean enableBusyWait;
 
     public static Builder newBuilder() {
         return new Builder();
@@ -98,7 +102,7 @@ public class OrderedExecutor implements ExecutorService {
             }
             return new OrderedExecutor(name, numThreads, threadFactory, statsLogger,
                                            traceTaskExecution, preserveMdcForTaskExecution,
-                                           warnTimeMicroSec, maxTasksInQueue);
+                                           warnTimeMicroSec, maxTasksInQueue, enableBusyWait);
         }
     }
 
@@ -114,6 +118,7 @@ public class OrderedExecutor implements ExecutorService {
         protected boolean preserveMdcForTaskExecution = false;
         protected long warnTimeMicroSec = WARN_TIME_MICRO_SEC_DEFAULT;
         protected int maxTasksInQueue = NO_TASK_LIMIT;
+        protected boolean enableBusyWait = false;
 
         public AbstractBuilder<T> name(String name) {
             this.name = name;
@@ -155,6 +160,11 @@ public class OrderedExecutor implements ExecutorService {
             return this;
         }
 
+        public AbstractBuilder<T> enableBusyWait(boolean enableBusyWait) {
+            this.enableBusyWait = enableBusyWait;
+            return this;
+        }
+
         @SuppressWarnings("unchecked")
         public T build() {
             if (null == threadFactory) {
@@ -168,7 +178,8 @@ public class OrderedExecutor implements ExecutorService {
                 traceTaskExecution,
                 preserveMdcForTaskExecution,
                 warnTimeMicroSec,
-                maxTasksInQueue);
+                maxTasksInQueue,
+                enableBusyWait);
         }
     }
 
@@ -277,7 +288,15 @@ public class OrderedExecutor implements ExecutorService {
     }
 
     protected ThreadPoolExecutor createSingleThreadExecutor(ThreadFactory factory) {
-        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), factory);
+        BlockingQueue<Runnable> queue;
+        if (enableBusyWait) {
+            // Use queue with busy-wait polling strategy
+            queue = new BlockingMpscQueue<>(maxTasksInQueue > 0 ? maxTasksInQueue : DEFAULT_MAX_ARRAY_QUEUE_SIZE);
+        } else {
+            // By default, use regular JDK LinkedBlockingQueue
+            queue = new LinkedBlockingQueue<>();
+        }
+        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, queue, factory);
     }
 
     protected ExecutorService getBoundedExecutor(ThreadPoolExecutor executor) {
@@ -361,12 +380,14 @@ public class OrderedExecutor implements ExecutorService {
      */
     protected OrderedExecutor(String baseName, int numThreads, ThreadFactory threadFactory,
                                 StatsLogger statsLogger, boolean traceTaskExecution,
-                                boolean preserveMdcForTaskExecution, long warnTimeMicroSec, int maxTasksInQueue) {
+                                boolean preserveMdcForTaskExecution, long warnTimeMicroSec, int maxTasksInQueue,
+                                boolean enableBusyWait) {
         checkArgument(numThreads > 0);
         checkArgument(!StringUtils.isBlank(baseName));
 
         this.maxTasksInQueue = maxTasksInQueue;
         this.warnTimeMicroSec = warnTimeMicroSec;
+        this.enableBusyWait = enableBusyWait;
         name = baseName;
         threads = new ExecutorService[numThreads];
         threadIds = new long[numThreads];
@@ -374,12 +395,25 @@ public class OrderedExecutor implements ExecutorService {
             ThreadPoolExecutor thread = createSingleThreadExecutor(
                     new ThreadFactoryBuilder().setNameFormat(name + "-" + getClass().getSimpleName() + "-" + i + "-%d")
                     .setThreadFactory(threadFactory).build());
+
             threads[i] = addExecutorDecorators(getBoundedExecutor(thread));
 
             final int idx = i;
             try {
                 threads[idx].submit(() -> {
                     threadIds[idx] = Thread.currentThread().getId();
+
+                    if (enableBusyWait) {
+                        // Try to acquire 1 CPU core for the executor thread. If that fails,
+                        // just log the error and continue, falling back to
+                        // non-isolated CPUs.
+                        try {
+                            CpuAffinity.acquireCore();
+                        } catch (Throwable t) {
+                            log.warn("Failed to acquire CPU core for thread {}", Thread.currentThread().getName(),
+                                    t.getMessage(), t);
+                        }
+                    }
                 }).get();
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
index 3c1fef5..979b55d 100644
--- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java
@@ -114,10 +114,9 @@ public class OrderedScheduler extends OrderedExecutor implements ScheduledExecut
                                long warnTimeMicroSec,
                                int maxTasksInQueue) {
         super(baseName, numThreads, threadFactory, statsLogger, traceTaskExecution,
-                preserveMdcForTaskExecution, warnTimeMicroSec, maxTasksInQueue);
+                preserveMdcForTaskExecution, warnTimeMicroSec, maxTasksInQueue, false /* enableBusyWait */);
     }
 
-
     @Override
     protected ScheduledThreadPoolExecutor createSingleThreadExecutor(ThreadFactory factory) {
         return new ScheduledThreadPoolExecutor(1, factory);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index a868536..dd47da8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -40,6 +40,7 @@ import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -47,7 +48,9 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.bookie.stats.JournalStats;
+import org.apache.bookkeeper.common.collections.BlockingMpscQueue;
 import org.apache.bookkeeper.common.collections.RecyclableArrayList;
+import org.apache.bookkeeper.common.util.affinity.CpuAffinity;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.stats.Counter;
@@ -56,7 +59,6 @@ import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.IOUtils;
 import org.apache.bookkeeper.util.MathUtils;
-import org.apache.bookkeeper.util.collections.GrowableArrayBlockingQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -472,6 +474,15 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
         @Override
         public void run() {
             LOG.info("ForceWrite Thread started");
+
+            if (conf.isBusyWaitEnabled()) {
+                try {
+                    CpuAffinity.acquireCore();
+                } catch (Exception e) {
+                    LOG.warn("Unable to acquire CPU core for Journal ForceWrite thread: {}", e.getMessage(), e);
+                }
+            }
+
             boolean shouldForceWrite = true;
             int numReqInLastForceWrite = 0;
             while (running) {
@@ -611,8 +622,8 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
     private final ExecutorService cbThreadPool;
 
     // journal entry queue to commit
-    final BlockingQueue<QueueEntry> queue = new GrowableArrayBlockingQueue<QueueEntry>();
-    final BlockingQueue<ForceWriteRequest> forceWriteRequests = new GrowableArrayBlockingQueue<ForceWriteRequest>();
+    final BlockingQueue<QueueEntry> queue;
+    final BlockingQueue<ForceWriteRequest> forceWriteRequests;
 
     volatile boolean running = true;
     private final LedgerDirsManager ledgerDirsManager;
@@ -628,6 +639,16 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
     public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf,
             LedgerDirsManager ledgerDirsManager, StatsLogger statsLogger) {
         super("BookieJournal-" + conf.getBookiePort());
+
+        if (conf.isBusyWaitEnabled()) {
+            // To achieve lower latency, use busy-wait blocking queue implementation
+            queue = new BlockingMpscQueue<>(conf.getJournalQueueSize());
+            forceWriteRequests = new BlockingMpscQueue<>(conf.getJournalQueueSize());
+        } else {
+            queue = new ArrayBlockingQueue<>(conf.getJournalQueueSize());
+            forceWriteRequests = new ArrayBlockingQueue<>(conf.getJournalQueueSize());
+        }
+
         this.ledgerDirsManager = ledgerDirsManager;
         this.conf = conf;
         this.journalDirectory = journalDirectory;
@@ -906,6 +927,14 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
     public void run() {
         LOG.info("Starting journal on {}", journalDirectory);
 
+        if (conf.isBusyWaitEnabled()) {
+            try {
+                CpuAffinity.acquireCore();
+            } catch (Exception e) {
+                LOG.warn("Unable to acquire CPU core for Journal thread: {}", e.getMessage(), e);
+            }
+        }
+
         RecyclableArrayList<QueueEntry> toFlush = entryListRecycler.newInstance();
         int numEntriesToFlush = 0;
         ByteBuf lenBuff = Unpooled.buffer(4);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index b8f703d..0acf16f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -25,11 +25,11 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WATCHER_SCOPE;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import io.netty.channel.EventLoopGroup;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.concurrent.DefaultThreadFactory;
+
 import java.io.IOException;
 import java.net.URI;
 import java.util.Collections;
@@ -39,9 +39,9 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
 import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
 import org.apache.bookkeeper.client.AsyncCallback.IsClosedCallback;
@@ -78,9 +78,9 @@ import org.apache.bookkeeper.proto.BookieClientImpl;
 import org.apache.bookkeeper.proto.DataFormats;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.EventLoopUtil;
 import org.apache.bookkeeper.util.SafeRunnable;
 import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.lang.SystemUtils;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
@@ -404,6 +404,7 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
                 .traceTaskExecution(conf.getEnableTaskExecutionStats())
                 .preserveMdcForTaskExecution(conf.getPreserveMdcForTaskExecution())
                 .traceTaskWarnTimeMicroSec(conf.getTaskExecutionWarnTimeMicros())
+                .enableBusyWait(conf.isBusyWaitEnabled())
                 .build();
 
         // initialize stats logger
@@ -434,7 +435,8 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
 
         // initialize event loop group
         if (null == eventLoopGroup) {
-            this.eventLoopGroup = getDefaultEventLoopGroup(conf);
+            this.eventLoopGroup = EventLoopUtil.getClientEventLoopGroup(conf,
+                    new DefaultThreadFactory("bookkeeper-io"));
             this.ownEventLoopGroup = true;
         } else {
             this.eventLoopGroup = eventLoopGroup;
@@ -601,8 +603,7 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
         return bookieWatcher;
     }
 
-    @VisibleForTesting
-    OrderedExecutor getMainWorkerPool() {
+    public OrderedExecutor getMainWorkerPool() {
         return mainWorkerPool;
     }
 
@@ -1282,7 +1283,6 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
      * @throws InterruptedException
      * @throws BKException
      */
-    @SuppressWarnings("unchecked")
     public void deleteLedger(long lId) throws InterruptedException, BKException {
         CompletableFuture<Void> future = new CompletableFuture<>();
         SyncDeleteCallback result = new SyncDeleteCallback(future);
@@ -1406,22 +1406,6 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
         this.metadataDriver.close();
     }
 
-    static EventLoopGroup getDefaultEventLoopGroup(ClientConfiguration conf) {
-        ThreadFactory threadFactory = new DefaultThreadFactory("bookkeeper-io");
-        final int numThreads = conf.getNumIOThreads();
-
-        if (SystemUtils.IS_OS_LINUX) {
-            try {
-                return new EpollEventLoopGroup(numThreads, threadFactory);
-            } catch (Throwable t) {
-                LOG.warn("Could not use Netty Epoll event loop for bookie server: {}", t.getMessage());
-                return new NioEventLoopGroup(numThreads, threadFactory);
-            }
-        } else {
-            return new NioEventLoopGroup(numThreads, threadFactory);
-        }
-    }
-
     @Override
     public CreateBuilder newCreateLedgerOp() {
         return new LedgerCreateOp.CreateBuilderImpl(this);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
index f12200f..65d702b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
@@ -86,6 +86,8 @@ public abstract class AbstractConfiguration<T extends AbstractConfiguration>
             "storeSystemTimeAsLedgerUnderreplicatedMarkTime";
     protected static final String STORE_SYSTEMTIME_AS_LEDGER_CREATION_TIME = "storeSystemTimeAsLedgerCreationTime";
 
+    protected static final String ENABLE_BUSY_WAIT = "enableBusyWait";
+
     // Metastore settings, only being used when LEDGER_MANAGER_FACTORY_CLASS is MSLedgerManagerFactory
     protected static final String METASTORE_IMPL_CLASS = "metastoreImplClass";
     protected static final String METASTORE_MAX_ENTRIES_PER_SCAN = "metastoreMaxEntriesPerScan";
@@ -880,6 +882,42 @@ public abstract class AbstractConfiguration<T extends AbstractConfiguration>
     }
 
     /**
+     * Return whether busy-wait is enabled for BookKeeper and Netty IO threads.
+     *
+     * <p>Default is false
+     *
+     * @return the value of the option
+     */
+    public boolean isBusyWaitEnabled() {
+        return getBoolean(ENABLE_BUSY_WAIT, false);
+    }
+
+    /**
+     * Option to enable busy-wait settings.
+     *
+     * <p>Default is false.
+     *
+     * <p>WARNING: This option will enable spin-waiting on executors and IO threads
+     * in order to reduce latency during context switches. The spinning will
+     * consume 100% CPU even when the bookie is not doing any work. It is
+     * recommended to reduce the number of threads in the main workers pool
+     * ({@link ClientConfiguration#setNumWorkerThreads(int)}) and in the Netty event
+     * loop ({@link ClientConfiguration#setNumIOThreads(int)}) so that only a few
+     * CPU cores are kept busy.
+     * </p>
+     *
+     * @param busyWaitEanbled
+     *            if enabled, use spin-waiting strategy to reduce latency in
+     *            context switches
+     *
+     * @see #isBusyWaitEnabled()
+     */
+    public T setBusyWaitEnabled(boolean busyWaitEanbled) {
+        setProperty(ENABLE_BUSY_WAIT, busyWaitEanbled);
+        return getThis();
+    }
+
+    /**
      * Return the flag indicating whether to limit stats logging.
      *
      * @return
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 1d38651..64f6c4b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -134,6 +134,7 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
     protected static final String JOURNAL_ALIGNMENT_SIZE = "journalAlignmentSize";
     protected static final String NUM_JOURNAL_CALLBACK_THREADS = "numJournalCallbackThreads";
     protected static final String JOURNAL_FORMAT_VERSION_TO_WRITE = "journalFormatVersionToWrite";
+    protected static final String JOURNAL_QUEUE_SIZE = "journalQueueSize";
     // backpressure control
     protected static final String MAX_ADDS_IN_PROGRESS_LIMIT = "maxAddsInProgressLimit";
     protected static final String MAX_READS_IN_PROGRESS_LIMIT = "maxReadsInProgressLimit";
@@ -771,6 +772,27 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
     }
 
     /**
+     * Set the size of the journal queue.
+     *
+     * @param journalQueueSize
+     *            the max size of journal queue
+     * @return server configuration.
+     */
+    public ServerConfiguration setJournalQueueSize(int journalQueueSize) {
+        this.setProperty(JOURNAL_QUEUE_SIZE, journalQueueSize);
+        return this;
+    }
+
+    /**
+     * Get size of journal queue.
+     *
+     * @return the max size of journal queue.
+     */
+    public int getJournalQueueSize() {
+        return this.getInt(JOURNAL_QUEUE_SIZE, 10_000);
+    }
+
+    /**
      * Get max number of adds in progress. 0 == unlimited.
      *
      * @return Max number of adds in progress.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
index 1cbb345..add4537 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
@@ -33,7 +33,9 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.ChannelPipeline;
+import io.netty.channel.DefaultEventLoop;
 import io.netty.channel.DefaultEventLoopGroup;
+import io.netty.channel.EventLoop;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.WriteBufferWaterMark;
 import io.netty.channel.epoll.EpollEventLoopGroup;
@@ -43,7 +45,6 @@ import io.netty.channel.group.ChannelGroupFuture;
 import io.netty.channel.group.DefaultChannelGroup;
 import io.netty.channel.local.LocalChannel;
 import io.netty.channel.local.LocalServerChannel;
-import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
@@ -60,7 +61,8 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.ThreadFactory;
+import java.util.Queue;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -71,11 +73,13 @@ import org.apache.bookkeeper.auth.BookKeeperPrincipal;
 import org.apache.bookkeeper.auth.BookieAuthProvider;
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.common.collections.BlockingMpscQueue;
+import org.apache.bookkeeper.common.util.affinity.CpuAffinity;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.processor.RequestProcessor;
 import org.apache.bookkeeper.util.ByteBufList;
-import org.apache.commons.lang.SystemUtils;
+import org.apache.bookkeeper.util.EventLoopUtil;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -111,29 +115,43 @@ class BookieNettyServer {
         this.authProviderFactory = AuthProviderFactoryFactory.newBookieAuthProviderFactory(conf);
 
         if (!conf.isDisableServerSocketBind()) {
-            ThreadFactory threadFactory = new DefaultThreadFactory("bookie-io");
-            final int numThreads = conf.getServerNumIOThreads();
-
-            EventLoopGroup eventLoopGroup;
-            if (SystemUtils.IS_OS_LINUX) {
-                try {
-                    eventLoopGroup = new EpollEventLoopGroup(numThreads, threadFactory);
-                } catch (ExceptionInInitializerError | NoClassDefFoundError | UnsatisfiedLinkError e) {
-                    LOG.warn("Could not use Netty Epoll event loop for bookie server: {}", e.getMessage());
-                    eventLoopGroup = new NioEventLoopGroup(numThreads, threadFactory);
-                }
-            } else {
-                eventLoopGroup = new NioEventLoopGroup(numThreads, threadFactory);
-            }
-
-            this.eventLoopGroup = eventLoopGroup;
+            this.eventLoopGroup = EventLoopUtil.getServerEventLoopGroup(conf, new DefaultThreadFactory("bookie-io"));
             allChannels = new CleanupChannelGroup(eventLoopGroup);
         } else {
             this.eventLoopGroup = null;
         }
 
         if (conf.isEnableLocalTransport()) {
-            jvmEventLoopGroup = new DefaultEventLoopGroup();
+            jvmEventLoopGroup = new DefaultEventLoopGroup(conf.getServerNumIOThreads()) {
+                @Override
+                protected EventLoop newChild(Executor executor, Object... args) throws Exception {
+                    return new DefaultEventLoop(this, executor) {
+                        @Override
+                        protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
+                            if (conf.isBusyWaitEnabled()) {
+                                return new BlockingMpscQueue<>(Math.min(maxPendingTasks, 10_000));
+                            } else {
+                                return super.newTaskQueue(maxPendingTasks);
+                            }
+                        }
+                    };
+                }
+            };
+
+            // Enable CPU affinity on IO threads
+            if (conf.isBusyWaitEnabled()) {
+                for (int i = 0; i < conf.getServerNumIOThreads(); i++) {
+                    jvmEventLoopGroup.next().submit(() -> {
+                        try {
+                            CpuAffinity.acquireCore();
+                        } catch (Throwable t) {
+                            LOG.warn("Failed to acquire CPU core for thread {}", Thread.currentThread().getName(),
+                                    t.getMessage(), t);
+                        }
+                    });
+                }
+            }
+
             allChannels = new CleanupChannelGroup(jvmEventLoopGroup);
         } else {
             jvmEventLoopGroup = null;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/EventLoopUtil.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/EventLoopUtil.java
new file mode 100644
index 0000000..32cb228
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/EventLoopUtil.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.util;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SelectStrategy;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+
+import java.util.concurrent.ThreadFactory;
+
+import lombok.experimental.UtilityClass;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.bookkeeper.common.util.affinity.CpuAffinity;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.commons.lang.SystemUtils;
+
+
+/**
+ * Utility class to initialize Netty event loops.
+ */
+@Slf4j
+@UtilityClass
+public class EventLoopUtil {
+    public static EventLoopGroup getClientEventLoopGroup(ClientConfiguration conf, ThreadFactory threadFactory) {
+        return getEventLoopGroup(threadFactory, conf.getNumIOThreads(), conf.isBusyWaitEnabled());
+    }
+
+    public static EventLoopGroup getServerEventLoopGroup(ServerConfiguration conf, ThreadFactory threadFactory) {
+        return getEventLoopGroup(threadFactory, conf.getServerNumIOThreads(), conf.isBusyWaitEnabled());
+    }
+
+    private static EventLoopGroup getEventLoopGroup(ThreadFactory threadFactory,
+            int numThreads, boolean enableBusyWait) {
+        if (!SystemUtils.IS_OS_LINUX) {
+            return new NioEventLoopGroup(numThreads, threadFactory);
+        }
+
+        try {
+            if (!enableBusyWait) {
+                // Regular Epoll based event loop
+                return new EpollEventLoopGroup(numThreads, threadFactory);
+            }
+
+            // With low latency setting, put the Netty event loop on busy-wait loop to reduce cost of
+            // context switches
+            EpollEventLoopGroup eventLoopGroup = new EpollEventLoopGroup(numThreads, threadFactory,
+                    () -> (selectSupplier, hasTasks) -> SelectStrategy.BUSY_WAIT);
+
+            // Enable CPU affinity on IO threads
+            for (int i = 0; i < numThreads; i++) {
+                eventLoopGroup.next().submit(() -> {
+                    try {
+                        CpuAffinity.acquireCore();
+                    } catch (Throwable t) {
+                        log.warn("Failed to acquire CPU core for thread {}", Thread.currentThread().getName(),
+                                t.getMessage(), t);
+                    }
+                });
+            }
+
+            return eventLoopGroup;
+        } catch (ExceptionInInitializerError | NoClassDefFoundError | UnsatisfiedLinkError e) {
+            log.warn("Could not use Netty Epoll event loop: {}", e.getMessage());
+            return new NioEventLoopGroup(numThreads, threadFactory);
+        }
+    }
+}
diff --git a/conf/bk_server.conf b/conf/bk_server.conf
index df6c991..23012dc 100755
--- a/conf/bk_server.conf
+++ b/conf/bk_server.conf
@@ -133,6 +133,12 @@ extraServerComponents=
 # avoid the executor queue to grow indefinitely
 # maxPendingAddRequestsPerThread=10000
 
+# Option to enable busy-wait settings. Default is false.
+# WARNING: This option will enable spin-waiting on executors and IO threads to reduce the latency caused by
+# context switches. The spinning will consume 100% CPU even when the bookie is not doing any work. It is recommended
+# to reduce the number of threads in the main workers pool and Netty event loop so only a few CPU cores are kept busy.
+# enableBusyWait=false
+
 #############################################################################
 ## Long poll request parameter settings
 #############################################################################
@@ -332,6 +338,9 @@ journalDirectories=/tmp/bk-txn
 # If we should flush the journal when journal queue is empty
 # journalFlushWhenQueueEmpty=false
 
+# Set the size of the journal queue.
+# journalQueueSize=10000
+
 #############################################################################
 ## Ledger storage settings
 #############################################################################
diff --git a/shaded/bookkeeper-server-shaded/pom.xml b/shaded/bookkeeper-server-shaded/pom.xml
index 820dc01..a39227b 100644
--- a/shaded/bookkeeper-server-shaded/pom.xml
+++ b/shaded/bookkeeper-server-shaded/pom.xml
@@ -66,6 +66,7 @@
                   <include>com.google.guava:guava</include>
                   <include>com.google.protobuf:protobuf-java</include>
                   <include>org.apache.bookkeeper:bookkeeper-common</include>
+                  <include>org.apache.bookkeeper:cpu-affinity</include>
                   <include>org.apache.bookkeeper:bookkeeper-tools-framework</include>
                   <include>org.apache.bookkeeper:bookkeeper-proto</include>
                   <include>org.apache.bookkeeper:bookkeeper-server</include>
diff --git a/shaded/distributedlog-core-shaded/pom.xml b/shaded/distributedlog-core-shaded/pom.xml
index 4f5c9f2..e5ef59f 100644
--- a/shaded/distributedlog-core-shaded/pom.xml
+++ b/shaded/distributedlog-core-shaded/pom.xml
@@ -87,6 +87,7 @@
                   <include>net.java.dev.jna:jna</include>
                   <include>net.jpountz.lz4:lz4</include>
                   <include>org.apache.bookkeeper:bookkeeper-common</include>
+                  <include>org.apache.bookkeeper:cpu-affinity</include>
                   <include>org.apache.bookkeeper:bookkeeper-tools-framework</include>
                   <include>org.apache.bookkeeper:bookkeeper-proto</include>
                   <include>org.apache.bookkeeper:bookkeeper-server</include>
diff --git a/site/_data/config/bk_server.yaml b/site/_data/config/bk_server.yaml
index a711598..a42e12e 100644
--- a/site/_data/config/bk_server.yaml
+++ b/site/_data/config/bk_server.yaml
@@ -71,6 +71,13 @@ groups:
   - param: maxPendingReadRequestsPerThread
     description: If add worker threads are enabled, limit the number of pending requests, to avoid the executor queue to grow indefinitely. If zero or negative, the number of pending requests is unlimited.
     default: 10000
+  - param: enableBusyWait
+    description: |
+      Option to enable busy-wait settings. Default is false.
+      WARNING: This option will enable spin-waiting on executors and IO threads to reduce the latency caused by
+      context switches. The spinning will consume 100% CPU even when the bookie is not doing any work. It is recommended
+      to reduce the number of threads in the main workers pool and Netty event loop so only a few CPU cores are kept busy.
+    default: false
 
 - name: Long poll settings
   params:
@@ -248,6 +255,9 @@ groups:
   - param: journalFlushWhenQueueEmpty
     description: If we should flush the journal when journal queue is empty.
     default: 'false'
+  - param: journalQueueSize
+    description: Set the size of the journal queue.
+    default: 10000
 
 - name: Ledger storage settings
   params: