You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2019/01/08 20:16:12 UTC

[GitHub] merlimat closed pull request #1755: Configure Netty allocator in bookie and client

merlimat closed pull request #1755:  Configure Netty allocator in bookie and client 
URL: https://github.com/apache/bookkeeper/pull/1755
 
 
   

This is a PR merged from a forked repository (a foreign pull request).
As GitHub hides the original diff of such a pull request on merge, it
is displayed below for the sake of provenance:

diff --git a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
index 216aff9890..94776f0443 100644
--- a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
+++ b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
@@ -20,6 +20,7 @@
 package org.apache.bookkeeper.benchmark;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.epoll.EpollEventLoopGroup;
@@ -175,7 +176,8 @@ public static void main(String[] args)
                 new DefaultThreadFactory("BookKeeperClientScheduler"));
 
         ClientConfiguration conf = new ClientConfiguration();
-        BookieClient bc = new BookieClientImpl(conf, eventLoop, executor, scheduler, NullStatsLogger.INSTANCE);
+        BookieClient bc = new BookieClientImpl(conf, eventLoop, PooledByteBufAllocator.DEFAULT, executor, scheduler,
+                NullStatsLogger.INSTANCE);
         LatencyCallback lc = new LatencyCallback();
 
         ThroughputCallback tc = new ThroughputCallback();
diff --git a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorImpl.java b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorImpl.java
index 35441659a0..1889eb9b05 100644
--- a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorImpl.java
+++ b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorImpl.java
@@ -39,6 +39,10 @@
 
     private static final Logger log = LoggerFactory.getLogger(ByteBufAllocatorImpl.class);
 
+    // Same as AbstractByteBufAllocator, but copied here since it's not visible
+    private static final int DEFAULT_INITIAL_CAPACITY = 256;
+    private static final int DEFAULT_MAX_CAPACITY = Integer.MAX_VALUE;
+
     private final ByteBufAllocator pooledAllocator;
     private final ByteBufAllocator unpooledAllocator;
     private final PoolingPolicy poolingPolicy;
@@ -63,16 +67,22 @@
 
         if (poolingPolicy == PoolingPolicy.PooledDirect) {
             if (pooledAllocator == null) {
-                this.pooledAllocator = new PooledByteBufAllocator(
-                        true /* preferDirect */,
-                        poolingConcurrency /* nHeapArena */,
-                        poolingConcurrency /* nDirectArena */,
-                        PooledByteBufAllocator.defaultPageSize(),
-                        PooledByteBufAllocator.defaultMaxOrder(),
-                        PooledByteBufAllocator.defaultTinyCacheSize(),
-                        PooledByteBufAllocator.defaultSmallCacheSize(),
-                        PooledByteBufAllocator.defaultNormalCacheSize(),
-                        PooledByteBufAllocator.defaultUseCacheForAllThreads());
+                if (poolingConcurrency == PooledByteBufAllocator.defaultNumDirectArena()) {
+                    // If all the parameters are the same as in the default Netty pool,
+                    // just reuse the static instance as the underlying allocator.
+                    this.pooledAllocator = PooledByteBufAllocator.DEFAULT;
+                } else {
+                    this.pooledAllocator = new PooledByteBufAllocator(
+                            true /* preferDirect */,
+                            poolingConcurrency /* nHeapArena */,
+                            poolingConcurrency /* nDirectArena */,
+                            PooledByteBufAllocator.defaultPageSize(),
+                            PooledByteBufAllocator.defaultMaxOrder(),
+                            PooledByteBufAllocator.defaultTinyCacheSize(),
+                            PooledByteBufAllocator.defaultSmallCacheSize(),
+                            PooledByteBufAllocator.defaultNormalCacheSize(),
+                            PooledByteBufAllocator.defaultUseCacheForAllThreads());
+                }
             } else {
                 this.pooledAllocator = pooledAllocator;
             }
@@ -109,6 +119,25 @@
         }
     }
 
+    @Override
+    public ByteBuf buffer() {
+        return buffer(DEFAULT_INITIAL_CAPACITY);
+    }
+
+    @Override
+    public ByteBuf buffer(int initialCapacity) {
+        return buffer(initialCapacity, DEFAULT_MAX_CAPACITY);
+    }
+
+    @Override
+    public ByteBuf buffer(int initialCapacity, int maxCapacity) {
+        if (poolingPolicy == PoolingPolicy.PooledDirect) {
+            return newDirectBuffer(initialCapacity, maxCapacity, true /* can fallback to heap if needed */);
+        } else {
+            return newHeapBuffer(initialCapacity, maxCapacity);
+        }
+    }
+
     @Override
     protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
         try {
@@ -125,30 +154,33 @@ protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
 
     @Override
     protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
+        // If caller asked specifically for a direct buffer, we cannot fallback to heap
+        return newDirectBuffer(initialCapacity, maxCapacity, false);
+    }
+
+    private ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity, boolean canFallbackToHeap) {
         if (poolingPolicy == PoolingPolicy.PooledDirect) {
             try {
                 return pooledAllocator.directBuffer(initialCapacity, maxCapacity);
             } catch (OutOfMemoryError e) {
-                switch (outOfMemoryPolicy) {
-                case ThrowException:
-                    outOfMemoryListener.accept(e);
-                    throw e;
-
-                case FallbackToHeap:
+                if (canFallbackToHeap && outOfMemoryPolicy == OutOfMemoryPolicy.FallbackToHeap) {
                     try {
                         return unpooledAllocator.heapBuffer(initialCapacity, maxCapacity);
                     } catch (OutOfMemoryError e2) {
                         outOfMemoryListener.accept(e2);
                         throw e2;
                     }
+                } else {
+                    // ThrowException
+                    outOfMemoryListener.accept(e);
+                    throw e;
                 }
-                return null;
             }
         } else {
             // Unpooled heap buffer. Force heap buffers because unpooled direct
             // buffers have very high overhead of allocation/reclaiming
             try {
-                return unpooledAllocator.heapBuffer(initialCapacity, maxCapacity);
+                return unpooledAllocator.directBuffer(initialCapacity, maxCapacity);
             } catch (OutOfMemoryError e) {
                 outOfMemoryListener.accept(e);
                 throw e;
diff --git a/bookkeeper-common-allocator/src/test/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderTest.java b/bookkeeper-common-allocator/src/test/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderTest.java
index 8ff66c3317..662dd832c4 100644
--- a/bookkeeper-common-allocator/src/test/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderTest.java
+++ b/bookkeeper-common-allocator/src/test/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderTest.java
@@ -145,10 +145,10 @@ public void testOomWithFallbackAndNoMoreHeap() {
     }
 
     @Test
-    public void testOomUnpooled() {
+    public void testOomUnpooledDirect() {
         ByteBufAllocator heapAlloc = mock(ByteBufAllocator.class);
-        OutOfMemoryError noHeapError = new OutOfMemoryError("no more heap");
-        when(heapAlloc.heapBuffer(anyInt(), anyInt())).thenThrow(noHeapError);
+        OutOfMemoryError noMemError = new OutOfMemoryError("no more direct mem");
+        when(heapAlloc.directBuffer(anyInt(), anyInt())).thenThrow(noMemError);
 
         AtomicReference<OutOfMemoryError> receivedException = new AtomicReference<>();
 
@@ -166,11 +166,11 @@ public void testOomUnpooled() {
             fail("Should have thrown exception");
         } catch (OutOfMemoryError e) {
             // Expected
-            assertEquals(noHeapError, e);
+            assertEquals(noMemError, e);
         }
 
         // Ensure the notification was triggered even when exception is thrown
-        assertEquals(noHeapError, receivedException.get());
+        assertEquals(noMemError, receivedException.get());
     }
 
     @Test
@@ -214,7 +214,7 @@ public void testUnpooled() {
 
         ByteBuf buf2 = alloc.directBuffer();
         assertEquals(UnpooledByteBufAllocator.DEFAULT, buf2.alloc());
-        assertTrue(buf2.hasArray());
+        assertFalse(buf2.hasArray());
     }
 
     @Test
diff --git a/bookkeeper-server/pom.xml b/bookkeeper-server/pom.xml
index 10b5ea4148..c1b684eea6 100644
--- a/bookkeeper-server/pom.xml
+++ b/bookkeeper-server/pom.xml
@@ -30,6 +30,11 @@
       <artifactId>bookkeeper-common</artifactId>
       <version>${project.parent.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>bookkeeper-common-allocator</artifactId>
+      <version>${project.parent.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.bookkeeper</groupId>
       <artifactId>bookkeeper-proto</artifactId>
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index fd352d8e71..a0acd31c43 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -28,9 +28,13 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.SettableFuture;
+
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FilenameFilter;
@@ -53,6 +57,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
+
 import org.apache.bookkeeper.bookie.BookieException.BookieIllegalOpException;
 import org.apache.bookkeeper.bookie.BookieException.CookieNotFoundException;
 import org.apache.bookkeeper.bookie.BookieException.DiskPartitionDuplicationException;
@@ -136,6 +141,8 @@
     final StatsLogger statsLogger;
     private final BookieStats bookieStats;
 
+    private final ByteBufAllocator allocator;
+
     /**
      * Exception is thrown when no such a ledger is found in this bookie.
      */
@@ -600,7 +607,7 @@ public static File getCurrentDirectory(File dir) {
 
     public Bookie(ServerConfiguration conf)
             throws IOException, InterruptedException, BookieException {
-        this(conf, NullStatsLogger.INSTANCE);
+        this(conf, NullStatsLogger.INSTANCE, PooledByteBufAllocator.DEFAULT);
     }
 
     private static LedgerStorage buildLedgerStorage(ServerConfiguration conf) throws IOException {
@@ -658,12 +665,13 @@ public void checkpointComplete(Checkpoint checkpoint, boolean compact)
                 null,
                 checkpointSource,
                 checkpointer,
-                statsLogger);
+                statsLogger,
+                UnpooledByteBufAllocator.DEFAULT);
 
         return ledgerStorage;
     }
 
-    public Bookie(ServerConfiguration conf, StatsLogger statsLogger)
+    public Bookie(ServerConfiguration conf, StatsLogger statsLogger, ByteBufAllocator allocator)
             throws IOException, InterruptedException, BookieException {
         super("Bookie-" + conf.getBookiePort());
         this.statsLogger = statsLogger;
@@ -676,6 +684,7 @@ public Bookie(ServerConfiguration conf, StatsLogger statsLogger)
         this.ledgerDirsManager = createLedgerDirsManager(conf, diskChecker, statsLogger.scope(LD_LEDGER_SCOPE));
         this.indexDirsManager = createIndexDirsManager(conf, diskChecker, statsLogger.scope(LD_INDEX_SCOPE),
                                                        this.ledgerDirsManager);
+        this.allocator = allocator;
 
         // instantiate zookeeper client to initialize ledger manager
         this.metadataDriver = instantiateMetadataDriver(conf);
@@ -732,7 +741,7 @@ public Bookie(ServerConfiguration conf, StatsLogger statsLogger)
         journals = Lists.newArrayList();
         for (int i = 0; i < journalDirectories.size(); i++) {
             journals.add(new Journal(i, journalDirectories.get(i),
-                         conf, ledgerDirsManager, statsLogger.scope(JOURNAL_SCOPE)));
+                    conf, ledgerDirsManager, statsLogger.scope(JOURNAL_SCOPE), allocator));
         }
 
         this.entryLogPerLedgerEnabled = conf.isEntryLogPerLedgerEnabled();
@@ -786,7 +795,8 @@ public void start() {
             stateManager,
             checkpointSource,
             syncThread,
-            statsLogger);
+            statsLogger,
+            allocator);
 
 
         handles = new HandleFactoryImpl(ledgerStorage);
@@ -1287,8 +1297,8 @@ public void recoveryAddEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[]
         }
     }
 
-    static ByteBuf createExplicitLACEntry(long ledgerId, ByteBuf explicitLac) {
-        ByteBuf bb = PooledByteBufAllocator.DEFAULT.directBuffer(8 + 8 + 4 + explicitLac.capacity());
+    private ByteBuf createExplicitLACEntry(long ledgerId, ByteBuf explicitLac) {
+        ByteBuf bb = allocator.directBuffer(8 + 8 + 4 + explicitLac.capacity());
         bb.writeLong(ledgerId);
         bb.writeLong(METAENTRY_ID_LEDGER_EXPLICITLAC);
         bb.writeInt(explicitLac.capacity());
@@ -1485,6 +1495,10 @@ public synchronized void waitZero() throws InterruptedException {
         }
     }
 
+    public ByteBufAllocator getAllocator() {
+        return allocator;
+    }
+
     /**
      * Format the bookie server data.
      *
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index f53195a3fa..dc9137e6a1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -29,7 +29,9 @@
 import com.google.common.util.concurrent.UncheckedExecutionException;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.util.concurrent.DefaultThreadFactory;
@@ -917,8 +919,8 @@ int runCmd(CommandLine cmdLine) throws Exception {
                     ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(
                         new DefaultThreadFactory("BookKeeperClientSchedulerPool"));
 
-                    BookieClient bookieClient = new BookieClientImpl(conf, eventLoopGroup, executor,
-                        scheduler, NullStatsLogger.INSTANCE);
+                    BookieClient bookieClient = new BookieClientImpl(conf, eventLoopGroup,
+                            UnpooledByteBufAllocator.DEFAULT, executor, scheduler, NullStatsLogger.INSTANCE);
 
                     LongStream.range(firstEntry, lastEntry).forEach(entryId -> {
                         CompletableFuture<Void> future = new CompletableFuture<>();
@@ -2825,9 +2827,9 @@ public void start() {
             };
 
             dbStorage.initialize(conf, null, ledgerDirsManager, ledgerIndexManager, null,
-                        checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
+                        checkpointSource, checkpointer, NullStatsLogger.INSTANCE, PooledByteBufAllocator.DEFAULT);
             interleavedStorage.initialize(conf, null, ledgerDirsManager, ledgerIndexManager,
-                    null, checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
+                    null, checkpointSource, checkpointer, NullStatsLogger.INSTANCE, PooledByteBufAllocator.DEFAULT);
             LedgerCache interleavedLedgerCache = interleavedStorage.ledgerCache;
 
             int convertedLedgers = 0;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
index 633c5400da..31fb2035ea 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
@@ -66,23 +66,24 @@
     private boolean closed = false;
 
     // make constructor to be public for unit test
-    public BufferedChannel(FileChannel fc, int capacity) throws IOException {
+    public BufferedChannel(ByteBufAllocator allocator, FileChannel fc, int capacity) throws IOException {
         // Use the same capacity for read and write buffers.
-        this(fc, capacity, 0L);
+        this(allocator, fc, capacity, 0L);
     }
 
-    public BufferedChannel(FileChannel fc, int capacity, long unpersistedBytesBound) throws IOException {
+    public BufferedChannel(ByteBufAllocator allocator, FileChannel fc, int capacity, long unpersistedBytesBound)
+            throws IOException {
         // Use the same capacity for read and write buffers.
-        this(fc, capacity, capacity, unpersistedBytesBound);
+        this(allocator, fc, capacity, capacity, unpersistedBytesBound);
     }
 
-    public BufferedChannel(FileChannel fc, int writeCapacity, int readCapacity, long unpersistedBytesBound)
-            throws IOException {
+    public BufferedChannel(ByteBufAllocator allocator, FileChannel fc, int writeCapacity, int readCapacity,
+            long unpersistedBytesBound) throws IOException {
         super(fc, readCapacity);
         this.writeCapacity = writeCapacity;
         this.position = new AtomicLong(fc.position());
         this.writeBufferStartPosition.set(position.get());
-        this.writeBuffer = ByteBufAllocator.DEFAULT.directBuffer(writeCapacity);
+        this.writeBuffer = allocator.directBuffer(writeCapacity);
         this.unpersistedBytes = new AtomicLong(0);
         this.unpersistedBytesBound = unpersistedBytesBound;
         this.doRegularFlushes = unpersistedBytesBound > 0;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index af82620bf7..1389370a46 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -88,9 +88,9 @@
         private final File logFile;
         private long ledgerIdAssigned = UNASSIGNED_LEDGERID;
 
-        public BufferedLogChannel(FileChannel fc, int writeCapacity, int readCapacity, long logId, File logFile,
-                long unpersistedBytesBound) throws IOException {
-            super(fc, writeCapacity, readCapacity, unpersistedBytesBound);
+        public BufferedLogChannel(ByteBufAllocator allocator, FileChannel fc, int writeCapacity, int readCapacity,
+                long logId, File logFile, long unpersistedBytesBound) throws IOException {
+            super(allocator, fc, writeCapacity, readCapacity, unpersistedBytesBound);
             this.logId = logId;
             this.entryLogMetadata = new EntryLogMetadata(logId);
             this.logFile = logFile;
@@ -283,6 +283,8 @@ public void accept(long ledgerId, long size) {
 
     private final int maxSaneEntrySize;
 
+    private final ByteBufAllocator allocator;
+
     final ServerConfiguration conf;
     /**
      * Scan entries in a entry log file.
@@ -332,15 +334,16 @@ public EntryLogger(ServerConfiguration conf) throws IOException {
      */
     public EntryLogger(ServerConfiguration conf,
             LedgerDirsManager ledgerDirsManager) throws IOException {
-        this(conf, ledgerDirsManager, null, NullStatsLogger.INSTANCE);
+        this(conf, ledgerDirsManager, null, NullStatsLogger.INSTANCE, PooledByteBufAllocator.DEFAULT);
     }
 
     public EntryLogger(ServerConfiguration conf,
-            LedgerDirsManager ledgerDirsManager, EntryLogListener listener, StatsLogger statsLogger)
-                    throws IOException {
+            LedgerDirsManager ledgerDirsManager, EntryLogListener listener, StatsLogger statsLogger,
+            ByteBufAllocator allocator) throws IOException {
         //We reserve 500 bytes as overhead for the protocol.  This is not 100% accurate
         // but the protocol varies so an exact value is difficult to determine
         this.maxSaneEntrySize = conf.getNettyMaxFrameSizeBytes() - 500;
+        this.allocator = allocator;
         this.ledgerDirsManager = ledgerDirsManager;
         this.conf = conf;
         entryLogPerLedgerEnabled = conf.isEntryLogPerLedgerEnabled();
@@ -371,7 +374,7 @@ public EntryLogger(ServerConfiguration conf,
         }
         this.recentlyCreatedEntryLogsStatus = new RecentEntryLogsStatus(logId + 1);
         this.entryLoggerAllocator = new EntryLoggerAllocator(conf, ledgerDirsManager, recentlyCreatedEntryLogsStatus,
-                logId);
+                logId, allocator);
         if (entryLogPerLedgerEnabled) {
             this.entryLogManager = new EntryLogManagerForEntryLogPerLedger(conf, ledgerDirsManager,
                     entryLoggerAllocator, listeners, recentlyCreatedEntryLogsStatus, statsLogger);
@@ -840,7 +843,7 @@ public ByteBuf internalReadEntry(long ledgerId, long entryId, long location)
             throw new IOException(e.toString());
         }
 
-        ByteBuf data = PooledByteBufAllocator.DEFAULT.directBuffer(entry.entrySize, entry.entrySize);
+        ByteBuf data = allocator.buffer(entry.entrySize, entry.entrySize);
         int rc = readFromLogChannel(entryLogId, entry.fc, data, pos);
         if (rc != entry.entrySize) {
             // Note that throwing NoEntryException here instead of IOException is not
@@ -872,7 +875,7 @@ private Header getHeaderForLogId(long entryLogId) throws IOException {
         BufferedReadChannel bc = getChannelForLogId(entryLogId);
 
         // Allocate buffer to read (version, ledgersMapOffset, ledgerCount)
-        ByteBuf headers = PooledByteBufAllocator.DEFAULT.directBuffer(LOGFILE_HEADER_SIZE);
+        ByteBuf headers = allocator.directBuffer(LOGFILE_HEADER_SIZE);
         try {
             bc.read(headers, 0);
 
@@ -988,7 +991,7 @@ public void scanEntryLog(long entryLogId, EntryLogScanner scanner) throws IOExce
         long pos = LOGFILE_HEADER_SIZE;
 
         // Start with a reasonably sized buffer size
-        ByteBuf data = PooledByteBufAllocator.DEFAULT.directBuffer(1024 * 1024);
+        ByteBuf data = allocator.directBuffer(1024 * 1024);
 
         try {
 
@@ -1070,7 +1073,7 @@ EntryLogMetadata extractEntryLogMetadataFromIndex(long entryLogId) throws IOExce
         EntryLogMetadata meta = new EntryLogMetadata(entryLogId);
 
         final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE;
-        ByteBuf ledgersMap = ByteBufAllocator.DEFAULT.directBuffer(maxMapSize);
+        ByteBuf ledgersMap = allocator.directBuffer(maxMapSize);
 
         try {
             while (offset < bc.size()) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java
index 3ddd8e2e73..1c32b55de0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java
@@ -25,6 +25,7 @@
 import static org.apache.bookkeeper.bookie.TransactionalEntryLogCompactor.COMPACTING_SUFFIX;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.Unpooled;
 
 import java.io.BufferedWriter;
@@ -61,11 +62,14 @@
     private final Object createCompactionLogLock = new Object();
     private final EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus;
     private final boolean entryLogPreAllocationEnabled;
+    private final ByteBufAllocator byteBufAllocator;
     final ByteBuf logfileHeader = Unpooled.buffer(EntryLogger.LOGFILE_HEADER_SIZE);
 
     EntryLoggerAllocator(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager,
-            EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus, long logId) {
+            EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus, long logId,
+            ByteBufAllocator byteBufAllocator) {
         this.conf = conf;
+        this.byteBufAllocator = byteBufAllocator;
         this.ledgerDirsManager = ledgerDirsManager;
         this.preallocatedLogId = logId;
         this.recentlyCreatedEntryLogsStatus = recentlyCreatedEntryLogsStatus;
@@ -161,7 +165,7 @@ private synchronized BufferedLogChannel allocateNewLog(File dirForNextEntryLog,
         File newLogFile = new File(dirForNextEntryLog, logFileName);
         FileChannel channel = new RandomAccessFile(newLogFile, "rw").getChannel();
 
-        BufferedLogChannel logChannel = new BufferedLogChannel(channel, conf.getWriteBufferBytes(),
+        BufferedLogChannel logChannel = new BufferedLogChannel(byteBufAllocator, channel, conf.getWriteBufferBytes(),
                 conf.getReadBufferBytes(), preallocatedLogId, newLogFile, conf.getFlushIntervalInBytes());
         logfileHeader.readerIndex(0);
         logChannel.write(logfileHeader);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index d1287c3192..3b5bf0114d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -35,6 +35,8 @@
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.RateLimiter;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -129,7 +131,8 @@ public void initialize(ServerConfiguration conf,
                            StateManager stateManager,
                            CheckpointSource checkpointSource,
                            Checkpointer checkpointer,
-                           StatsLogger statsLogger)
+                           StatsLogger statsLogger,
+                           ByteBufAllocator allocator)
             throws IOException {
         initializeWithEntryLogListener(
             conf,
@@ -140,7 +143,8 @@ public void initialize(ServerConfiguration conf,
             checkpointSource,
             checkpointer,
             this,
-            statsLogger);
+            statsLogger,
+            allocator);
     }
 
     void initializeWithEntryLogListener(ServerConfiguration conf,
@@ -151,7 +155,8 @@ void initializeWithEntryLogListener(ServerConfiguration conf,
                                         CheckpointSource checkpointSource,
                                         Checkpointer checkpointer,
                                         EntryLogListener entryLogListener,
-                                        StatsLogger statsLogger) throws IOException {
+                                        StatsLogger statsLogger,
+                                        ByteBufAllocator allocator) throws IOException {
         initializeWithEntryLogger(
                 conf,
                 ledgerManager,
@@ -160,7 +165,8 @@ void initializeWithEntryLogListener(ServerConfiguration conf,
                 stateManager,
                 checkpointSource,
                 checkpointer,
-                new EntryLogger(conf, ledgerDirsManager, entryLogListener, statsLogger.scope(ENTRYLOGGER_SCOPE)),
+                new EntryLogger(conf, ledgerDirsManager, entryLogListener, statsLogger.scope(ENTRYLOGGER_SCOPE),
+                        allocator),
                 statsLogger);
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index dd47da8bde..0a78fcada8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -26,7 +26,9 @@
 import com.google.common.util.concurrent.MoreExecutors;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
 import io.netty.util.concurrent.DefaultThreadFactory;
@@ -85,8 +87,8 @@
      */
     @FunctionalInterface
     public interface BufferedChannelBuilder {
-        BufferedChannelBuilder DEFAULT_BCBUILDER =
-                (FileChannel fc, int capacity) -> new BufferedChannel(fc, capacity);
+        BufferedChannelBuilder DEFAULT_BCBUILDER = (FileChannel fc,
+                int capacity) -> new BufferedChannel(UnpooledByteBufAllocator.DEFAULT, fc, capacity);
 
         BufferedChannel create(FileChannel fc, int capacity) throws IOException;
     }
@@ -627,18 +629,21 @@ static void writePaddingBytes(JournalChannel jc, ByteBuf paddingBuffer, int jour
 
     volatile boolean running = true;
     private final LedgerDirsManager ledgerDirsManager;
+    private final ByteBufAllocator allocator;
 
     // Expose Stats
     private final JournalStats journalStats;
 
     public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf,
             LedgerDirsManager ledgerDirsManager) {
-        this(journalIndex, journalDirectory, conf, ledgerDirsManager, NullStatsLogger.INSTANCE);
+        this(journalIndex, journalDirectory, conf, ledgerDirsManager, NullStatsLogger.INSTANCE,
+                UnpooledByteBufAllocator.DEFAULT);
     }
 
     public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf,
-            LedgerDirsManager ledgerDirsManager, StatsLogger statsLogger) {
+            LedgerDirsManager ledgerDirsManager, StatsLogger statsLogger, ByteBufAllocator allocator) {
         super("BookieJournal-" + conf.getBookiePort());
+        this.allocator = allocator;
 
         if (conf.isBusyWaitEnabled()) {
             // To achieve lower latency, use busy-wait blocking queue implementation
@@ -1161,7 +1166,7 @@ public void run() {
     }
 
     public BufferedChannelBuilder getBufferedChannelBuilder() {
-        return BufferedChannelBuilder.DEFAULT_BCBUILDER;
+        return (FileChannel fc, int capacity) -> new BufferedChannel(allocator, fc, capacity);
     }
 
     /**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
index 111b8c213f..1353e8b528 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
@@ -23,6 +23,8 @@
 
 import com.google.common.util.concurrent.RateLimiter;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -53,7 +55,8 @@ void initialize(ServerConfiguration conf,
                     StateManager stateManager,
                     CheckpointSource checkpointSource,
                     Checkpointer checkpointer,
-                    StatsLogger statsLogger)
+                    StatsLogger statsLogger,
+                    ByteBufAllocator allocator)
             throws IOException;
 
     /**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java
index 5125c077fe..cb6e9f0981 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java
@@ -21,6 +21,8 @@
 
 package org.apache.bookkeeper.bookie;
 
+import io.netty.buffer.ByteBufAllocator;
+
 import java.io.IOException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -38,9 +40,9 @@
 
     private static final Logger LOG = LoggerFactory.getLogger(ReadOnlyBookie.class);
 
-    public ReadOnlyBookie(ServerConfiguration conf, StatsLogger statsLogger)
+    public ReadOnlyBookie(ServerConfiguration conf, StatsLogger statsLogger, ByteBufAllocator allocator)
             throws IOException, KeeperException, InterruptedException, BookieException {
-        super(conf, statsLogger);
+        super(conf, statsLogger, allocator);
         if (conf.isReadOnlyModeEnabled()) {
             stateManager.forceToReadOnly();
         } else {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SlowBufferedChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SlowBufferedChannel.java
index 9fdc34ca7d..e9731a2590 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SlowBufferedChannel.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SlowBufferedChannel.java
@@ -22,6 +22,7 @@
  */
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 
 import java.io.IOException;
 import java.nio.channels.FileChannel;
@@ -36,12 +37,13 @@
     public volatile long addDelay = 0;
     public volatile long flushDelay = 0;
 
-    public SlowBufferedChannel(FileChannel fc, int capacity) throws IOException {
-        super(fc, capacity);
+    public SlowBufferedChannel(ByteBufAllocator allocator, FileChannel fc, int capacity) throws IOException {
+        super(allocator, fc, capacity);
     }
 
-    public SlowBufferedChannel(FileChannel fc, int writeCapacity, int readCapacity) throws IOException {
-        super(fc, writeCapacity, readCapacity);
+    public SlowBufferedChannel(ByteBufAllocator allocator, FileChannel fc, int writeCapacity, int readCapacity)
+            throws IOException {
+        super(allocator, fc, writeCapacity, readCapacity);
     }
 
     public void setAddDelay(long delay) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
index 5e4dbadd2a..77653db964 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
@@ -24,6 +24,8 @@
 import com.google.common.util.concurrent.RateLimiter;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+
 import java.io.IOException;
 import java.util.List;
 import java.util.Optional;
@@ -72,7 +74,8 @@ public void initialize(ServerConfiguration conf,
                            StateManager stateManager,
                            CheckpointSource checkpointSource,
                            Checkpointer checkpointer,
-                           StatsLogger statsLogger)
+                           StatsLogger statsLogger,
+                           ByteBufAllocator allocator)
             throws IOException {
 
         interleavedLedgerStorage.initializeWithEntryLogListener(
@@ -86,7 +89,8 @@ public void initialize(ServerConfiguration conf,
             // uses sorted ledger storage's own entry log listener
             // since it manages entry log rotations and checkpoints.
             this,
-            statsLogger);
+            statsLogger,
+            allocator);
 
         if (conf.isEntryLogPerLedgerEnabled()) {
             this.memTable = new EntryMemTableWithParallelFlusher(conf, checkpointSource, statsLogger);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
index ccabaed15d..287f4526f0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
@@ -27,6 +27,7 @@
 import com.google.common.collect.Lists;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.util.concurrent.DefaultThreadFactory;
 //CHECKSTYLE.OFF: IllegalImport
 import io.netty.util.internal.PlatformDependent;
@@ -63,7 +64,6 @@
 import org.apache.bookkeeper.util.DiskChecker;
 
 
-
 /**
  * Implementation of LedgerStorage that uses RocksDB to keep the indexes for entries stored in EntryLogs.
  */
@@ -88,13 +88,16 @@
     private ScheduledExecutorService gcExecutor;
     private DbLedgerStorageStats stats;
 
+    protected ByteBufAllocator allocator;
+
     @Override
     public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager,
             LedgerDirsManager indexDirsManager, StateManager stateManager, CheckpointSource checkpointSource,
-            Checkpointer checkpointer, StatsLogger statsLogger) throws IOException {
+            Checkpointer checkpointer, StatsLogger statsLogger, ByteBufAllocator allocator) throws IOException {
         long writeCacheMaxSize = conf.getLong(WRITE_CACHE_MAX_SIZE_MB, DEFAULT_WRITE_CACHE_MAX_SIZE_MB) * MB;
         long readCacheMaxSize = conf.getLong(READ_AHEAD_CACHE_MAX_SIZE_MB, DEFAULT_READ_CACHE_MAX_SIZE_MB) * MB;
 
+        this.allocator = allocator;
         this.numberOfDirs = ledgerDirsManager.getAllLedgerDirs().size();
 
         log.info("Started Db Ledger Storage");
@@ -139,7 +142,8 @@ protected SingleDirectoryDbLedgerStorage newSingleDirectoryDbLedgerStorage(Serve
             StatsLogger statsLogger, ScheduledExecutorService gcExecutor, long writeCacheSize, long readCacheSize)
             throws IOException {
         return new SingleDirectoryDbLedgerStorage(conf, ledgerManager, ledgerDirsManager, indexDirsManager,
-                stateManager, checkpointSource, checkpointer, statsLogger, gcExecutor, writeCacheSize, readCacheSize);
+                stateManager, checkpointSource, checkpointer, statsLogger, allocator, gcExecutor, writeCacheSize,
+                readCacheSize);
     }
 
     @Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCache.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCache.java
index b14478fccd..986c741a01 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCache.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCache.java
@@ -57,13 +57,15 @@
 
     private final int segmentSize;
 
+    private final ByteBufAllocator allocator; // supplies the direct buffers handed back by get()
     private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
-    public ReadCache(long maxCacheSize) {
-        this(maxCacheSize, DEFAULT_MAX_SEGMENT_SIZE);
+    public ReadCache(ByteBufAllocator allocator, long maxCacheSize) {
+        this(allocator, maxCacheSize, DEFAULT_MAX_SEGMENT_SIZE);
     }
 
-    public ReadCache(long maxCacheSize, int maxSegmentSize) {
+    public ReadCache(ByteBufAllocator allocator, long maxCacheSize, int maxSegmentSize) {
+        this.allocator = allocator;
         int segmentsCount = Math.max(2, (int) (maxCacheSize / maxSegmentSize));
         segmentSize = (int) (maxCacheSize / segmentsCount);
 
@@ -140,7 +142,7 @@ public ByteBuf get(long ledgerId, long entryId) {
                     int entryOffset = (int) res.first;
                     int entryLen = (int) res.second;
 
-                    ByteBuf entry = ByteBufAllocator.DEFAULT.directBuffer(entryLen, entryLen);
+                    ByteBuf entry = allocator.directBuffer(entryLen, entryLen);
                     entry.writeBytes(cacheSegments.get(segmentIdx), entryOffset, entryLen);
                     return entry;
                 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
index 197014aa93..3289b3d2f1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
@@ -27,6 +27,7 @@
 import com.google.protobuf.ByteString;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.util.concurrent.DefaultThreadFactory;
 
 import java.io.IOException;
@@ -132,7 +133,8 @@
     public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager,
             LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, StateManager stateManager,
             CheckpointSource checkpointSource, Checkpointer checkpointer, StatsLogger statsLogger,
-            ScheduledExecutorService gcExecutor, long writeCacheSize, long readCacheSize) throws IOException {
+            ByteBufAllocator allocator, ScheduledExecutorService gcExecutor, long writeCacheSize, long readCacheSize)
+            throws IOException {
 
         checkArgument(ledgerDirsManager.getAllLedgerDirs().size() == 1,
                 "Db implementation only allows for one storage dir");
@@ -141,8 +143,8 @@ public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager le
         log.info("Creating single directory db ledger storage on {}", baseDir);
 
         this.writeCacheMaxSize = writeCacheSize;
-        this.writeCache = new WriteCache(writeCacheMaxSize / 2);
-        this.writeCacheBeingFlushed = new WriteCache(writeCacheMaxSize / 2);
+        this.writeCache = new WriteCache(allocator, writeCacheMaxSize / 2);
+        this.writeCacheBeingFlushed = new WriteCache(allocator, writeCacheMaxSize / 2);
 
         this.checkpointSource = checkpointSource;
 
@@ -153,7 +155,7 @@ public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager le
                 DEFAULT_MAX_THROTTLE_TIME_MILLIS);
         maxThrottleTimeNanos = TimeUnit.MILLISECONDS.toNanos(maxThrottleTimeMillis);
 
-        readCache = new ReadCache(readCacheMaxSize);
+        readCache = new ReadCache(allocator, readCacheMaxSize);
 
         ledgerIndex = new LedgerMetadataIndex(conf, KeyValueStorageRocksDB.factory, baseDir, statsLogger);
         entryLocationIndex = new EntryLocationIndex(conf, KeyValueStorageRocksDB.factory, baseDir, statsLogger);
@@ -164,7 +166,7 @@ public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager le
                 TransientLedgerInfo.LEDGER_INFO_CACHING_TIME_MINUTES,
                 TransientLedgerInfo.LEDGER_INFO_CACHING_TIME_MINUTES, TimeUnit.MINUTES);
 
-        entryLogger = new EntryLogger(conf, ledgerDirsManager, null, statsLogger);
+        entryLogger = new EntryLogger(conf, ledgerDirsManager, null, statsLogger, allocator);
         gcThread = new GarbageCollectorThread(conf, ledgerManager, this, statsLogger);
 
         dbLedgerStorageStats = new DbLedgerStorageStats(
@@ -179,7 +181,8 @@ public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager le
     @Override
     public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager,
             LedgerDirsManager indexDirsManager, StateManager stateManager, CheckpointSource checkpointSource,
-            Checkpointer checkpointer, StatsLogger statsLogger) throws IOException {
+            Checkpointer checkpointer, StatsLogger statsLogger,
+            ByteBufAllocator allocator) throws IOException {
         /// Initialized in constructor
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java
index 08ffe6732d..ac58e8eaca 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java
@@ -80,17 +80,20 @@
 
     private final ConcurrentLongHashSet deletedLedgers = new ConcurrentLongHashSet();
 
-    public WriteCache(long maxCacheSize) {
+    private final ByteBufAllocator allocator;
+
+    public WriteCache(ByteBufAllocator allocator, long maxCacheSize) {
         // Default maxSegmentSize set to 1Gb
-        this(maxCacheSize, 1 * 1024 * 1024 * 1024);
+        this(allocator, maxCacheSize, 1 * 1024 * 1024 * 1024);
     }
 
-    public WriteCache(long maxCacheSize, int maxSegmentSize) {
+    public WriteCache(ByteBufAllocator allocator, long maxCacheSize, int maxSegmentSize) {
         checkArgument(maxSegmentSize > 0);
 
         long alignedMaxSegmentSize = alignToPowerOfTwo(maxSegmentSize);
         checkArgument(maxSegmentSize == alignedMaxSegmentSize, "Max segment size needs to be in form of 2^n");
 
+        this.allocator = allocator;
         this.maxCacheSize = maxCacheSize;
         this.maxSegmentSize = (int) maxSegmentSize;
         this.segmentOffsetMask = maxSegmentSize - 1;
@@ -185,7 +188,7 @@ public ByteBuf get(long ledgerId, long entryId) {
 
         long offset = result.first;
         int size = (int) result.second;
-        ByteBuf entry = ByteBufAllocator.DEFAULT.buffer(size, size);
+        ByteBuf entry = allocator.buffer(size, size);
 
         int localOffset = (int) (offset & segmentOffsetMask);
         int segmentIdx = (int) (offset >>> segmentOffsetBits);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 0acf16fd75..94591afc80 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -26,6 +26,8 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
 import io.netty.channel.EventLoopGroup;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.concurrent.DefaultThreadFactory;
@@ -56,6 +58,7 @@
 import org.apache.bookkeeper.client.api.DeleteBuilder;
 import org.apache.bookkeeper.client.api.OpenBuilder;
 import org.apache.bookkeeper.client.api.WriteFlag;
+import org.apache.bookkeeper.common.allocator.ByteBufAllocatorBuilder;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.common.util.ReflectionUtils;
@@ -99,10 +102,11 @@
  */
 public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
 
-    static final Logger LOG = LoggerFactory.getLogger(BookKeeper.class);
+    private static final Logger LOG = LoggerFactory.getLogger(BookKeeper.class);
 
 
     final EventLoopGroup eventLoopGroup;
+    private final ByteBufAllocator allocator;
 
     // The stats logger for this client.
     private final StatsLogger statsLogger;
@@ -149,6 +153,7 @@
 
         ZooKeeper zk = null;
         EventLoopGroup eventLoopGroup = null;
+        ByteBufAllocator allocator = null;
         StatsLogger statsLogger = NullStatsLogger.INSTANCE;
         DNSToSwitchMapping dnsResolver = null;
         HashedWheelTimer requestTimer = null;
@@ -213,6 +218,18 @@ public Builder eventLoopGroup(EventLoopGroup f) {
             return this;
         }
 
+        /**
+         * Configure the bookkeeper client with a provided {@link ByteBufAllocator}.
+         *
+         * @param allocator an external {@link ByteBufAllocator} to use by the bookkeeper client.
+         * @return client builder.
+         * @since 4.9
+         */
+        public Builder allocator(ByteBufAllocator allocator) {
+            this.allocator = allocator;
+            return this;
+        }
+
         /**
          * Configure the bookkeeper client with a provided {@link ZooKeeper} client.
          *
@@ -276,7 +293,8 @@ public Builder featureProvider(FeatureProvider featureProvider) {
 
         public BookKeeper build() throws IOException, InterruptedException, BKException {
             checkNotNull(statsLogger, "No stats logger provided");
-            return new BookKeeper(conf, zk, eventLoopGroup, statsLogger, dnsResolver, requestTimer, featureProvider);
+            return new BookKeeper(conf, zk, eventLoopGroup, allocator, statsLogger, dnsResolver, requestTimer,
+                    featureProvider);
         }
     }
 
@@ -313,7 +331,7 @@ public BookKeeper(String servers) throws IOException, InterruptedException,
      */
     public BookKeeper(final ClientConfiguration conf)
             throws IOException, InterruptedException, BKException {
-        this(conf, null, null, NullStatsLogger.INSTANCE,
+        this(conf, null, null, null, NullStatsLogger.INSTANCE,
                 null, null, null);
     }
 
@@ -347,7 +365,7 @@ private static EventLoopGroup validateEventLoopGroup(EventLoopGroup eventLoopGro
      */
     public BookKeeper(ClientConfiguration conf, ZooKeeper zk)
             throws IOException, InterruptedException, BKException {
-        this(conf, validateZooKeeper(zk), null, NullStatsLogger.INSTANCE, null, null, null);
+        this(conf, validateZooKeeper(zk), null, null, NullStatsLogger.INSTANCE, null, null, null);
     }
 
     /**
@@ -369,17 +387,19 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk)
      */
     public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLoopGroup)
             throws IOException, InterruptedException, BKException {
-        this(conf, validateZooKeeper(zk), validateEventLoopGroup(eventLoopGroup), NullStatsLogger.INSTANCE,
+        this(conf, validateZooKeeper(zk), validateEventLoopGroup(eventLoopGroup), null, NullStatsLogger.INSTANCE,
                 null, null, null);
     }
 
     /**
      * Constructor for use with the builder. Other constructors also use it.
      */
+    @SuppressWarnings("deprecation")
     @VisibleForTesting
     BookKeeper(ClientConfiguration conf,
                        ZooKeeper zkc,
                        EventLoopGroup eventLoopGroup,
+                       ByteBufAllocator byteBufAllocator,
                        StatsLogger rootStatsLogger,
                        DNSToSwitchMapping dnsResolver,
                        HashedWheelTimer requestTimer,
@@ -443,8 +463,19 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLo
             this.ownEventLoopGroup = false;
         }
 
+        if (byteBufAllocator != null) {
+            this.allocator = byteBufAllocator;
+        } else {
+            this.allocator = ByteBufAllocatorBuilder.create()
+                    .poolingPolicy(conf.getAllocatorPoolingPolicy())
+                    .poolingConcurrency(conf.getAllocatorPoolingConcurrency())
+                    .outOfMemoryPolicy(conf.getAllocatorOutOfMemoryPolicy())
+                    .leakDetectionPolicy(conf.getAllocatorLeakDetectionPolicy())
+                    .build();
+        }
+
         // initialize bookie client
-        this.bookieClient = new BookieClientImpl(conf, this.eventLoopGroup, this.mainWorkerPool,
+        this.bookieClient = new BookieClientImpl(conf, this.eventLoopGroup, this.allocator, this.mainWorkerPool,
                 scheduler, rootStatsLogger);
 
         if (null == requestTimer) {
@@ -517,6 +548,7 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLo
         bookieWatcher = null;
         bookieInfoScheduler = null;
         bookieClient = null;
+        allocator = UnpooledByteBufAllocator.DEFAULT;
     }
 
     private EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfiguration conf,
@@ -1466,6 +1498,11 @@ public BookKeeperClientStats getClientStats() {
             public boolean isClientClosed() {
                 return BookKeeper.this.isClosed();
             }
+
+            @Override
+            public ByteBufAllocator getByteBufAllocator() {
+                return allocator;
+            }
         };
 
     ClientContext getClientCtx() {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientContext.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientContext.java
index d8803d0ea6..da3abde2a8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientContext.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientContext.java
@@ -20,6 +20,8 @@
  */
 package org.apache.bookkeeper.client;
 
+import io.netty.buffer.ByteBufAllocator;
+
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.meta.LedgerManager;
@@ -37,6 +39,7 @@
     BookieWatcher getBookieWatcher();
     EnsemblePlacementPolicy getPlacementPolicy();
     BookieClient getBookieClient();
+    ByteBufAllocator getByteBufAllocator();
     OrderedExecutor getMainWorkerPool();
     OrderedScheduler getScheduler();
     BookKeeperClientStats getClientStats();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 157a1b84ca..bd8ec68314 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -188,7 +188,7 @@
         }
 
         macManager = DigestManager.instantiate(ledgerId, password, BookKeeper.DigestType.toProtoDigestType(digestType),
-                                               clientCtx.getConf().useV2WireProtocol);
+                                               clientCtx.getByteBufAllocator(), clientCtx.getConf().useV2WireProtocol);
 
         // If the password is empty, pass the same random ledger key which is generated by the hash of the empty
         // password, so that the bookie can avoid processing the keys for each entry
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BookKeeperBuilder.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BookKeeperBuilder.java
index 0e147dd301..ea30dc5177 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BookKeeperBuilder.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BookKeeperBuilder.java
@@ -20,9 +20,11 @@
  */
 package org.apache.bookkeeper.client.api;
 
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.EventLoopGroup;
 import io.netty.util.HashedWheelTimer;
 import java.io.IOException;
+
 import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public;
 import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
 import org.apache.bookkeeper.feature.FeatureProvider;
@@ -47,6 +49,15 @@
      */
     BookKeeperBuilder eventLoopGroup(EventLoopGroup eventLoopGroup);
 
+    /**
+     * Configure the bookkeeper client with a provided {@link ByteBufAllocator}.
+     *
+     * @param allocator an external {@link ByteBufAllocator} to use by the bookkeeper client.
+     * @return client builder.
+     * @since 4.9
+     */
+    BookKeeperBuilder allocator(ByteBufAllocator allocator);
+
     /**
      * Configure the bookkeeper client with a provided {@link StatsLogger}.
      *
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/BookKeeperBuilderImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/BookKeeperBuilderImpl.java
index 3a07d1bc55..6373ace3ed 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/BookKeeperBuilderImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/BookKeeperBuilderImpl.java
@@ -20,6 +20,7 @@
  */
 package org.apache.bookkeeper.client.impl;
 
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.EventLoopGroup;
 import io.netty.util.HashedWheelTimer;
 import java.io.IOException;
@@ -50,6 +51,12 @@ public BookKeeperBuilder eventLoopGroup(EventLoopGroup eventLoopGroup) {
         return this;
     }
 
+    @Override
+    public BookKeeperBuilder allocator(ByteBufAllocator allocator) {
+        builder.allocator(allocator);
+        return this;
+    }
+
     @Override
     public BookKeeperBuilder statsLogger(StatsLogger statsLogger) {
         builder.statsLogger(statsLogger);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
index 65d702b3b5..36cfa637e9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
@@ -28,6 +28,10 @@
 import javax.net.ssl.SSLEngine;
 
 import lombok.extern.slf4j.Slf4j;
+
+import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy;
+import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy;
+import org.apache.bookkeeper.common.allocator.PoolingPolicy;
 import org.apache.bookkeeper.common.util.JsonUtil;
 import org.apache.bookkeeper.common.util.JsonUtil.ParseJsonException;
 import org.apache.bookkeeper.common.util.ReflectionUtils;
@@ -155,6 +159,12 @@
     // enforce minimum number of racks per write quorum
     public static final String ENFORCE_MIN_NUM_RACKS_PER_WRITE_QUORUM = "enforceMinNumRacksPerWriteQuorum";
 
+    // Allocator configuration
+    protected static final String ALLOCATOR_POOLING_POLICY = "allocatorPoolingPolicy";
+    protected static final String ALLOCATOR_POOLING_CONCURRENCY = "allocatorPoolingConcurrency";
+    protected static final String ALLOCATOR_OOM_POLICY = "allocatorOutOfMemoryPolicy";
+    protected static final String ALLOCATOR_LEAK_DETECTION_POLICY = "allocatorLeakDetectionPolicy";
+
     // option to limit stats logging
     public static final String LIMIT_STATS_LOGGING = "limitStatsLogging";
 
@@ -881,6 +891,95 @@ public T setPreserveMdcForTaskExecution(boolean enabled) {
         return getThis();
     }
 
+    /**
+     * @return the configured pooling policy for the allocator.
+     */
+    public PoolingPolicy getAllocatorPoolingPolicy() {
+        return PoolingPolicy.valueOf(this.getString(ALLOCATOR_POOLING_POLICY, PoolingPolicy.PooledDirect.toString()));
+    }
+
+    /**
+     * Define the memory pooling policy.
+     *
+     * <p>Default is {@link PoolingPolicy#PooledDirect}
+     *
+     * @param poolingPolicy
+     *            the memory pooling policy
+     * @return configuration object.
+     */
+    public T setAllocatorPoolingPolicy(PoolingPolicy poolingPolicy) {
+        this.setProperty(ALLOCATOR_POOLING_POLICY, poolingPolicy.toString());
+        return getThis();
+    }
+
+    /**
+     * @return the configured pooling concurrency for the allocator.
+     */
+    public int getAllocatorPoolingConcurrency() {
+        return this.getInteger(ALLOCATOR_POOLING_CONCURRENCY, 2 * Runtime.getRuntime().availableProcessors());
+    }
+
+    /**
+     * Controls the amount of concurrency for the memory pool.
+     *
+     * <p>Default is to have a number of allocator arenas equal to 2 * CPUS.
+     *
+     * <p>Decreasing this number will reduce the amount of memory overhead, at the
+     * expense of increased allocation contention.
+     *
+     * @param concurrency
+     *            the concurrency level to use for the allocator pool
+     * @return configuration object.
+     */
+    public T setAllocatorPoolingConcurrenncy(int concurrency) {
+        this.setProperty(ALLOCATOR_POOLING_CONCURRENCY, concurrency);
+        return getThis();
+    }
+
+    /**
+     * @return the configured out of memory policy for the allocator.
+     */
+    public OutOfMemoryPolicy getAllocatorOutOfMemoryPolicy() {
+        return OutOfMemoryPolicy
+                .valueOf(this.getString(ALLOCATOR_OOM_POLICY, OutOfMemoryPolicy.FallbackToHeap.toString()));
+    }
+
+    /**
+     * Define the memory allocator out of memory policy.
+     *
+     * <p>Default is {@link OutOfMemoryPolicy#FallbackToHeap}
+     *
+     * @param oomPolicy
+     *            the "out-of-memory" policy for the memory allocator
+     * @return configuration object.
+     */
+    public T setAllocatorOutOfMemoryPolicy(OutOfMemoryPolicy oomPolicy) {
+        this.setProperty(ALLOCATOR_OOM_POLICY, oomPolicy.toString());
+        return getThis();
+    }
+
+    /**
+     * Return the configured leak detection policy for the allocator.
+     */
+    public LeakDetectionPolicy getAllocatorLeakDetectionPolicy() {
+        return LeakDetectionPolicy
+                .valueOf(this.getString(ALLOCATOR_LEAK_DETECTION_POLICY, LeakDetectionPolicy.Disabled.toString()));
+    }
+
+    /**
+     * Enable the leak detection for the allocator.
+     *
+     * <p>Default is {@link LeakDetectionPolicy#Disabled}
+     *
+     * @param leakDetectionPolicy
+     *            the leak detection policy for the memory allocator
+     * @return configuration object.
+     */
+    public T setAllocatorLeakDetectionPolicy(LeakDetectionPolicy leakDetectionPolicy) {
+        this.setProperty(ALLOCATOR_LEAK_DETECTION_POLICY, leakDetectionPolicy.toString());
+        return getThis();
+    }
+
     /**
      * Return whether the busy-wait is enabled for BookKeeper and Netty IO threads.
      *
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 7d0d319882..3b390d48e4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -21,11 +21,14 @@
 import static org.apache.bookkeeper.util.BookKeeperConstants.FEATURE_DISABLE_ENSEMBLE_CHANGE;
 
 import io.netty.buffer.ByteBuf;
+
 import java.util.concurrent.TimeUnit;
+
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
+import org.apache.bookkeeper.client.api.BookKeeperBuilder;
 import org.apache.bookkeeper.common.util.ReflectionUtils;
 import org.apache.bookkeeper.discover.RegistrationClient;
 import org.apache.bookkeeper.discover.ZKRegistrationClient;
@@ -1772,8 +1775,11 @@ public ClientConfiguration setMaxAllowedEnsembleChanges(int num) {
     /**
      * Option to use Netty Pooled ByteBufs.
      *
+     * @deprecated see {@link BookKeeperBuilder#allocator(io.netty.buffer.ByteBufAllocator)}
+     *
      * @return the value of the option
      */
+    @Deprecated
     public boolean isNettyUsePooledBuffers() {
         return getBoolean(NETTY_USE_POOLED_BUFFERS, true);
     }
@@ -1785,6 +1791,8 @@ public boolean isNettyUsePooledBuffers() {
      * @param enabled
      *          if enabled BookKeeper will use default Pooled Netty Buffer allocator
      *
+     * @deprecated see {@link BookKeeperBuilder#allocator(io.netty.buffer.ByteBufAllocator)}
+     *
      * @see #setUseV2WireProtocol(boolean)
      * @see ByteBuf#release()
      * @see LedgerHandle#readEntries(long, long)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
index 18f48a2ab8..83428214a5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
@@ -27,6 +27,7 @@
 import com.google.protobuf.ExtensionRegistry;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
@@ -76,11 +77,12 @@
 public class BookieClientImpl implements BookieClient, PerChannelBookieClientFactory {
     static final Logger LOG = LoggerFactory.getLogger(BookieClient.class);
 
-    OrderedExecutor executor;
-    ScheduledExecutorService scheduler;
-    ScheduledFuture<?> timeoutFuture;
+    private final OrderedExecutor executor;
+    private final ScheduledExecutorService scheduler;
+    private final ScheduledFuture<?> timeoutFuture;
 
-    EventLoopGroup eventLoopGroup;
+    private final EventLoopGroup eventLoopGroup;
+    private final ByteBufAllocator allocator;
     final ConcurrentHashMap<BookieSocketAddress, PerChannelBookieClientPool> channels =
             new ConcurrentHashMap<BookieSocketAddress, PerChannelBookieClientPool>();
 
@@ -96,10 +98,12 @@
     private final long bookieErrorThresholdPerInterval;
 
     public BookieClientImpl(ClientConfiguration conf, EventLoopGroup eventLoopGroup,
+                            ByteBufAllocator allocator,
                             OrderedExecutor executor, ScheduledExecutorService scheduler,
                             StatsLogger statsLogger) throws IOException {
         this.conf = conf;
         this.eventLoopGroup = eventLoopGroup;
+        this.allocator = allocator;
         this.executor = executor;
         this.closed = false;
         this.closeLock = new ReentrantReadWriteLock();
@@ -120,6 +124,8 @@ public BookieClientImpl(ClientConfiguration conf, EventLoopGroup eventLoopGroup,
                                                                     conf.getTimeoutMonitorIntervalSec(),
                                                                     conf.getTimeoutMonitorIntervalSec(),
                                                                     TimeUnit.SECONDS);
+        } else {
+            this.timeoutFuture = null;
         }
     }
 
@@ -175,7 +181,7 @@ public PerChannelBookieClient create(BookieSocketAddress address, PerChannelBook
         if (conf.getLimitStatsLogging()) {
             statsLoggerForPCBC = NullStatsLogger.INSTANCE;
         }
-        return new PerChannelBookieClient(conf, executor, eventLoopGroup, address, statsLoggerForPCBC,
+        return new PerChannelBookieClient(conf, executor, eventLoopGroup, allocator, address, statsLoggerForPCBC,
                 authProviderFactory, registry, pcbcPool, shFactory);
     }
 
@@ -596,8 +602,8 @@ public void writeComplete(int rc, long ledger, long entry, BookieSocketAddress a
                 .build();
         ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(
                 new DefaultThreadFactory("BookKeeperClientScheduler"));
-        BookieClientImpl bc = new BookieClientImpl(new ClientConfiguration(), eventLoopGroup, executor,
-                                                   scheduler, NullStatsLogger.INSTANCE);
+        BookieClientImpl bc = new BookieClientImpl(new ClientConfiguration(), eventLoopGroup,
+                null, executor, scheduler, NullStatsLogger.INSTANCE);
         BookieSocketAddress addr = new BookieSocketAddress(args[0], Integer.parseInt(args[1]));
 
         for (int i = 0; i < 100000; i++) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
index add453728b..5b5c288123 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
@@ -24,6 +24,7 @@
 import com.google.protobuf.ExtensionRegistry;
 
 import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.AdaptiveRecvByteBufAllocator;
 import io.netty.channel.Channel;
@@ -107,8 +108,11 @@
     final BookieAuthProvider.Factory authProviderFactory;
     final ExtensionRegistry registry = ExtensionRegistry.newInstance();
 
-    BookieNettyServer(ServerConfiguration conf, RequestProcessor processor)
+    private final ByteBufAllocator allocator;
+
+    BookieNettyServer(ServerConfiguration conf, RequestProcessor processor, ByteBufAllocator allocator)
         throws IOException, KeeperException, InterruptedException, BookieException {
+        this.allocator = allocator;
         this.maxFrameSize = conf.getNettyMaxFrameSizeBytes();
         this.conf = conf;
         this.requestProcessor = processor;
@@ -296,7 +300,8 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
     private void listenOn(InetSocketAddress address, BookieSocketAddress bookieAddress) throws InterruptedException {
         if (!conf.isDisableServerSocketBind()) {
             ServerBootstrap bootstrap = new ServerBootstrap();
-            bootstrap.childOption(ChannelOption.ALLOCATOR, new PooledByteBufAllocator(true));
+            bootstrap.option(ChannelOption.ALLOCATOR, allocator);
+            bootstrap.childOption(ChannelOption.ALLOCATOR, allocator);
             bootstrap.group(eventLoopGroup, eventLoopGroup);
             bootstrap.childOption(ChannelOption.TCP_NODELAY, conf.getServerTcpNoDelay());
             bootstrap.childOption(ChannelOption.SO_LINGER, conf.getServerSockLinger());
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index b883f74a21..7d5e2e613e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -29,6 +29,7 @@
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.protobuf.ByteString;
 
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.HashedWheelTimer;
@@ -44,6 +45,7 @@
 
 import lombok.AccessLevel;
 import lombok.Getter;
+
 import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
 import org.apache.bookkeeper.auth.AuthToken;
 import org.apache.bookkeeper.bookie.Bookie;
@@ -124,9 +126,12 @@
     final Optional<Cache<Channel, Boolean>> blacklistedChannels;
     final Consumer<Channel> onResponseTimeout;
 
-    public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie,
-            StatsLogger statsLogger, SecurityHandlerFactory shFactory) throws SecurityException {
+    private final ByteBufAllocator allocator;
+
+    public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie, StatsLogger statsLogger,
+            SecurityHandlerFactory shFactory, ByteBufAllocator allocator) throws SecurityException {
         this.serverCfg = serverCfg;
+        this.allocator = allocator;
         this.waitTimeoutOnBackpressureMillis = serverCfg.getWaitTimeoutOnResponseBackpressureMillis();
         this.preserveMdcForTaskExecution = serverCfg.getPreserveMdcForTaskExecution();
         this.bookie = bookie;
@@ -158,7 +163,7 @@ public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie,
                 OrderedExecutor.NO_TASK_LIMIT, statsLogger);
         this.shFactory = shFactory;
         if (shFactory != null) {
-            shFactory.init(NodeType.Server, serverCfg);
+            shFactory.init(NodeType.Server, serverCfg, allocator);
         }
 
         this.requestTimer = new HashedWheelTimer(
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
index 27e8ab2eef..b8c1adcef0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
@@ -25,6 +25,8 @@
 import static org.apache.bookkeeper.conf.AbstractConfiguration.PERMITTED_STARTUP_USERS;
 
 import com.google.common.annotations.VisibleForTesting;
+import io.netty.buffer.ByteBufAllocator;
+
 import java.io.IOException;
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.net.UnknownHostException;
@@ -38,6 +40,7 @@
 import org.apache.bookkeeper.bookie.ExitCode;
 import org.apache.bookkeeper.bookie.ReadOnlyBookie;
 import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.common.allocator.ByteBufAllocatorBuilder;
 import org.apache.bookkeeper.common.util.JsonUtil.ParseJsonException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -62,7 +65,7 @@
     final ServerConfiguration conf;
     BookieNettyServer nettyServer;
     private volatile boolean running = false;
-    Bookie bookie;
+    private final Bookie bookie;
     DeathWatcher deathWatcher;
     private static final Logger LOG = LoggerFactory.getLogger(BookieServer.class);
 
@@ -96,10 +99,11 @@ public BookieServer(ServerConfiguration conf, StatsLogger statsLogger)
             LOG.error("Got ParseJsonException while converting Config to JSONString", pe);
         }
 
+        ByteBufAllocator allocator = getAllocator(conf);
         this.statsLogger = statsLogger;
-        this.nettyServer = new BookieNettyServer(this.conf, null);
+        this.nettyServer = new BookieNettyServer(this.conf, null, allocator);
         try {
-            this.bookie = newBookie(conf);
+            this.bookie = newBookie(conf, allocator);
         } catch (IOException | KeeperException | InterruptedException | BookieException e) {
             // interrupted on constructing a bookie
             this.nettyServer.shutdown();
@@ -110,7 +114,7 @@ public BookieServer(ServerConfiguration conf, StatsLogger statsLogger)
         shFactory = SecurityProviderFactoryFactory
                 .getSecurityProviderFactory(conf.getTLSProviderFactoryClass());
         this.requestProcessor = new BookieRequestProcessor(conf, bookie,
-                statsLogger.scope(SERVER_SCOPE), shFactory);
+                statsLogger.scope(SERVER_SCOPE), shFactory, bookie.getAllocator());
         this.nettyServer.setRequestProcessor(this.requestProcessor);
     }
 
@@ -126,11 +130,11 @@ public void setExceptionHandler(UncaughtExceptionHandler exceptionHandler) {
         this.uncaughtExceptionHandler = exceptionHandler;
     }
 
-    protected Bookie newBookie(ServerConfiguration conf)
+    protected Bookie newBookie(ServerConfiguration conf, ByteBufAllocator allocator)
         throws IOException, KeeperException, InterruptedException, BookieException {
         return conf.isForceReadOnlyBookie()
-            ? new ReadOnlyBookie(conf, statsLogger.scope(BOOKIE_SCOPE))
-            : new Bookie(conf, statsLogger.scope(BOOKIE_SCOPE));
+            ? new ReadOnlyBookie(conf, statsLogger.scope(BOOKIE_SCOPE), allocator)
+            : new Bookie(conf, statsLogger.scope(BOOKIE_SCOPE), allocator);
     }
 
     public void start() throws IOException, UnavailableException, InterruptedException, BKException {
@@ -285,6 +289,24 @@ public void run() {
         }
     }
 
+    private ByteBufAllocator getAllocator(ServerConfiguration conf) { // builds the allocator from conf
+        return ByteBufAllocatorBuilder.create()
+                .poolingPolicy(conf.getAllocatorPoolingPolicy())
+                .poolingConcurrency(conf.getAllocatorPoolingConcurrency())
+                .outOfMemoryPolicy(conf.getAllocatorOutOfMemoryPolicy())
+                .outOfMemoryListener((ex) -> {
+                    try {
+                        LOG.error("Unable to allocate memory, exiting bookie", ex);
+                    } finally { // notify the handler even if logging itself throws
+                        if (uncaughtExceptionHandler != null) { // field is read when the listener fires
+                            uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), ex);
+                        }
+                    }
+                })
+                .leakDetectionPolicy(conf.getAllocatorLeakDetectionPolicy())
+                .build();
+    }
+
     /**
      * Legacy Method to run bookie server.
      */
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 6839afc8f2..d411e9880d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -29,7 +29,6 @@
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import io.netty.buffer.UnpooledByteBufAllocator;
 import io.netty.channel.Channel;
@@ -166,6 +165,7 @@
 
     final BookieSocketAddress addr;
     final EventLoopGroup eventLoopGroup;
+    final ByteBufAllocator allocator;
     final OrderedExecutor executor;
     final long addEntryTimeoutNanos;
     final long readEntryTimeoutNanos;
@@ -345,12 +345,14 @@ public PerChannelBookieClient(ClientConfiguration conf, OrderedExecutor executor
                                   StatsLogger parentStatsLogger, ClientAuthProvider.Factory authProviderFactory,
                                   ExtensionRegistry extRegistry,
                                   PerChannelBookieClientPool pcbcPool) throws SecurityException {
-       this(conf, executor, eventLoopGroup, addr, NullStatsLogger.INSTANCE,
+        this(conf, executor, eventLoopGroup, UnpooledByteBufAllocator.DEFAULT, addr, NullStatsLogger.INSTANCE,
                 authProviderFactory, extRegistry, pcbcPool, null);
     }
 
     public PerChannelBookieClient(ClientConfiguration conf, OrderedExecutor executor,
-                                  EventLoopGroup eventLoopGroup, BookieSocketAddress addr,
+                                  EventLoopGroup eventLoopGroup,
+                                  ByteBufAllocator allocator,
+                                  BookieSocketAddress addr,
                                   StatsLogger parentStatsLogger, ClientAuthProvider.Factory authProviderFactory,
                                   ExtensionRegistry extRegistry,
                                   PerChannelBookieClientPool pcbcPool,
@@ -364,6 +366,7 @@ public PerChannelBookieClient(ClientConfiguration conf, OrderedExecutor executor
         } else {
             this.eventLoopGroup = eventLoopGroup;
         }
+        this.allocator = allocator;
         this.state = ConnectionState.DISCONNECTED;
         this.addEntryTimeoutNanos = TimeUnit.SECONDS.toNanos(conf.getAddEntryTimeout());
         this.readEntryTimeoutNanos = TimeUnit.SECONDS.toNanos(conf.getReadEntryTimeout());
@@ -376,7 +379,7 @@ public PerChannelBookieClient(ClientConfiguration conf, OrderedExecutor executor
         this.extRegistry = extRegistry;
         this.shFactory = shFactory;
         if (shFactory != null) {
-            shFactory.init(NodeType.Client, conf);
+            shFactory.init(NodeType.Client, conf, allocator);
         }
 
         StringBuilder nameBuilder = new StringBuilder();
@@ -515,14 +518,7 @@ protected ChannelFuture connect() {
             bootstrap.channel(NioSocketChannel.class);
         }
 
-        ByteBufAllocator allocator;
-        if (this.conf.isNettyUsePooledBuffers()) {
-            allocator = PooledByteBufAllocator.DEFAULT;
-        } else {
-            allocator = UnpooledByteBufAllocator.DEFAULT;
-        }
-
-        bootstrap.option(ChannelOption.ALLOCATOR, allocator);
+        bootstrap.option(ChannelOption.ALLOCATOR, this.allocator);
         bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.getClientConnectTimeoutMillis());
         bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
                 conf.getClientWriteBufferLowWaterMark(), conf.getClientWriteBufferHighWaterMark()));
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32CDigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32CDigestManager.java
index d6d1949b47..59dec3a235 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32CDigestManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32CDigestManager.java
@@ -22,6 +22,7 @@
 import com.scurrilous.circe.crc.Sse42Crc32C;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.util.concurrent.FastThreadLocal;
 
 import lombok.extern.slf4j.Slf4j;
@@ -38,10 +39,10 @@ protected MutableInt initialValue() throws Exception {
         }
     };
 
-    public CRC32CDigestManager(long ledgerId, boolean useV2Protocol) {
-        super(ledgerId, useV2Protocol);
+    public CRC32CDigestManager(long ledgerId, boolean useV2Protocol, ByteBufAllocator allocator) {
+        super(ledgerId, useV2Protocol, allocator);
         if (!Sse42Crc32C.isSupported()) {
-            log.error("Sse42Crc32C is not supported, will use less slower CRC32C implementation.");
+            log.error("Sse42Crc32C is not supported, will use a slower CRC32C implementation.");
         }
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java
index b71ab59626..d06bc8030c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java
@@ -19,6 +19,7 @@
 */
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.util.concurrent.FastThreadLocal;
 
 /**
@@ -46,8 +47,8 @@ protected CRC32Digest initialValue() {
         }
     };
 
-    public CRC32DigestManager(long ledgerId, boolean useV2Protocol) {
-        super(ledgerId, useV2Protocol);
+    public CRC32DigestManager(long ledgerId, boolean useV2Protocol, ByteBufAllocator allocator) {
+        super(ledgerId, useV2Protocol, allocator);
     }
 
     @Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
index 1928637454..2dabf82821 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
@@ -18,7 +18,7 @@
 package org.apache.bookkeeper.proto.checksum;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import io.netty.util.ReferenceCountUtil;
 
@@ -47,6 +47,7 @@
 
     final long ledgerId;
     final boolean useV2Protocol;
+    private final ByteBufAllocator allocator;
 
     abstract int getMacCodeLength();
 
@@ -60,28 +61,24 @@ void update(byte[] data) {
 
     final int macCodeLength;
 
-    public DigestManager(long ledgerId, boolean useV2Protocol) {
+    public DigestManager(long ledgerId, boolean useV2Protocol, ByteBufAllocator allocator) {
         this.ledgerId = ledgerId;
         this.useV2Protocol = useV2Protocol;
-        macCodeLength = getMacCodeLength();
-    }
-
-    public static DigestManager instantiate(long ledgerId, byte[] passwd, DigestType digestType)
-            throws GeneralSecurityException {
-        return instantiate(ledgerId, passwd, digestType, false);
+        this.macCodeLength = getMacCodeLength();
+        this.allocator = allocator;
     }
 
     public static DigestManager instantiate(long ledgerId, byte[] passwd, DigestType digestType,
-            boolean useV2Protocol) throws GeneralSecurityException {
+            ByteBufAllocator allocator, boolean useV2Protocol) throws GeneralSecurityException {
         switch(digestType) {
         case HMAC:
-            return new MacDigestManager(ledgerId, passwd, useV2Protocol);
+            return new MacDigestManager(ledgerId, passwd, useV2Protocol, allocator);
         case CRC32:
-            return new CRC32DigestManager(ledgerId, useV2Protocol);
+            return new CRC32DigestManager(ledgerId, useV2Protocol, allocator);
         case CRC32C:
-            return new CRC32CDigestManager(ledgerId, useV2Protocol);
+            return new CRC32CDigestManager(ledgerId, useV2Protocol, allocator);
         case DUMMY:
-            return new DummyDigestManager(ledgerId, useV2Protocol);
+            return new DummyDigestManager(ledgerId, useV2Protocol, allocator);
         default:
             throw new GeneralSecurityException("Unknown checksum type: " + digestType);
         }
@@ -106,7 +103,7 @@ public ByteBufList computeDigestAndPackageForSending(long entryId, long lastAddC
             /*
              * For V2 protocol, use pooled direct ByteBuf's to avoid object allocation in DigestManager.
              */
-            ByteBuf headersBuffer = PooledByteBufAllocator.DEFAULT.buffer(METADATA_LENGTH + macCodeLength);
+            ByteBuf headersBuffer = allocator.buffer(METADATA_LENGTH + macCodeLength);
             headersBuffer.writeLong(ledgerId);
             headersBuffer.writeLong(entryId);
             headersBuffer.writeLong(lastAddConfirmed);
@@ -149,7 +146,7 @@ public ByteBufList computeDigestAndPackageForSending(long entryId, long lastAddC
     public ByteBufList computeDigestAndPackageForSendingLac(long lac) {
         ByteBuf headersBuffer;
         if (this.useV2Protocol) {
-            headersBuffer = PooledByteBufAllocator.DEFAULT.buffer(LAC_METADATA_LENGTH + macCodeLength);
+            headersBuffer = allocator.buffer(LAC_METADATA_LENGTH + macCodeLength);
         } else {
             headersBuffer = Unpooled.buffer(LAC_METADATA_LENGTH + macCodeLength);
         }
@@ -185,7 +182,7 @@ private void verifyDigest(long entryId, ByteBuf dataReceived, boolean skipEntryI
         int offset = METADATA_LENGTH + macCodeLength;
         update(dataReceived.slice(offset, dataReceived.readableBytes() - offset));
 
-        ByteBuf digest = PooledByteBufAllocator.DEFAULT.buffer(macCodeLength);
+        ByteBuf digest = allocator.buffer(macCodeLength);
         populateValueAndReset(digest);
 
         try {
@@ -225,7 +222,7 @@ public long verifyDigestAndReturnLac(ByteBuf dataReceived) throws BKDigestMatchE
 
         update(dataReceived.slice(0, LAC_METADATA_LENGTH));
 
-        ByteBuf digest = PooledByteBufAllocator.DEFAULT.buffer(macCodeLength);
+        ByteBuf digest = allocator.buffer(macCodeLength);
         try {
             populateValueAndReset(digest);
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java
index 1b771f0785..aeb0d5b21a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java
@@ -21,14 +21,15 @@
 package org.apache.bookkeeper.proto.checksum;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 
 
 /**
  * This class provides a noop digest implementation.
  */
 public class DummyDigestManager extends DigestManager {
-    public DummyDigestManager(long ledgerId, boolean useV2Protocol) {
-        super(ledgerId, useV2Protocol);
+    public DummyDigestManager(long ledgerId, boolean useV2Protocol, ByteBufAllocator allocator) {
+        super(ledgerId, useV2Protocol, allocator);
     }
 
     @Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java
index e71c077eab..92b93f1450 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java
@@ -21,6 +21,7 @@
 import static com.google.common.base.Charsets.UTF_8;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 
 import java.security.GeneralSecurityException;
 import java.security.MessageDigest;
@@ -71,9 +72,9 @@ protected Mac initialValue() {
         }
     };
 
-    public MacDigestManager(long ledgerId, byte[] passwd, boolean useV2Protocol)
+    public MacDigestManager(long ledgerId, byte[] passwd, boolean useV2Protocol, ByteBufAllocator allocator)
             throws GeneralSecurityException {
-        super(ledgerId, useV2Protocol);
+        super(ledgerId, useV2Protocol, allocator);
         this.passwd = Arrays.copyOf(passwd, passwd.length);
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/SecurityHandlerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/SecurityHandlerFactory.java
index 59be8847dd..5b43744fd1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/SecurityHandlerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/SecurityHandlerFactory.java
@@ -17,6 +17,7 @@
  */
 package org.apache.bookkeeper.tls;
 
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.handler.ssl.SslHandler;
 
 import org.apache.bookkeeper.conf.AbstractConfiguration;
@@ -37,7 +38,7 @@
 
     String getHandlerName();
 
-    void init(NodeType type, AbstractConfiguration conf) throws SecurityException;
+    void init(NodeType type, AbstractConfiguration conf, ByteBufAllocator allocator) throws SecurityException;
 
     SslHandler newTLSHandler();
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/TLSContextFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/TLSContextFactory.java
index 17aea85faf..32e2426404 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/TLSContextFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/TLSContextFactory.java
@@ -20,7 +20,8 @@
 import com.google.common.base.Strings;
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import io.netty.buffer.PooledByteBufAllocator;
+
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.handler.ssl.ClientAuth;
 import io.netty.handler.ssl.OpenSsl;
 import io.netty.handler.ssl.SslContext;
@@ -79,6 +80,7 @@ public String toString() {
     private String[] protocols;
     private String[] ciphers;
     private SslContext sslContext;
+    private ByteBufAllocator allocator;
 
     private String getPasswordFromFile(String path) throws IOException {
         byte[] pwd;
@@ -350,7 +352,9 @@ private void createServerContext(AbstractConfiguration conf) throws SecurityExce
     }
 
     @Override
-    public synchronized void init(NodeType type, AbstractConfiguration conf) throws SecurityException {
+    public synchronized void init(NodeType type, AbstractConfiguration conf, ByteBufAllocator allocator)
+            throws SecurityException {
+        this.allocator = allocator;
         final String enabledProtocols;
         final String enabledCiphers;
 
@@ -397,7 +401,7 @@ public synchronized void init(NodeType type, AbstractConfiguration conf) throws
 
     @Override
     public SslHandler newTLSHandler() {
-        SslHandler sslHandler = sslContext.newHandler(PooledByteBufAllocator.DEFAULT);
+        SslHandler sslHandler = sslContext.newHandler(allocator);
 
         if (protocols != null && protocols.length != 0) {
             sslHandler.engine().setEnabledProtocols(protocols);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
index f9e4cffce0..355cf3f307 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
@@ -23,7 +23,6 @@
 import com.google.common.annotations.VisibleForTesting;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandler.Sharable;
 import io.netty.channel.ChannelHandlerContext;
@@ -307,7 +306,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
                     if (prependSize) {
                         // Prepend the frame size before writing the buffer list, so that we only have 1 single size
                         // header
-                        ByteBuf sizeBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(4, 4);
+                        ByteBuf sizeBuffer = ctx.alloc().directBuffer(4, 4);
                         sizeBuffer.writeInt(b.readableBytes());
                         ctx.write(sizeBuffer, ctx.voidPromise());
                     }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
index 530969bf68..7dde6fdf0a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
@@ -34,6 +34,8 @@
 import static org.mockito.Mockito.mock;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
 
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
@@ -153,7 +155,8 @@ public void testExitCodeZK_REG_FAIL() throws Exception {
 
         // simulating ZooKeeper exception by assigning a closed zk client to bk
         BookieServer bkServer = new BookieServer(conf) {
-            protected Bookie newBookie(ServerConfiguration conf)
+            @Override
+            protected Bookie newBookie(ServerConfiguration conf, ByteBufAllocator allocator)
                     throws IOException, KeeperException, InterruptedException,
                     BookieException {
                 Bookie bookie = new Bookie(conf);
@@ -708,7 +711,7 @@ public MockBookieServer(ServerConfiguration conf) throws IOException, KeeperExce
         }
 
         @Override
-        protected Bookie newBookie(ServerConfiguration conf)
+        protected Bookie newBookie(ServerConfiguration conf, ByteBufAllocator allocator)
                 throws IOException, KeeperException, InterruptedException, BookieException {
             return new MockBookieWithNoopShutdown(conf, NullStatsLogger.INSTANCE);
         }
@@ -717,7 +720,7 @@ protected Bookie newBookie(ServerConfiguration conf)
     class MockBookieWithNoopShutdown extends Bookie {
         public MockBookieWithNoopShutdown(ServerConfiguration conf, StatsLogger statsLogger)
                 throws IOException, KeeperException, InterruptedException, BookieException {
-            super(conf, statsLogger);
+            super(conf, statsLogger, UnpooledByteBufAllocator.DEFAULT);
         }
 
         // making Bookie Shutdown no-op. Ideally for this testcase we need to
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java
index 86f3a8643f..c98663dd63 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java
@@ -23,6 +23,8 @@
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
 import java.io.File;
 import java.io.RandomAccessFile;
 import java.nio.channels.FileChannel;
@@ -76,8 +78,8 @@ public void testBufferedChannel(int byteBufLength, int numOfWrites, int unpersis
         newLogFile.deleteOnExit();
         FileChannel fileChannel = new RandomAccessFile(newLogFile, "rw").getChannel();
 
-        BufferedChannel logChannel = new BufferedChannel(fileChannel, INTERNAL_BUFFER_WRITE_CAPACITY,
-                INTERNAL_BUFFER_READ_CAPACITY, unpersistedBytesBound);
+        BufferedChannel logChannel = new BufferedChannel(UnpooledByteBufAllocator.DEFAULT, fileChannel,
+                INTERNAL_BUFFER_WRITE_CAPACITY, INTERNAL_BUFFER_READ_CAPACITY, unpersistedBytesBound);
 
         ByteBuf dataBuf = generateEntry(byteBufLength);
         dataBuf.markReaderIndex();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
index bbe9c10edc..556556e348 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -37,6 +37,7 @@
 import com.google.common.util.concurrent.UncheckedExecutionException;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
 
 import java.io.File;
 import java.io.IOException;
@@ -265,7 +266,8 @@ public void checkpointComplete(Checkpoint checkPoint, boolean compact)
                     null,
                     cp,
                     Checkpointer.NULL,
-                    NullStatsLogger.INSTANCE);
+                    NullStatsLogger.INSTANCE,
+                    UnpooledByteBufAllocator.DEFAULT);
                 storage.start();
                 long startTime = System.currentTimeMillis();
                 storage.gcThread.enableForceGC();
@@ -616,7 +618,8 @@ public void testCompactionPersistence() throws Exception {
         Bookie newbookie = new Bookie(newBookieConf);
 
         DigestManager digestManager = DigestManager.instantiate(ledgerId, passwdBytes,
-                BookKeeper.DigestType.toProtoDigestType(digestType), baseClientConf.getUseV2WireProtocol());
+                BookKeeper.DigestType.toProtoDigestType(digestType), UnpooledByteBufAllocator.DEFAULT,
+                baseClientConf.getUseV2WireProtocol());
 
         for (long entryId = 0; entryId <= lastAddConfirmed; entryId++) {
             ByteBuf readEntryBufWithChecksum = newbookie.readEntry(ledgerId, entryId);
@@ -860,7 +863,8 @@ public void checkpointComplete(CheckpointSource.Checkpoint checkpoint, boolean c
             null,
             checkpointSource,
             Checkpointer.NULL,
-            NullStatsLogger.INSTANCE);
+            NullStatsLogger.INSTANCE,
+            UnpooledByteBufAllocator.DEFAULT);
         ledgers.add(1L);
         ledgers.add(2L);
         ledgers.add(3L);
@@ -885,7 +889,8 @@ public void checkpointComplete(CheckpointSource.Checkpoint checkpoint, boolean c
             dirs, dirs, null,
             checkpointSource,
             Checkpointer.NULL,
-            NullStatsLogger.INSTANCE);
+            NullStatsLogger.INSTANCE,
+            UnpooledByteBufAllocator.DEFAULT);
         storage.start();
         for (int i = 0; i < 10; i++) {
             if (!log0.exists()) {
@@ -910,7 +915,8 @@ public void checkpointComplete(CheckpointSource.Checkpoint checkpoint, boolean c
             null,
             checkpointSource,
             Checkpointer.NULL,
-            NullStatsLogger.INSTANCE);
+            NullStatsLogger.INSTANCE,
+            UnpooledByteBufAllocator.DEFAULT);
         storage.getEntry(1, 1); // entry should exist
     }
 
@@ -1021,7 +1027,8 @@ public void checkpointComplete(Checkpoint checkpoint,
             null,
             checkpointSource,
             Checkpointer.NULL,
-            NullStatsLogger.INSTANCE);
+            NullStatsLogger.INSTANCE,
+            UnpooledByteBufAllocator.DEFAULT);
 
         double threshold = 0.1;
         // shouldn't throw exception
@@ -1063,7 +1070,7 @@ public void checkpointComplete(Checkpoint checkpoint,
         };
         InterleavedLedgerStorage storage = new InterleavedLedgerStorage();
         storage.initialize(conf, manager, dirs, dirs, null, checkpointSource,
-            Checkpointer.NULL, NullStatsLogger.INSTANCE);
+            Checkpointer.NULL, NullStatsLogger.INSTANCE, UnpooledByteBufAllocator.DEFAULT);
 
         for (long ledger = 0; ledger <= 10; ledger++) {
             ledgers.add(ledger);
@@ -1081,7 +1088,7 @@ public void checkpointComplete(Checkpoint checkpoint,
 
         storage = new InterleavedLedgerStorage();
         storage.initialize(conf, manager, dirs, dirs, null, checkpointSource,
-                           Checkpointer.NULL, NullStatsLogger.INSTANCE);
+                           Checkpointer.NULL, NullStatsLogger.INSTANCE, UnpooledByteBufAllocator.DEFAULT);
 
         long startingEntriesCount = storage.gcThread.entryLogger.getLeastUnflushedLogId()
             - storage.gcThread.scannedLogId;
@@ -1158,7 +1165,8 @@ public void checkpointComplete(Checkpoint checkPoint, boolean compact)
             null,
             cp,
             Checkpointer.NULL,
-            stats.getStatsLogger("storage"));
+            stats.getStatsLogger("storage"),
+            UnpooledByteBufAllocator.DEFAULT);
         storage.start();
 
         int majorCompactions = stats.getCounter("storage.gc." + MAJOR_COMPACTION_COUNT).get().intValue();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
index f5d4edce96..0b24db2304 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
@@ -22,6 +22,8 @@
 
 import com.google.common.util.concurrent.MoreExecutors;
 
+import io.netty.buffer.UnpooledByteBufAllocator;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
@@ -606,7 +608,8 @@ public void testEntryLogManagerMetrics() throws Exception {
         conf.setEntryLogPerLedgerCounterLimitsMultFactor(entryLogPerLedgerCounterLimitsMultFactor);
         LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
-        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager, null, statsLogger);
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager, null, statsLogger,
+                UnpooledByteBufAllocator.DEFAULT);
         EntryLogManagerForEntryLogPerLedger entrylogManager = (EntryLogManagerForEntryLogPerLedger) entryLogger
                 .getEntryLogManager();
         // set same thread executor for entryLoggerAllocator's allocatorExecutor
@@ -731,7 +734,8 @@ public void testEntryLogManagerMetricsFromExpiryAspect() throws Exception {
         conf.setEntryLogPerLedgerCounterLimitsMultFactor(entryLogPerLedgerCounterLimitsMultFactor);
         LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
-        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager, null, statsLogger);
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager, null, statsLogger,
+                UnpooledByteBufAllocator.DEFAULT);
         EntryLogManagerForEntryLogPerLedger entrylogManager = (EntryLogManagerForEntryLogPerLedger) entryLogger
                 .getEntryLogManager();
         // set same thread executor for entryLoggerAllocator's allocatorExecutor
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
index 9062261c1f..e1f35820da 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
@@ -30,6 +30,8 @@
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -963,8 +965,8 @@ public void testEntryLogManagerInterfaceForEntryLogPerLedger() throws Exception
         File tmpFile = File.createTempFile("entrylog", logid + "");
         tmpFile.deleteOnExit();
         FileChannel fc = new RandomAccessFile(tmpFile, "rw").getChannel();
-        EntryLogger.BufferedLogChannel logChannel = new BufferedLogChannel(fc, 10, 10, logid, tmpFile,
-                servConf.getFlushIntervalInBytes());
+        EntryLogger.BufferedLogChannel logChannel = new BufferedLogChannel(UnpooledByteBufAllocator.DEFAULT, fc, 10, 10,
+                logid, tmpFile, servConf.getFlushIntervalInBytes());
         return logChannel;
     }
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
index 70f2a0ecb0..909e646b2c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
@@ -29,6 +29,7 @@
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.UnpooledByteBufAllocator;
 
 import java.io.File;
 import java.io.IOException;
@@ -342,7 +343,8 @@ void validateFileInfo(IndexPersistenceMgr indexPersistenceMgr, long ledgerId, in
 
         preCreateFileInfoForLedger(ledgerId, headerVersion);
         DigestManager digestManager = DigestManager.instantiate(ledgerId, masterKey,
-                BookKeeper.DigestType.toProtoDigestType(digestType), getUseV2WireProtocol);
+                BookKeeper.DigestType.toProtoDigestType(digestType), UnpooledByteBufAllocator.DEFAULT,
+                getUseV2WireProtocol);
 
         CachedFileInfo fileInfo = indexPersistenceMgr.getFileInfo(ledgerId, masterKey);
         fileInfo.readHeader();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java
index 4fde8e74a6..c12ed9126e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java
@@ -26,6 +26,8 @@
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -110,7 +112,7 @@ public TestableEntryLogger(
                 LedgerDirsManager ledgerDirsManager,
                 EntryLogListener listener,
                 StatsLogger statsLogger) throws IOException {
-            super(conf, ledgerDirsManager, listener, statsLogger);
+            super(conf, ledgerDirsManager, listener, statsLogger, UnpooledByteBufAllocator.DEFAULT);
         }
 
         void setCheckEntryTestPoint(CheckEntryListener testPoint) throws InterruptedException {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
index 232173507b..a606f9bd75 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
@@ -28,6 +28,7 @@
 import static org.junit.Assert.fail;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.Unpooled;
 
 import java.io.File;
@@ -522,7 +523,8 @@ public void initialize(ServerConfiguration conf,
                                StateManager stateManager,
                                CheckpointSource checkpointSource,
                                Checkpointer checkpointer,
-                               StatsLogger statsLogger) throws IOException {
+                               StatsLogger statsLogger,
+                               ByteBufAllocator allocator) throws IOException {
             super.initialize(
                 conf,
                 ledgerManager,
@@ -531,7 +533,8 @@ public void initialize(ServerConfiguration conf,
                 stateManager,
                 checkpointSource,
                 checkpointer,
-                statsLogger);
+                statsLogger,
+                allocator);
             if (this.memTable instanceof EntryMemTableWithParallelFlusher) {
                 this.memTable = new EntryMemTableWithParallelFlusher(conf, checkpointSource, statsLogger) {
                     @Override
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageTest.java
index 697d7c0e88..d028f70a64 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageTest.java
@@ -23,6 +23,8 @@
 import static org.junit.Assert.assertEquals;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -145,7 +147,8 @@ public void testExplicitLacWriteToJournal(int journalFormatVersionToWrite, int f
 
         if ((journalFormatVersionToWrite >= 6) && (fileInfoFormatVersionToWrite >= 1)) {
             DigestManager digestManager = DigestManager.instantiate(ledgerId, passwdBytes,
-                    BookKeeper.DigestType.toProtoDigestType(digestType), confWithExplicitLAC.getUseV2WireProtocol());
+                    BookKeeper.DigestType.toProtoDigestType(digestType), UnpooledByteBufAllocator.DEFAULT,
+                    confWithExplicitLAC.getUseV2WireProtocol());
             long explicitLacPersistedInJournal = digestManager.verifyDigestAndReturnLac(explicitLacBuf);
             assertEquals("explicitLac persisted in journal", (numOfEntries - 1), explicitLacPersistedInJournal);
         } else {
@@ -226,7 +229,8 @@ public void testExplicitLacWriteToFileInfo(int journalFormatVersionToWrite, int
 
         if ((journalFormatVersionToWrite >= 6) && (fileInfoFormatVersionToWrite >= 1)) {
             DigestManager digestManager = DigestManager.instantiate(ledgerId, passwdBytes,
-                    BookKeeper.DigestType.toProtoDigestType(digestType), confWithExplicitLAC.getUseV2WireProtocol());
+                    BookKeeper.DigestType.toProtoDigestType(digestType), UnpooledByteBufAllocator.DEFAULT,
+                    confWithExplicitLAC.getUseV2WireProtocol());
             long explicitLacReadFromFileInfo = digestManager.verifyDigestAndReturnLac(explicitLacBufReadFromFileInfo);
             assertEquals("explicitLac persisted in FileInfo", (numOfEntries - 1), explicitLacReadFromFileInfo);
         } else {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowInterleavedLedgerStorage.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowInterleavedLedgerStorage.java
index 645af9c498..0b429ad090 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowInterleavedLedgerStorage.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowInterleavedLedgerStorage.java
@@ -22,6 +22,9 @@
  */
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
 import java.io.IOException;
 
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -48,7 +51,7 @@
 
         public SlowEntryLogger(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager, EntryLogListener listener,
                 StatsLogger statsLogger) throws IOException {
-            super(conf, ledgerDirsManager, listener, statsLogger);
+            super(conf, ledgerDirsManager, listener, statsLogger, UnpooledByteBufAllocator.DEFAULT);
         }
 
         public SlowEntryLogger setAddDelay(long delay) {
@@ -110,10 +113,11 @@ public void initialize(ServerConfiguration conf,
                            StateManager stateManager,
                            CheckpointSource checkpointSource,
                            Checkpointer checkpointer,
-                           StatsLogger statsLogger)
+                           StatsLogger statsLogger,
+                           ByteBufAllocator allocator)
             throws IOException {
         super.initialize(conf, ledgerManager, ledgerDirsManager, indexDirsManager,
-                stateManager, checkpointSource, checkpointer, statsLogger);
+                stateManager, checkpointSource, checkpointer, statsLogger, allocator);
         // do not want to add these to config class, reading throw "raw" interface
         long getDelay = conf.getLong(PROP_SLOW_STORAGE_GET_DELAY, 0);
         long addDelay = conf.getLong(PROP_SLOW_STORAGE_ADD_DELAY, 0);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
index 44f20e6f54..48d1038035 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
@@ -26,6 +26,8 @@
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
 import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -141,7 +143,8 @@ public void start() {
             null,
             checkpointSrc,
             checkpointer,
-            NullStatsLogger.INSTANCE);
+            NullStatsLogger.INSTANCE,
+            UnpooledByteBufAllocator.DEFAULT);
     }
 
     @After
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/StateManagerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/StateManagerTest.java
index 8da2ffaea9..71658e7ab7 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/StateManagerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/StateManagerTest.java
@@ -22,6 +22,9 @@
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+
+import io.netty.buffer.UnpooledByteBufAllocator;
+
 import java.io.File;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
@@ -152,7 +155,8 @@ public void testReadOnlyBookieTransitions() throws Exception{
                 .setJournalDirName(tmpDir.toString())
                 .setMetadataServiceUri(zkUtil.getMetadataServiceUri())
                 .setForceReadOnlyBookie(true);
-        ReadOnlyBookie readOnlyBookie = new ReadOnlyBookie(readOnlyConf, NullStatsLogger.INSTANCE);
+        ReadOnlyBookie readOnlyBookie = new ReadOnlyBookie(readOnlyConf, NullStatsLogger.INSTANCE,
+                UnpooledByteBufAllocator.DEFAULT);
         readOnlyBookie.start();
         assertTrue(readOnlyBookie.isRunning());
         assertTrue(readOnlyBookie.isReadOnly());
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
index d9fa8cc67e..707eb817ee 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
@@ -26,6 +26,8 @@
 import static org.junit.Assert.fail;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+
 import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -273,7 +275,8 @@ public void initialize(
             StateManager stateManager,
             CheckpointSource checkpointSource,
             Checkpointer checkpointer,
-            StatsLogger statsLogger)
+            StatsLogger statsLogger,
+            ByteBufAllocator allocator)
                 throws IOException {
         }
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionRollbackTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionRollbackTest.java
index bfd7a4d712..5c2463369a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionRollbackTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionRollbackTest.java
@@ -25,6 +25,7 @@
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
 
 import java.io.File;
 import java.io.IOException;
@@ -94,7 +95,7 @@ public void convertFromDbStorageToInterleaved() throws Exception {
 
         DbLedgerStorage dbStorage = new DbLedgerStorage();
         dbStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager, null, checkpointSource, checkpointer,
-                NullStatsLogger.INSTANCE);
+                NullStatsLogger.INSTANCE, UnpooledByteBufAllocator.DEFAULT);
 
         // Insert some ledger & entries in the dbStorage
         for (long ledgerId = 0; ledgerId < 5; ledgerId++) {
@@ -124,7 +125,7 @@ public void convertFromDbStorageToInterleaved() throws Exception {
         // Verify that interleaved storage index has the same entries
         InterleavedLedgerStorage interleavedStorage = new InterleavedLedgerStorage();
         interleavedStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager,
-                null, checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
+                null, checkpointSource, checkpointer, NullStatsLogger.INSTANCE, UnpooledByteBufAllocator.DEFAULT);
 
         Set<Long> ledgers = Sets.newTreeSet(interleavedStorage.getActiveLedgersInRange(0, Long.MAX_VALUE));
         Assert.assertEquals(Sets.newTreeSet(Lists.newArrayList(0L, 1L, 2L, 3L, 4L)), ledgers);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionTest.java
index b2afe4c6d7..780b8ec6fe 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionTest.java
@@ -25,6 +25,7 @@
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
 
 import java.io.File;
 import java.io.IOException;
@@ -91,7 +92,7 @@ public void test() throws Exception {
 
         InterleavedLedgerStorage interleavedStorage = new InterleavedLedgerStorage();
         interleavedStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager,
-                null, checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
+                null, checkpointSource, checkpointer, NullStatsLogger.INSTANCE, UnpooledByteBufAllocator.DEFAULT);
 
         // Insert some ledger & entries in the interleaved storage
         for (long ledgerId = 0; ledgerId < 5; ledgerId++) {
@@ -121,11 +122,12 @@ public void test() throws Exception {
         // Verify that db index has the same entries
         DbLedgerStorage dbStorage = new DbLedgerStorage();
         dbStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager,
-                null, checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
+                null, checkpointSource, checkpointer, NullStatsLogger.INSTANCE, UnpooledByteBufAllocator.DEFAULT);
 
         interleavedStorage = new InterleavedLedgerStorage();
         interleavedStorage.initialize(conf, null, ledgerDirsManager,
-                ledgerDirsManager, null, checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
+                ledgerDirsManager, null, checkpointSource, checkpointer, NullStatsLogger.INSTANCE,
+                UnpooledByteBufAllocator.DEFAULT);
 
         Set<Long> ledgers = Sets.newTreeSet(dbStorage.getActiveLedgersInRange(0, Long.MAX_VALUE));
         Assert.assertEquals(Sets.newTreeSet(Lists.newArrayList(0L, 1L, 2L, 3L, 4L)), ledgers);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
index 7a45ee764a..e5feef3048 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
@@ -24,6 +24,7 @@
 import static org.junit.Assert.fail;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.Unpooled;
 
 import java.io.File;
@@ -58,10 +59,11 @@
         protected SingleDirectoryDbLedgerStorage newSingleDirectoryDbLedgerStorage(ServerConfiguration conf,
                 LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager,
                 StateManager stateManager, CheckpointSource checkpointSource, Checkpointer checkpointer,
-                StatsLogger statsLogger, ScheduledExecutorService gcExecutor, long writeCacheSize, long readCacheSize)
+                StatsLogger statsLogger, ScheduledExecutorService gcExecutor,
+                long writeCacheSize, long readCacheSize)
                 throws IOException {
             return new MockedSingleDirectoryDbLedgerStorage(conf, ledgerManager, ledgerDirsManager, indexDirsManager,
-                    stateManager, checkpointSource, checkpointer, statsLogger, gcExecutor, writeCacheSize,
+                    stateManager, checkpointSource, checkpointer, statsLogger, allocator, gcExecutor, writeCacheSize,
                     readCacheSize);
         }
 
@@ -69,9 +71,10 @@ protected SingleDirectoryDbLedgerStorage newSingleDirectoryDbLedgerStorage(Serve
             public MockedSingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager,
                     LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, StateManager stateManager,
                     CheckpointSource checkpointSource, Checkpointer checkpointer, StatsLogger statsLogger,
-                    ScheduledExecutorService gcExecutor, long writeCacheSize, long readCacheSize) throws IOException {
+                    ByteBufAllocator allocator, ScheduledExecutorService gcExecutor, long writeCacheSize,
+                    long readCacheSize) throws IOException {
                 super(conf, ledgerManager, ledgerDirsManager, indexDirsManager, stateManager, checkpointSource,
-                        checkpointer, statsLogger, gcExecutor, writeCacheSize, readCacheSize);
+                        checkpointer, statsLogger, allocator, gcExecutor, writeCacheSize, readCacheSize);
             }
 
           @Override
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildTest.java
index 629a238f28..7bdbcd518b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildTest.java
@@ -27,6 +27,7 @@
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
 
 import java.io.File;
 import java.io.IOException;
@@ -92,7 +93,7 @@ public void test() throws Exception {
 
         DbLedgerStorage ledgerStorage = new DbLedgerStorage();
         ledgerStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager, null, checkpointSource, checkpointer,
-                NullStatsLogger.INSTANCE);
+                NullStatsLogger.INSTANCE, UnpooledByteBufAllocator.DEFAULT);
 
         // Insert some ledger & entries in the storage
         for (long ledgerId = 0; ledgerId < 5; ledgerId++) {
@@ -122,7 +123,7 @@ public void test() throws Exception {
         // Verify that db index has the same entries
         ledgerStorage = new DbLedgerStorage();
         ledgerStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager, null, checkpointSource, checkpointer,
-                NullStatsLogger.INSTANCE);
+                NullStatsLogger.INSTANCE, UnpooledByteBufAllocator.DEFAULT);
 
         Set<Long> ledgers = Sets.newTreeSet(ledgerStorage.getActiveLedgersInRange(0, Long.MAX_VALUE));
         Assert.assertEquals(Sets.newTreeSet(Lists.newArrayList(0L, 1L, 2L, 3L, 4L)), ledgers);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCacheTest.java
index 42e509963d..337140c2dd 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCacheTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCacheTest.java
@@ -25,6 +25,7 @@
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
 
 import org.junit.Test;
 
@@ -35,7 +36,7 @@
 
     @Test
     public void simple() {
-        ReadCache cache = new ReadCache(10 * 1024);
+        ReadCache cache = new ReadCache(UnpooledByteBufAllocator.DEFAULT, 10 * 1024);
 
         assertEquals(0, cache.count());
         assertEquals(0, cache.size());
@@ -72,7 +73,7 @@ public void simple() {
 
     @Test
     public void emptyCache() {
-        ReadCache cache = new ReadCache(10 * 1024);
+        ReadCache cache = new ReadCache(UnpooledByteBufAllocator.DEFAULT, 10 * 1024);
 
         assertEquals(0, cache.count());
         assertEquals(0, cache.size());
@@ -84,7 +85,7 @@ public void emptyCache() {
     @Test
     public void multipleSegments() {
         // Test with multiple smaller segments
-        ReadCache cache = new ReadCache(10 * 1024, 2 * 1024);
+        ReadCache cache = new ReadCache(UnpooledByteBufAllocator.DEFAULT, 10 * 1024, 2 * 1024);
 
         assertEquals(0, cache.count());
         assertEquals(0, cache.size());
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCacheTest.java
index f8b2bba446..5726bbbf8e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCacheTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCacheTest.java
@@ -26,9 +26,10 @@
 import static org.junit.Assert.assertTrue;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.ByteBufUtil;
-import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
 
 import java.nio.charset.Charset;
 import java.util.concurrent.BrokenBarrierException;
@@ -46,11 +47,13 @@
  */
 public class WriteCacheTest {
 
+    private static final ByteBufAllocator allocator = UnpooledByteBufAllocator.DEFAULT;
+
     @Test
     public void simple() throws Exception {
-        WriteCache cache = new WriteCache(10 * 1024);
+        WriteCache cache = new WriteCache(allocator, 10 * 1024);
 
-        ByteBuf entry1 = PooledByteBufAllocator.DEFAULT.buffer(1024);
+        ByteBuf entry1 = allocator.buffer(1024);
         ByteBufUtil.writeUtf8(entry1, "entry-1");
         entry1.writerIndex(entry1.capacity());
 
@@ -87,9 +90,9 @@ public void cacheFull() throws Exception {
         int entrySize = 1024;
         int entriesCount = cacheSize / entrySize;
 
-        WriteCache cache = new WriteCache(cacheSize);
+        WriteCache cache = new WriteCache(allocator, cacheSize);
 
-        ByteBuf entry = PooledByteBufAllocator.DEFAULT.buffer(entrySize);
+        ByteBuf entry = allocator.buffer(entrySize);
         entry.writerIndex(entry.capacity());
 
         for (int i = 0; i < entriesCount; i++) {
@@ -125,7 +128,7 @@ public void cacheFull() throws Exception {
     @Test
     public void testMultipleSegments() {
         // Create cache with max size 1Mb and each segment is 16Kb
-        WriteCache cache = new WriteCache(1024 * 1024, 16 * 1024);
+        WriteCache cache = new WriteCache(allocator, 1024 * 1024, 16 * 1024);
 
         ByteBuf entry = Unpooled.buffer(1024);
         entry.writerIndex(entry.capacity());
@@ -142,7 +145,7 @@ public void testMultipleSegments() {
 
     @Test
     public void testEmptyCache() {
-        WriteCache cache = new WriteCache(1024 * 1024, 16 * 1024);
+        WriteCache cache = new WriteCache(allocator, 1024 * 1024, 16 * 1024);
 
         assertEquals(0, cache.count());
         assertEquals(0, cache.size());
@@ -160,7 +163,7 @@ public void testEmptyCache() {
     @Test
     public void testMultipleWriters() throws Exception {
         // Create cache with max size 1Mb and each segment is 16Kb
-        WriteCache cache = new WriteCache(10 * 1024 * 1024, 16 * 1024);
+        WriteCache cache = new WriteCache(allocator, 10 * 1024 * 1024, 16 * 1024);
 
         ExecutorService executor = Executors.newCachedThreadPool();
 
@@ -220,7 +223,7 @@ public void testMultipleWriters() throws Exception {
 
     @Test
     public void testLedgerDeletion() {
-        WriteCache cache = new WriteCache(1024 * 1024, 16 * 1024);
+        WriteCache cache = new WriteCache(allocator, 1024 * 1024, 16 * 1024);
 
         ByteBuf entry = Unpooled.buffer(1024);
         entry.writerIndex(entry.capacity());
@@ -265,7 +268,7 @@ public void testLedgerDeletion() {
     @Test
     public void testWriteReadsInMultipleSegments() {
         // Create cache with max size 4 KB and each segment is 128 bytes
-        WriteCache cache = new WriteCache(4 * 1024, 128);
+        WriteCache cache = new WriteCache(allocator, 4 * 1024, 128);
 
         for (int i = 0; i < 48; i++) {
             boolean inserted = cache.put(1, i, Unpooled.wrappedBuffer(("test-" + i).getBytes()));
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
index 7be404d0d1..cb45ba77e8 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
@@ -587,6 +587,7 @@ public void testReadWriteWithV2WireProtocol() throws Exception {
         }
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void testReadEntryReleaseByteBufs() throws Exception {
         ClientConfiguration confWriter = new ClientConfiguration();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
index 253f0d9091..e1d32af463 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
@@ -20,6 +20,8 @@
  */
 package org.apache.bookkeeper.client;
 
+import io.netty.buffer.UnpooledByteBufAllocator;
+
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
@@ -45,7 +47,7 @@
 
     public BookKeeperTestClient(ClientConfiguration conf, TestStatsProvider statsProvider)
             throws IOException, InterruptedException, BKException {
-        super(conf, null, null,
+        super(conf, null, null, new UnpooledByteBufAllocator(false),
               statsProvider == null ? NullStatsLogger.INSTANCE : statsProvider.getStatsLogger(""),
               null, null, null);
         this.statsProvider = statsProvider;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
index 68a68463bb..a6b873ed08 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
@@ -21,6 +21,7 @@
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
 
 import java.security.GeneralSecurityException;
 import java.util.function.Function;
@@ -47,7 +48,8 @@ public static ByteBuf generatePacket(long ledgerId, long entryId, long lastAddCo
 
     public static ByteBuf generatePacket(long ledgerId, long entryId, long lastAddConfirmed, long length, byte[] data,
             int offset, int len) throws GeneralSecurityException {
-        DigestManager dm = DigestManager.instantiate(ledgerId, new byte[2], DigestType.CRC32);
+        DigestManager dm = DigestManager.instantiate(ledgerId, new byte[2], DigestType.CRC32,
+                UnpooledByteBufAllocator.DEFAULT, true);
         return ByteBufList.coalesce(dm.computeDigestAndPackageForSending(entryId, lastAddConfirmed, length,
                 Unpooled.wrappedBuffer(data, offset, len)));
     }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
index 6d6e4d15a2..69b5635224 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
@@ -31,7 +31,9 @@
 import static org.mockito.Mockito.when;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
 
 import java.security.GeneralSecurityException;
 import java.util.ArrayList;
@@ -210,6 +212,11 @@ public BookKeeperClientStats getClientStats() {
                 public boolean isClientClosed() {
                     return bk.isClosed();
                 }
+
+                @Override
+                public ByteBufAllocator getByteBufAllocator() {
+                    return UnpooledByteBufAllocator.DEFAULT;
+                }
             };
         when(bk.getClientCtx()).thenReturn(clientCtx);
         when(bk.getLedgerManager()).thenReturn(ledgerManager);
@@ -241,7 +248,8 @@ private DigestManager getDigestType(long ledgerId) throws GeneralSecurityExcepti
                 metadata.getPassword(),
                 org.apache.bookkeeper.client.BookKeeper.DigestType.toProtoDigestType(
                         org.apache.bookkeeper.client.BookKeeper.DigestType.fromApiDigestType(
-                                metadata.getDigestType())));
+                                metadata.getDigestType())),
+                UnpooledByteBufAllocator.DEFAULT, false);
     }
 
     @After
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockClientContext.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockClientContext.java
index f36c00885b..f0be8d02a3 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockClientContext.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockClientContext.java
@@ -22,6 +22,9 @@
 
 import static com.google.common.base.Preconditions.checkState;
 
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
 import java.util.function.BooleanSupplier;
 
 import org.apache.bookkeeper.common.util.OrderedExecutor;
@@ -50,6 +53,7 @@
     private BookKeeperClientStats clientStats;
     private BooleanSupplier isClientClosed;
     private MockRegistrationClient regClient;
+    private ByteBufAllocator allocator;
 
     static MockClientContext create() throws Exception {
         ClientConfiguration conf = new ClientConfiguration();
@@ -67,6 +71,7 @@ static MockClientContext create() throws Exception {
             .setPlacementPolicy(placementPolicy)
             .setRegistrationClient(regClient)
             .setBookieClient(new MockBookieClient(scheduler))
+            .setByteBufAllocator(UnpooledByteBufAllocator.DEFAULT)
             .setMainWorkerPool(scheduler)
             .setScheduler(scheduler)
             .setClientStats(BookKeeperClientStats.newInstance(NullStatsLogger.INSTANCE))
@@ -83,6 +88,7 @@ static MockClientContext copyOf(ClientContext other) {
             .setMainWorkerPool(other.getMainWorkerPool())
             .setScheduler(other.getScheduler())
             .setClientStats(other.getClientStats())
+            .setByteBufAllocator(other.getByteBufAllocator())
             .setIsClientClosed(other::isClientClosed);
     }
 
@@ -151,6 +157,11 @@ public MockClientContext setRegistrationClient(MockRegistrationClient regClient)
         return this;
     }
 
+    public MockClientContext setByteBufAllocator(ByteBufAllocator allocator) {
+        this.allocator = allocator;
+        return this;
+    }
+
     private static <T> T maybeSpy(T orig) {
         if (Mockito.mockingDetails(orig).isSpy()) {
             return orig;
@@ -204,4 +215,8 @@ public boolean isClientClosed() {
         return isClientClosed.getAsBoolean();
     }
 
+    @Override
+    public ByteBufAllocator getByteBufAllocator() {
+        return allocator;
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java
index a1032a16e0..e1c203d2b9 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java
@@ -32,6 +32,8 @@
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -132,7 +134,7 @@ public void setup() throws Exception {
         when(mockLh.getCurrentEnsemble()).thenReturn(ensemble);
         when(mockLh.getLedgerMetadata()).thenReturn(ledgerMetadata);
         when(mockLh.getDistributionSchedule()).thenReturn(distributionSchedule);
-        digestManager = new DummyDigestManager(LEDGERID, false);
+        digestManager = new DummyDigestManager(LEDGERID, false, UnpooledByteBufAllocator.DEFAULT);
         when(mockLh.getDigestManager()).thenReturn(digestManager);
     }
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
index 4ec5992a39..39c615a63a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
@@ -23,6 +23,7 @@
 
 import static org.junit.Assert.assertTrue;
 
+import io.netty.buffer.UnpooledByteBufAllocator;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.util.concurrent.DefaultThreadFactory;
@@ -108,7 +109,8 @@ public void testGetBookieInfoTimeout() throws Exception {
         // try to get bookie info from the sleeping bookie. It should fail with timeout error
         BookieSocketAddress addr = new BookieSocketAddress(bookieToSleep.getSocketAddress().getHostString(),
                 bookieToSleep.getPort());
-        BookieClient bc = new BookieClientImpl(cConf, eventLoopGroup, executor, scheduler, NullStatsLogger.INSTANCE);
+        BookieClient bc = new BookieClientImpl(cConf, eventLoopGroup, UnpooledByteBufAllocator.DEFAULT, executor,
+                scheduler, NullStatsLogger.INSTANCE);
         long flags = BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE
                 | BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE;
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/TestBKConfiguration.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/TestBKConfiguration.java
index 9073b01ccc..f993e20642 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/TestBKConfiguration.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/TestBKConfiguration.java
@@ -27,6 +27,7 @@
 import java.util.Enumeration;
 
 import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
+import org.apache.bookkeeper.common.allocator.PoolingPolicy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,6 +55,7 @@ public static ServerConfiguration newServerConfiguration() {
         confReturn.setGcWaitTime(1000);
         confReturn.setDiskUsageThreshold(0.999f);
         confReturn.setDiskUsageWarnThreshold(0.99f);
+        confReturn.setAllocatorPoolingPolicy(PoolingPolicy.UnpooledHeap);
         confReturn.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 4);
         confReturn.setProperty(DbLedgerStorage.READ_AHEAD_CACHE_MAX_SIZE_MB, 4);
         setLoopbackInterfaceAndAllowLoopback(confReturn);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
index 828956d4f3..31bd406d48 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
@@ -30,6 +30,8 @@
 
 import com.google.common.collect.Lists;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -48,6 +50,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.bookie.CheckpointSource;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
@@ -566,7 +569,8 @@ public void initialize(
             StateManager stateManager,
             CheckpointSource checkpointSource,
             Checkpointer checkpointer,
-            StatsLogger statsLogger) throws IOException {
+            StatsLogger statsLogger,
+            ByteBufAllocator allocator) throws IOException {
         }
 
         @Override
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
index 3aa2b8ac56..cf9b3ddb6c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
@@ -22,6 +22,8 @@
 package org.apache.bookkeeper.meta;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+
 import java.io.IOException;
 import java.net.URI;
 import java.util.Arrays;
@@ -29,6 +31,7 @@
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Optional;
+
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.bookie.CheckpointSource;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
@@ -173,7 +176,8 @@ public void initialize(
             StateManager stateManager,
             CheckpointSource checkpointSource,
             Checkpointer checkpointer,
-            StatsLogger statsLogger) throws IOException {
+            StatsLogger statsLogger,
+            ByteBufAllocator allocator) throws IOException {
         }
 
         @Override
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureTest.java
index 5454a702df..4b13ebe94c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureTest.java
@@ -25,6 +25,8 @@
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
+import io.netty.buffer.UnpooledByteBufAllocator;
+
 import java.lang.reflect.Field;
 import java.nio.channels.FileChannel;
 import java.util.Enumeration;
@@ -130,7 +132,7 @@ private void mockJournal(Bookie bookie, long getDelay, long addDelay, long flush
         for (int i = 0; i < journals.size(); i++) {
             Journal mock = spy(journals.get(i));
             when(mock.getBufferedChannelBuilder()).thenReturn((FileChannel fc, int capacity) ->  {
-                SlowBufferedChannel sbc = new SlowBufferedChannel(fc, capacity);
+                SlowBufferedChannel sbc = new SlowBufferedChannel(UnpooledByteBufAllocator.DEFAULT, fc, capacity);
                 sbc.setAddDelay(addDelay);
                 sbc.setGetDelay(getDelay);
                 sbc.setFlushDelay(flushDelay);
@@ -306,7 +308,7 @@ private BookieRequestProcessor generateDataAndDoReads(final int bkId) throws Exc
         BookieServer bks = bs.get(bkId);
         bks.shutdown();
         bks = new BookieServer(bsConfs.get(bkId));
-        mockJournal(bks.bookie, getDelay, addDelay, flushDelay);
+        mockJournal(bks.getBookie(), getDelay, addDelay, flushDelay);
         bks.start();
         bs.set(bkId, bks);
 
@@ -347,7 +349,7 @@ private void doWritesNoBackpressure(final int bkId) throws Exception {
         BookieServer bks = bs.get(bkId);
         bks.shutdown();
         bks = new BookieServer(bsConfs.get(bkId));
-        mockJournal(bks.bookie, getDelay, addDelay, flushDelay);
+        mockJournal(bks.getBookie(), getDelay, addDelay, flushDelay);
         bks.start();
         bs.set(bkId, bks);
 
@@ -392,7 +394,7 @@ private void doWritesWithBackpressure(final int bkId) throws Exception {
         BookieServer bks = bs.get(bkId);
         bks.shutdown();
         bks = new BookieServer(bsConfs.get(bkId));
-        mockJournal(bks.bookie, getDelay, addDelay, flushDelay);
+        mockJournal(bks.getBookie(), getDelay, addDelay, flushDelay);
         bks.start();
         bs.set(bkId, bks);
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
index 04119d5154..2c349a0118 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
@@ -24,6 +24,7 @@
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
 
 import java.util.Collections;
 import java.util.EnumSet;
@@ -109,7 +110,8 @@ public void removeErrors(BookieSocketAddress... bookies) {
     }
 
     public void seedEntries(BookieSocketAddress bookie, long ledgerId, long entryId, long lac) throws Exception {
-        DigestManager digestManager = DigestManager.instantiate(ledgerId, new byte[0], DigestType.CRC32C);
+        DigestManager digestManager = DigestManager.instantiate(ledgerId, new byte[0], DigestType.CRC32C,
+                UnpooledByteBufAllocator.DEFAULT, false);
         ByteBuf entry = ByteBufList.coalesce(digestManager.computeDigestAndPackageForSending(
                                                      entryId, lac, 0, Unpooled.buffer(10)));
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java
index 4ebe01cfcc..4cf8a7cd64 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java
@@ -29,6 +29,9 @@
 import static org.mockito.Mockito.mock;
 
 import com.google.protobuf.ByteString;
+
+import io.netty.buffer.UnpooledByteBufAllocator;
+
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
@@ -54,7 +57,7 @@ public void testConstructLongPollThreads() throws Exception {
         // long poll threads == read threads
         ServerConfiguration conf = new ServerConfiguration();
         try (BookieRequestProcessor processor = new BookieRequestProcessor(
-            conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null)) {
+            conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT)) {
             assertSame(processor.getReadThreadPool(), processor.getLongPollThreadPool());
         }
 
@@ -62,7 +65,7 @@ public void testConstructLongPollThreads() throws Exception {
         conf = new ServerConfiguration();
         conf.setNumReadWorkerThreads(0);
         try (BookieRequestProcessor processor = new BookieRequestProcessor(
-            conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null)) {
+            conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT)) {
             assertNull(processor.getReadThreadPool());
             assertNotNull(processor.getLongPollThreadPool());
         }
@@ -72,7 +75,7 @@ public void testConstructLongPollThreads() throws Exception {
         conf.setNumReadWorkerThreads(2);
         conf.setNumLongPollWorkerThreads(2);
         try (BookieRequestProcessor processor = new BookieRequestProcessor(
-            conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null)) {
+            conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT)) {
             assertNotNull(processor.getReadThreadPool());
             assertNotNull(processor.getLongPollThreadPool());
             assertNotSame(processor.getReadThreadPool(), processor.getLongPollThreadPool());
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index b1feef059a..97b7488b78 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -21,10 +21,14 @@
 
 package org.apache.bookkeeper.test;
 
+
+
 import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
 import static org.junit.Assert.assertTrue;
 
 import com.google.common.base.Stopwatch;
+import io.netty.buffer.ByteBufAllocator;
+
 import java.io.File;
 import java.io.IOException;
 import java.net.UnknownHostException;
@@ -41,6 +45,7 @@
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.client.BookKeeperTestClient;
+import org.apache.bookkeeper.common.allocator.PoolingPolicy;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -237,6 +242,7 @@ protected void stopZKCluster() throws Exception {
     protected void startBKCluster(String metadataServiceUri) throws Exception {
         baseConf.setMetadataServiceUri(metadataServiceUri);
         baseClientConf.setMetadataServiceUri(metadataServiceUri);
+        baseClientConf.setAllocatorPoolingPolicy(PoolingPolicy.UnpooledHeap);
         if (numBookies > 0) {
             bkc = new BookKeeperTestClient(baseClientConf, new TestStatsProvider());
         }
@@ -303,6 +309,7 @@ protected ServerConfiguration newServerConfiguration(int port, File journalDir,
         }
         conf.setLedgerDirNames(ledgerDirNames);
         conf.setEnableTaskExecutionStats(true);
+        conf.setAllocatorPoolingPolicy(PoolingPolicy.UnpooledHeap);
         return conf;
     }
 
@@ -677,7 +684,7 @@ protected BookieServer startBookie(ServerConfiguration conf, final Bookie b)
         TestStatsProvider provider = new TestStatsProvider();
         BookieServer server = new BookieServer(conf, provider.getStatsLogger("")) {
             @Override
-            protected Bookie newBookie(ServerConfiguration conf) {
+            protected Bookie newBookie(ServerConfiguration conf, ByteBufAllocator allocator) {
                 return b;
             }
         };
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
index c6cc72bcd8..d38a178b77 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
@@ -25,6 +25,7 @@
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.util.concurrent.DefaultThreadFactory;
@@ -157,8 +158,8 @@ public void testWriteGaps() throws Exception {
         BookieSocketAddress addr = bs.getLocalAddress();
         ResultStruct arc = new ResultStruct();
 
-        BookieClient bc = new BookieClientImpl(new ClientConfiguration(), eventLoopGroup, executor,
-                                               scheduler, NullStatsLogger.INSTANCE);
+        BookieClient bc = new BookieClientImpl(new ClientConfiguration(), eventLoopGroup,
+                UnpooledByteBufAllocator.DEFAULT, executor, scheduler, NullStatsLogger.INSTANCE);
         ByteBufList bb = createByteBuffer(1, 1, 1);
         bc.addEntry(addr, 1, passwd, 1, bb, wrcb, arc, BookieProtocol.FLAG_NONE, false, WriteFlag.NONE);
         synchronized (arc) {
@@ -258,8 +259,8 @@ private ByteBufList createByteBuffer(int i, long lid, long eid) {
     public void testNoLedger() throws Exception {
         ResultStruct arc = new ResultStruct();
         BookieSocketAddress addr = bs.getLocalAddress();
-        BookieClient bc = new BookieClientImpl(new ClientConfiguration(), eventLoopGroup, executor,
-                                               scheduler, NullStatsLogger.INSTANCE);
+        BookieClient bc = new BookieClientImpl(new ClientConfiguration(), eventLoopGroup,
+                UnpooledByteBufAllocator.DEFAULT, executor, scheduler, NullStatsLogger.INSTANCE);
         synchronized (arc) {
             bc.readEntry(addr, 2, 13, recb, arc, BookieProtocol.FLAG_NONE);
             arc.wait(1000);
@@ -270,8 +271,8 @@ public void testNoLedger() throws Exception {
     @Test
     public void testGetBookieInfo() throws IOException, InterruptedException {
         BookieSocketAddress addr = bs.getLocalAddress();
-        BookieClient bc = new BookieClientImpl(new ClientConfiguration(), new NioEventLoopGroup(), executor,
-                                               scheduler, NullStatsLogger.INSTANCE);
+        BookieClient bc = new BookieClientImpl(new ClientConfiguration(), new NioEventLoopGroup(),
+                UnpooledByteBufAllocator.DEFAULT, executor, scheduler, NullStatsLogger.INSTANCE);
         long flags = BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE
                 | BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE;
 
diff --git a/conf/bk_server.conf b/conf/bk_server.conf
index 23012dc4dd..41798a6df4 100755
--- a/conf/bk_server.conf
+++ b/conf/bk_server.conf
@@ -965,3 +965,53 @@ storage.serve.readonly.tables=false
 
 # the cluster controller schedule interval, in milliseconds. default is 30 seconds.
 storage.cluster.controller.schedule.interval.ms=30000
+
+
+#############################################################################
+## Netty Allocator Settings
+#############################################################################
+
+# Define the memory pooling policy.
+# Available options are:
+#   - PooledDirect: Use Direct memory for all buffers and pool the memory.
+#                   Direct memory will avoid the overhead of JVM GC and most
+#                   memory copies when reading and writing to socket channel.
+#                   Pooling will add memory space overhead due to the fact that
+#                   there will be fragmentation in the allocator and that threads
+#                   will keep a portion of memory as thread-local to avoid
+#                   contention when possible.
+#   - UnpooledHeap: Allocate memory from JVM heap without any pooling.
+#                   This option has the least overhead in terms of memory usage
+#                   since the memory will be automatically reclaimed by the
+#                   JVM GC but might impose a performance penalty at high
+#                   throughput.
+# Default is: PooledDirect
+# allocatorPoolingPolicy=PooledDirect
+  
+# Controls the amount of concurrency for the memory pool.
+# Default is to have a number of allocator arenas equal to 2 * CPUS.
+# Decreasing this number will reduce the amount of memory overhead, at the
+# expense of increased allocation contention.
+# allocatorPoolingConcurrency=8
+
+# Define the memory allocator out of memory policy.
+# Available options are: 
+#   - FallbackToHeap: If it's not possible to allocate a buffer from direct memory,
+#                     fallback to allocate an unpooled buffer from JVM heap.
+#                     This will help absorb memory allocation spikes because the heap
+#                     allocations will naturally slow down the process and will result
+#                     in full GC cleanup if the Heap itself is full.
+#   - ThrowException: Throw regular OOM exception without taking additional actions.
+# Default is: FallbackToHeap
+# allocatorOutOfMemoryPolicy=FallbackToHeap
+
+# Define the memory allocator leak detection policy. Available options are:
+#   - Disabled: No leak detection and no overhead.
+#   - Simple: Instruments 1% of the allocated buffer to track for leaks.
+#   - Advanced: Instruments 1% of the allocated buffer to track for leaks, reporting
+#               stack traces of places where the buffer was used.
+#   - Paranoid: Instruments 100% of the allocated buffer to track for leaks, reporting
+#               stack traces of places where the buffer was used. Introduces very
+#               significant overhead.
+# Default is: Disabled
+# allocatorLeakDetectionPolicy=Disabled
diff --git a/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestTypeBenchmark.java b/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestTypeBenchmark.java
index 3995ea8875..2319b2ea3e 100644
--- a/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestTypeBenchmark.java
+++ b/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestTypeBenchmark.java
@@ -24,6 +24,7 @@
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
@@ -106,13 +107,13 @@
         public void doSetup() throws Exception {
             final byte[] password = "password".getBytes("UTF-8");
             crc32 = DigestManager.instantiate(ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE),
-                    password, DigestType.CRC32);
+                    password, DigestType.CRC32, PooledByteBufAllocator.DEFAULT, true);
 
             crc32c = DigestManager.instantiate(ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE),
-                    password, DigestType.CRC32C);
+                    password, DigestType.CRC32C, PooledByteBufAllocator.DEFAULT, true);
 
             mac = DigestManager.instantiate(ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE),
-                    password, DigestType.HMAC);
+                    password, DigestType.HMAC, PooledByteBufAllocator.DEFAULT, true);
 
             digestBuf = Unpooled.buffer(getDigestManager(digest).getMacCodeLength());
 
diff --git a/shaded/bookkeeper-server-shaded/pom.xml b/shaded/bookkeeper-server-shaded/pom.xml
index a39227bea1..85c76beba4 100644
--- a/shaded/bookkeeper-server-shaded/pom.xml
+++ b/shaded/bookkeeper-server-shaded/pom.xml
@@ -66,6 +66,7 @@
                   <include>com.google.guava:guava</include>
                   <include>com.google.protobuf:protobuf-java</include>
                   <include>org.apache.bookkeeper:bookkeeper-common</include>
+                  <include>org.apache.bookkeeper:bookkeeper-common-allocator</include>
                   <include>org.apache.bookkeeper:cpu-affinity</include>
                   <include>org.apache.bookkeeper:bookkeeper-tools-framework</include>
                   <include>org.apache.bookkeeper:bookkeeper-proto</include>
@@ -83,7 +84,7 @@
             </configuration>
           </execution>
         </executions>
-      </plugin> 
+      </plugin>
       <plugin>
         <groupId>org.codehaus.mojo</groupId>
         <artifactId>license-maven-plugin</artifactId>
diff --git a/shaded/distributedlog-core-shaded/pom.xml b/shaded/distributedlog-core-shaded/pom.xml
index e5ef59f80b..85ad8f0b5c 100644
--- a/shaded/distributedlog-core-shaded/pom.xml
+++ b/shaded/distributedlog-core-shaded/pom.xml
@@ -87,6 +87,7 @@
                   <include>net.java.dev.jna:jna</include>
                   <include>net.jpountz.lz4:lz4</include>
                   <include>org.apache.bookkeeper:bookkeeper-common</include>
+                  <include>org.apache.bookkeeper:bookkeeper-common-allocator</include>
                   <include>org.apache.bookkeeper:cpu-affinity</include>
                   <include>org.apache.bookkeeper:bookkeeper-tools-framework</include>
                   <include>org.apache.bookkeeper:bookkeeper-proto</include>
@@ -207,7 +208,7 @@
             </configuration>
           </execution>
         </executions>
-      </plugin> 
+      </plugin>
       <plugin>
         <groupId>org.codehaus.mojo</groupId>
         <artifactId>license-maven-plugin</artifactId>
diff --git a/site/_data/config/bk_server.yaml b/site/_data/config/bk_server.yaml
index a42e12eb2b..d83a8f1024 100644
--- a/site/_data/config/bk_server.yaml
+++ b/site/_data/config/bk_server.yaml
@@ -137,7 +137,7 @@ groups:
     default: 8080
 
 - name: Security settings
-  params: 
+  params:
   - param: bookieAuthProviderFactoryClass
     description: The bookie authentication provider factory class name. If this is null, no authentication will take place.
     default: null
@@ -207,7 +207,7 @@ groups:
        5: expanding header to 512 and padding writes to align sector size configured by `journalAlignmentSize`
        6: persisting explicitLac is introduced
 
-      By default, it is `6`. 
+      By default, it is `6`.
       If you'd like to disable persisting ExplicitLac, you can set this config to < `6` and also fileInfoFormatVersionToWrite should be set to 0. If there is mismatch then the serverconfig is considered invalid.
       You can disable `padding-writes` by setting journal version back to `4`. This feature is available in 4.5.0 and onward versions.
     default: 6
@@ -618,7 +618,7 @@ groups:
   - param: ensemblePlacementPolicy
     description: |
       The ensemble placement policy used for finding bookie for re-replicating entries.
-    
+
       Options:
         - org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy
         - org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy
@@ -678,4 +678,55 @@ groups:
     description: The time to backoff when replication worker encounters exceptions on replicating a ledger, in milliseconds.
     default: 5000
 
-
+- name: Memory allocator settings
+  params:
+  - param: allocatorPoolingPolicy
+    description: |
+      Define the memory pooling policy.
+
+      Available options are:
+        - PooledDirect: Use Direct memory for all buffers and pool the memory.
+                        Direct memory will avoid the overhead of JVM GC and most
+                        memory copies when reading and writing to socket channel.
+                        Pooling will add memory space overhead due to the fact that
+                        there will be fragmentation in the allocator and that threads
+                        will keep a portion of memory as thread-local to avoid
+                        contention when possible.
+        - UnpooledHeap: Allocate memory from JVM heap without any pooling.
+                        This option has the least overhead in terms of memory usage
+                        since the memory will be automatically reclaimed by the
+                        JVM GC but might impose a performance penalty at high
+                        throughput.
+    default: PooledDirect
+  - param: allocatorPoolingConcurrency
+    description: |
+      Controls the amount of concurrency for the memory pool.
+      Default is to have a number of allocator arenas equal to 2 * CPUS.
+      Decreasing this number will reduce the amount of memory overhead, at the
+      expense of increased allocation contention.
+    default: 2 * CPUS
+  - param: allocatorOutOfMemoryPolicy
+    description: |
+      Define the memory allocator out of memory policy.
+
+      Available options are:
+        - FallbackToHeap: If it's not possible to allocate a buffer from direct memory,
+                          fallback to allocate an unpooled buffer from JVM heap.
+                          This will help absorb memory allocation spikes because the heap
+                          allocations will naturally slow down the process and will result
+                          in full GC cleanup if the Heap itself is full.
+        - ThrowException: Throw regular OOM exception without taking additional actions.
+    default: FallbackToHeap
+  - param: allocatorLeakDetectionPolicy
+    description: |
+      Define the memory allocator leak detection policy.
+
+      Available options are:
+        - Disabled: No leak detection and no overhead.
+        - Simple: Instruments 1% of the allocated buffer to track for leaks.
+        - Advanced: Instruments 1% of the allocated buffer to track for leaks, reporting
+                    stack traces of places where the buffer was used.
+        - Paranoid: Instruments 100% of the allocated buffer to track for leaks, reporting
+                    stack traces of places where the buffer was used. Introduces very
+                    significant overhead.
+    default: Disabled


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services