Posted to commits@bookkeeper.apache.org by mm...@apache.org on 2019/01/08 20:16:15 UTC

[bookkeeper] branch master updated: Configure Netty allocator in bookie and client

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a66de0  Configure Netty allocator in bookie and client
1a66de0 is described below

commit 1a66de0f1841309390261e90d8d0d0af7aa2ede6
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Jan 8 12:16:09 2019 -0800

    Configure Netty allocator in bookie and client
    
    ### Motivation
    
    This is based on #1754. It adds the code to configure and use the allocator wrapper in the bookie and the client.
    
    (I'll rebase once the first PR is merged)
    
    Reviewers: Ivan Kelly <iv...@apache.org>, Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>
    
    This closes #1755 from merlimat/use-allocator
---
 .../apache/bookkeeper/benchmark/BenchBookie.java   |  4 +-
 .../allocator/impl/ByteBufAllocatorImpl.java       | 68 +++++++++++----
 .../impl/ByteBufAllocatorBuilderTest.java          | 12 +--
 bookkeeper-server/pom.xml                          |  5 ++
 .../java/org/apache/bookkeeper/bookie/Bookie.java  | 28 ++++--
 .../org/apache/bookkeeper/bookie/BookieShell.java  | 10 ++-
 .../apache/bookkeeper/bookie/BufferedChannel.java  | 15 ++--
 .../org/apache/bookkeeper/bookie/EntryLogger.java  | 25 +++---
 .../bookkeeper/bookie/EntryLoggerAllocator.java    |  8 +-
 .../bookie/InterleavedLedgerStorage.java           | 14 ++-
 .../java/org/apache/bookkeeper/bookie/Journal.java | 15 ++--
 .../apache/bookkeeper/bookie/LedgerStorage.java    |  5 +-
 .../apache/bookkeeper/bookie/ReadOnlyBookie.java   |  6 +-
 .../bookkeeper/bookie/SlowBufferedChannel.java     | 10 ++-
 .../bookkeeper/bookie/SortedLedgerStorage.java     |  8 +-
 .../bookie/storage/ldb/DbLedgerStorage.java        | 10 ++-
 .../bookkeeper/bookie/storage/ldb/ReadCache.java   | 10 ++-
 .../ldb/SingleDirectoryDbLedgerStorage.java        | 15 ++--
 .../bookkeeper/bookie/storage/ldb/WriteCache.java  | 11 ++-
 .../org/apache/bookkeeper/client/BookKeeper.java   | 49 +++++++++--
 .../apache/bookkeeper/client/ClientContext.java    |  3 +
 .../org/apache/bookkeeper/client/LedgerHandle.java |  2 +-
 .../bookkeeper/client/api/BookKeeperBuilder.java   | 11 +++
 .../client/impl/BookKeeperBuilderImpl.java         |  7 ++
 .../bookkeeper/conf/AbstractConfiguration.java     | 99 ++++++++++++++++++++++
 .../bookkeeper/conf/ClientConfiguration.java       |  8 ++
 .../apache/bookkeeper/proto/BookieClientImpl.java  | 20 +++--
 .../apache/bookkeeper/proto/BookieNettyServer.java |  9 +-
 .../bookkeeper/proto/BookieRequestProcessor.java   | 11 ++-
 .../org/apache/bookkeeper/proto/BookieServer.java  | 36 ++++++--
 .../bookkeeper/proto/PerChannelBookieClient.java   | 20 ++---
 .../proto/checksum/CRC32CDigestManager.java        |  7 +-
 .../proto/checksum/CRC32DigestManager.java         |  5 +-
 .../bookkeeper/proto/checksum/DigestManager.java   | 31 +++----
 .../proto/checksum/DummyDigestManager.java         |  5 +-
 .../proto/checksum/MacDigestManager.java           |  5 +-
 .../bookkeeper/tls/SecurityHandlerFactory.java     |  3 +-
 .../apache/bookkeeper/tls/TLSContextFactory.java   | 10 ++-
 .../org/apache/bookkeeper/util/ByteBufList.java    |  3 +-
 .../bookie/BookieInitializationTest.java           |  9 +-
 .../bookkeeper/bookie/BufferedChannelTest.java     |  6 +-
 .../apache/bookkeeper/bookie/CompactionTest.java   | 26 ++++--
 .../apache/bookkeeper/bookie/CreateNewLogTest.java |  8 +-
 .../org/apache/bookkeeper/bookie/EntryLogTest.java |  6 +-
 .../bookkeeper/bookie/IndexPersistenceMgrTest.java |  4 +-
 .../bookie/InterleavedLedgerStorageTest.java       |  4 +-
 .../apache/bookkeeper/bookie/LedgerCacheTest.java  |  7 +-
 .../bookkeeper/bookie/LedgerStorageTest.java       |  8 +-
 .../bookie/SlowInterleavedLedgerStorage.java       | 10 ++-
 .../bookie/SortedLedgerStorageCheckpointTest.java  |  5 +-
 .../apache/bookkeeper/bookie/StateManagerTest.java |  6 +-
 .../apache/bookkeeper/bookie/TestSyncThread.java   |  5 +-
 .../bookie/storage/ldb/ConversionRollbackTest.java |  5 +-
 .../bookie/storage/ldb/ConversionTest.java         |  8 +-
 .../storage/ldb/DbLedgerStorageWriteCacheTest.java | 11 ++-
 .../storage/ldb/LocationsIndexRebuildTest.java     |  5 +-
 .../bookie/storage/ldb/ReadCacheTest.java          |  7 +-
 .../bookie/storage/ldb/WriteCacheTest.java         | 23 ++---
 .../apache/bookkeeper/client/BookKeeperTest.java   |  1 +
 .../bookkeeper/client/BookKeeperTestClient.java    |  4 +-
 .../org/apache/bookkeeper/client/ClientUtil.java   |  4 +-
 .../bookkeeper/client/MockBookKeeperTestCase.java  | 10 ++-
 .../bookkeeper/client/MockClientContext.java       | 15 ++++
 .../client/ReadLastConfirmedAndEntryOpTest.java    |  4 +-
 .../client/TestGetBookieInfoTimeout.java           |  4 +-
 .../bookkeeper/conf/TestBKConfiguration.java       |  2 +
 .../org/apache/bookkeeper/meta/GcLedgersTest.java  |  6 +-
 .../bookkeeper/meta/LedgerManagerTestCase.java     |  6 +-
 .../bookkeeper/proto/BookieBackpressureTest.java   | 10 ++-
 .../apache/bookkeeper/proto/MockBookieClient.java  |  4 +-
 .../proto/TestBookieRequestProcessor.java          |  9 +-
 .../bookkeeper/test/BookKeeperClusterTestCase.java |  9 +-
 .../apache/bookkeeper/test/BookieClientTest.java   | 13 +--
 conf/bk_server.conf                                | 50 +++++++++++
 .../proto/checksum/DigestTypeBenchmark.java        |  7 +-
 shaded/bookkeeper-server-shaded/pom.xml            |  3 +-
 shaded/distributedlog-core-shaded/pom.xml          |  3 +-
 site/_data/config/bk_server.yaml                   | 59 ++++++++++++-
 78 files changed, 763 insertions(+), 246 deletions(-)
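
The out-of-memory handling that this commit reworks in ByteBufAllocatorImpl (for the PooledDirect policy) can be illustrated with a minimal stand-alone sketch. It is not the BookKeeper or Netty code: the class FallbackAllocatorSketch, its constructor and the use of java.nio.ByteBuffer in place of Netty's ByteBuf are assumptions made so the example runs with the JDK alone. Only the policy names and the rule itself are taken from the diff below: a generic buffer() request may fall back to heap memory, while an explicit directBuffer() request may not.

```java
import java.nio.ByteBuffer;
import java.util.function.Consumer;
import java.util.function.IntFunction;

/** Hypothetical stand-alone sketch; not the BookKeeper or Netty API. */
final class FallbackAllocatorSketch {
    /** Policy names mirror the ones used in the diff. */
    enum OutOfMemoryPolicy { ThrowException, FallbackToHeap }

    private final IntFunction<ByteBuffer> directAllocator;
    private final IntFunction<ByteBuffer> heapAllocator;
    private final OutOfMemoryPolicy policy;
    private final Consumer<OutOfMemoryError> listener;

    FallbackAllocatorSketch(IntFunction<ByteBuffer> directAllocator,
                            IntFunction<ByteBuffer> heapAllocator,
                            OutOfMemoryPolicy policy,
                            Consumer<OutOfMemoryError> listener) {
        this.directAllocator = directAllocator;
        this.heapAllocator = heapAllocator;
        this.policy = policy;
        this.listener = listener;
    }

    /** Generic request: the caller accepts either kind of memory, so fallback is allowed. */
    ByteBuffer buffer(int capacity) {
        return allocateDirect(capacity, true);
    }

    /** Explicit direct request: heap memory is never handed back silently. */
    ByteBuffer directBuffer(int capacity) {
        return allocateDirect(capacity, false);
    }

    private ByteBuffer allocateDirect(int capacity, boolean canFallbackToHeap) {
        try {
            return directAllocator.apply(capacity);
        } catch (OutOfMemoryError e) {
            if (canFallbackToHeap && policy == OutOfMemoryPolicy.FallbackToHeap) {
                try {
                    return heapAllocator.apply(capacity);
                } catch (OutOfMemoryError e2) {
                    // The heap is exhausted as well: notify the listener, then rethrow.
                    listener.accept(e2);
                    throw e2;
                }
            }
            // ThrowException policy, or the caller insisted on direct memory.
            listener.accept(e);
            throw e;
        }
    }

    public static void main(String[] args) {
        // Simulate exhausted direct memory with an allocator that always fails.
        IntFunction<ByteBuffer> noDirect = n -> {
            throw new OutOfMemoryError("no more direct mem");
        };
        FallbackAllocatorSketch alloc = new FallbackAllocatorSketch(
                noDirect, ByteBuffer::allocate, OutOfMemoryPolicy.FallbackToHeap,
                e -> System.out.println("listener notified: " + e.getMessage()));

        System.out.println("buffer() is direct: " + alloc.buffer(16).isDirect());
        try {
            alloc.directBuffer(16);
        } catch (OutOfMemoryError e) {
            System.out.println("directBuffer() failed: " + e.getMessage());
        }
    }
}
```

Passing the two allocators as functions is only a device to simulate memory exhaustion in the sketch. In the commit itself the same decision is carried by the private newDirectBuffer(initialCapacity, maxCapacity, canFallbackToHeap) overload.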

diff --git a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
index 216aff9..94776f0 100644
--- a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
+++ b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
@@ -20,6 +20,7 @@
 package org.apache.bookkeeper.benchmark;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.epoll.EpollEventLoopGroup;
@@ -175,7 +176,8 @@ public class BenchBookie {
                 new DefaultThreadFactory("BookKeeperClientScheduler"));
 
         ClientConfiguration conf = new ClientConfiguration();
-        BookieClient bc = new BookieClientImpl(conf, eventLoop, executor, scheduler, NullStatsLogger.INSTANCE);
+        BookieClient bc = new BookieClientImpl(conf, eventLoop, PooledByteBufAllocator.DEFAULT, executor, scheduler,
+                NullStatsLogger.INSTANCE);
         LatencyCallback lc = new LatencyCallback();
 
         ThroughputCallback tc = new ThroughputCallback();
diff --git a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorImpl.java b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorImpl.java
index 3544165..1889eb9 100644
--- a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorImpl.java
+++ b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorImpl.java
@@ -39,6 +39,10 @@ public class ByteBufAllocatorImpl extends AbstractByteBufAllocator implements By
 
     private static final Logger log = LoggerFactory.getLogger(ByteBufAllocatorImpl.class);
 
+    // Same as AbstractByteBufAllocator, but copied here since it's not visible
+    private static final int DEFAULT_INITIAL_CAPACITY = 256;
+    private static final int DEFAULT_MAX_CAPACITY = Integer.MAX_VALUE;
+
     private final ByteBufAllocator pooledAllocator;
     private final ByteBufAllocator unpooledAllocator;
     private final PoolingPolicy poolingPolicy;
@@ -63,16 +67,22 @@ public class ByteBufAllocatorImpl extends AbstractByteBufAllocator implements By
 
         if (poolingPolicy == PoolingPolicy.PooledDirect) {
             if (pooledAllocator == null) {
-                this.pooledAllocator = new PooledByteBufAllocator(
-                        true /* preferDirect */,
-                        poolingConcurrency /* nHeapArena */,
-                        poolingConcurrency /* nDirectArena */,
-                        PooledByteBufAllocator.defaultPageSize(),
-                        PooledByteBufAllocator.defaultMaxOrder(),
-                        PooledByteBufAllocator.defaultTinyCacheSize(),
-                        PooledByteBufAllocator.defaultSmallCacheSize(),
-                        PooledByteBufAllocator.defaultNormalCacheSize(),
-                        PooledByteBufAllocator.defaultUseCacheForAllThreads());
+                if (poolingConcurrency == PooledByteBufAllocator.defaultNumDirectArena()) {
+                    // If all the parameters are the same as in the default Netty pool,
+                    // just reuse the static instance as the underlying allocator.
+                    this.pooledAllocator = PooledByteBufAllocator.DEFAULT;
+                } else {
+                    this.pooledAllocator = new PooledByteBufAllocator(
+                            true /* preferDirect */,
+                            poolingConcurrency /* nHeapArena */,
+                            poolingConcurrency /* nDirectArena */,
+                            PooledByteBufAllocator.defaultPageSize(),
+                            PooledByteBufAllocator.defaultMaxOrder(),
+                            PooledByteBufAllocator.defaultTinyCacheSize(),
+                            PooledByteBufAllocator.defaultSmallCacheSize(),
+                            PooledByteBufAllocator.defaultNormalCacheSize(),
+                            PooledByteBufAllocator.defaultUseCacheForAllThreads());
+                }
             } else {
                 this.pooledAllocator = pooledAllocator;
             }
@@ -110,6 +120,25 @@ public class ByteBufAllocatorImpl extends AbstractByteBufAllocator implements By
     }
 
     @Override
+    public ByteBuf buffer() {
+        return buffer(DEFAULT_INITIAL_CAPACITY);
+    }
+
+    @Override
+    public ByteBuf buffer(int initialCapacity) {
+        return buffer(initialCapacity, DEFAULT_MAX_CAPACITY);
+    }
+
+    @Override
+    public ByteBuf buffer(int initialCapacity, int maxCapacity) {
+        if (poolingPolicy == PoolingPolicy.PooledDirect) {
+            return newDirectBuffer(initialCapacity, maxCapacity, true /* can fallback to heap if needed */);
+        } else {
+            return newHeapBuffer(initialCapacity, maxCapacity);
+        }
+    }
+
+    @Override
     protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
         try {
             // There are few cases in which we ask explicitly for a pooled
@@ -125,30 +154,33 @@ public class ByteBufAllocatorImpl extends AbstractByteBufAllocator implements By
 
     @Override
     protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
+        // If caller asked specifically for a direct buffer, we cannot fallback to heap
+        return newDirectBuffer(initialCapacity, maxCapacity, false);
+    }
+
+    private ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity, boolean canFallbackToHeap) {
         if (poolingPolicy == PoolingPolicy.PooledDirect) {
             try {
                 return pooledAllocator.directBuffer(initialCapacity, maxCapacity);
             } catch (OutOfMemoryError e) {
-                switch (outOfMemoryPolicy) {
-                case ThrowException:
-                    outOfMemoryListener.accept(e);
-                    throw e;
-
-                case FallbackToHeap:
+                if (canFallbackToHeap && outOfMemoryPolicy == OutOfMemoryPolicy.FallbackToHeap) {
                     try {
                         return unpooledAllocator.heapBuffer(initialCapacity, maxCapacity);
                     } catch (OutOfMemoryError e2) {
                         outOfMemoryListener.accept(e2);
                         throw e2;
                     }
+                } else {
+                    // ThrowException
+                    outOfMemoryListener.accept(e);
+                    throw e;
                 }
-                return null;
             }
         } else {
             // Unpooled heap buffer. Force heap buffers because unpooled direct
             // buffers have very high overhead of allocation/reclaiming
             try {
-                return unpooledAllocator.heapBuffer(initialCapacity, maxCapacity);
+                return unpooledAllocator.directBuffer(initialCapacity, maxCapacity);
             } catch (OutOfMemoryError e) {
                 outOfMemoryListener.accept(e);
                 throw e;
diff --git a/bookkeeper-common-allocator/src/test/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderTest.java b/bookkeeper-common-allocator/src/test/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderTest.java
index 8ff66c3..662dd83 100644
--- a/bookkeeper-common-allocator/src/test/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderTest.java
+++ b/bookkeeper-common-allocator/src/test/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderTest.java
@@ -145,10 +145,10 @@ public class ByteBufAllocatorBuilderTest {
     }
 
     @Test
-    public void testOomUnpooled() {
+    public void testOomUnpooledDirect() {
         ByteBufAllocator heapAlloc = mock(ByteBufAllocator.class);
-        OutOfMemoryError noHeapError = new OutOfMemoryError("no more heap");
-        when(heapAlloc.heapBuffer(anyInt(), anyInt())).thenThrow(noHeapError);
+        OutOfMemoryError noMemError = new OutOfMemoryError("no more direct mem");
+        when(heapAlloc.directBuffer(anyInt(), anyInt())).thenThrow(noMemError);
 
         AtomicReference<OutOfMemoryError> receivedException = new AtomicReference<>();
 
@@ -166,11 +166,11 @@ public class ByteBufAllocatorBuilderTest {
             fail("Should have thrown exception");
         } catch (OutOfMemoryError e) {
             // Expected
-            assertEquals(noHeapError, e);
+            assertEquals(noMemError, e);
         }
 
         // Ensure the notification was triggered even when exception is thrown
-        assertEquals(noHeapError, receivedException.get());
+        assertEquals(noMemError, receivedException.get());
     }
 
     @Test
@@ -214,7 +214,7 @@ public class ByteBufAllocatorBuilderTest {
 
         ByteBuf buf2 = alloc.directBuffer();
         assertEquals(UnpooledByteBufAllocator.DEFAULT, buf2.alloc());
-        assertTrue(buf2.hasArray());
+        assertFalse(buf2.hasArray());
     }
 
     @Test
diff --git a/bookkeeper-server/pom.xml b/bookkeeper-server/pom.xml
index 10b5ea4..c1b684e 100644
--- a/bookkeeper-server/pom.xml
+++ b/bookkeeper-server/pom.xml
@@ -32,6 +32,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>bookkeeper-common-allocator</artifactId>
+      <version>${project.parent.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
       <artifactId>bookkeeper-proto</artifactId>
       <version>${project.parent.version}</version>
     </dependency>
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index fd352d8..a0acd31 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -28,9 +28,13 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_LEDGER_SCOPE
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.SettableFuture;
+
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FilenameFilter;
@@ -53,6 +57,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
+
 import org.apache.bookkeeper.bookie.BookieException.BookieIllegalOpException;
 import org.apache.bookkeeper.bookie.BookieException.CookieNotFoundException;
 import org.apache.bookkeeper.bookie.BookieException.DiskPartitionDuplicationException;
@@ -136,6 +141,8 @@ public class Bookie extends BookieCriticalThread {
     final StatsLogger statsLogger;
     private final BookieStats bookieStats;
 
+    private final ByteBufAllocator allocator;
+
     /**
      * Exception is thrown when no such a ledger is found in this bookie.
      */
@@ -600,7 +607,7 @@ public class Bookie extends BookieCriticalThread {
 
     public Bookie(ServerConfiguration conf)
             throws IOException, InterruptedException, BookieException {
-        this(conf, NullStatsLogger.INSTANCE);
+        this(conf, NullStatsLogger.INSTANCE, PooledByteBufAllocator.DEFAULT);
     }
 
     private static LedgerStorage buildLedgerStorage(ServerConfiguration conf) throws IOException {
@@ -658,12 +665,13 @@ public class Bookie extends BookieCriticalThread {
                 null,
                 checkpointSource,
                 checkpointer,
-                statsLogger);
+                statsLogger,
+                UnpooledByteBufAllocator.DEFAULT);
 
         return ledgerStorage;
     }
 
-    public Bookie(ServerConfiguration conf, StatsLogger statsLogger)
+    public Bookie(ServerConfiguration conf, StatsLogger statsLogger, ByteBufAllocator allocator)
             throws IOException, InterruptedException, BookieException {
         super("Bookie-" + conf.getBookiePort());
         this.statsLogger = statsLogger;
@@ -676,6 +684,7 @@ public class Bookie extends BookieCriticalThread {
         this.ledgerDirsManager = createLedgerDirsManager(conf, diskChecker, statsLogger.scope(LD_LEDGER_SCOPE));
         this.indexDirsManager = createIndexDirsManager(conf, diskChecker, statsLogger.scope(LD_INDEX_SCOPE),
                                                        this.ledgerDirsManager);
+        this.allocator = allocator;
 
         // instantiate zookeeper client to initialize ledger manager
         this.metadataDriver = instantiateMetadataDriver(conf);
@@ -732,7 +741,7 @@ public class Bookie extends BookieCriticalThread {
         journals = Lists.newArrayList();
         for (int i = 0; i < journalDirectories.size(); i++) {
             journals.add(new Journal(i, journalDirectories.get(i),
-                         conf, ledgerDirsManager, statsLogger.scope(JOURNAL_SCOPE)));
+                    conf, ledgerDirsManager, statsLogger.scope(JOURNAL_SCOPE), allocator));
         }
 
         this.entryLogPerLedgerEnabled = conf.isEntryLogPerLedgerEnabled();
@@ -786,7 +795,8 @@ public class Bookie extends BookieCriticalThread {
             stateManager,
             checkpointSource,
             syncThread,
-            statsLogger);
+            statsLogger,
+            allocator);
 
 
         handles = new HandleFactoryImpl(ledgerStorage);
@@ -1287,8 +1297,8 @@ public class Bookie extends BookieCriticalThread {
         }
     }
 
-    static ByteBuf createExplicitLACEntry(long ledgerId, ByteBuf explicitLac) {
-        ByteBuf bb = PooledByteBufAllocator.DEFAULT.directBuffer(8 + 8 + 4 + explicitLac.capacity());
+    private ByteBuf createExplicitLACEntry(long ledgerId, ByteBuf explicitLac) {
+        ByteBuf bb = allocator.directBuffer(8 + 8 + 4 + explicitLac.capacity());
         bb.writeLong(ledgerId);
         bb.writeLong(METAENTRY_ID_LEDGER_EXPLICITLAC);
         bb.writeInt(explicitLac.capacity());
@@ -1485,6 +1495,10 @@ public class Bookie extends BookieCriticalThread {
         }
     }
 
+    public ByteBufAllocator getAllocator() {
+        return allocator;
+    }
+
     /**
      * Format the bookie server data.
      *
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index f53195a..dc9137e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -29,7 +29,9 @@ import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.UncheckedExecutionException;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.util.concurrent.DefaultThreadFactory;
@@ -917,8 +919,8 @@ public class BookieShell implements Tool {
                     ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(
                         new DefaultThreadFactory("BookKeeperClientSchedulerPool"));
 
-                    BookieClient bookieClient = new BookieClientImpl(conf, eventLoopGroup, executor,
-                        scheduler, NullStatsLogger.INSTANCE);
+                    BookieClient bookieClient = new BookieClientImpl(conf, eventLoopGroup,
+                            UnpooledByteBufAllocator.DEFAULT, executor, scheduler, NullStatsLogger.INSTANCE);
 
                     LongStream.range(firstEntry, lastEntry).forEach(entryId -> {
                         CompletableFuture<Void> future = new CompletableFuture<>();
@@ -2825,9 +2827,9 @@ public class BookieShell implements Tool {
             };
 
             dbStorage.initialize(conf, null, ledgerDirsManager, ledgerIndexManager, null,
-                        checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
+                        checkpointSource, checkpointer, NullStatsLogger.INSTANCE, PooledByteBufAllocator.DEFAULT);
             interleavedStorage.initialize(conf, null, ledgerDirsManager, ledgerIndexManager,
-                    null, checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
+                    null, checkpointSource, checkpointer, NullStatsLogger.INSTANCE, PooledByteBufAllocator.DEFAULT);
             LedgerCache interleavedLedgerCache = interleavedStorage.ledgerCache;
 
             int convertedLedgers = 0;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
index 633c540..31fb203 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
@@ -66,23 +66,24 @@ public class BufferedChannel extends BufferedReadChannel implements Closeable {
     private boolean closed = false;
 
     // make constructor to be public for unit test
-    public BufferedChannel(FileChannel fc, int capacity) throws IOException {
+    public BufferedChannel(ByteBufAllocator allocator, FileChannel fc, int capacity) throws IOException {
         // Use the same capacity for read and write buffers.
-        this(fc, capacity, 0L);
+        this(allocator, fc, capacity, 0L);
     }
 
-    public BufferedChannel(FileChannel fc, int capacity, long unpersistedBytesBound) throws IOException {
+    public BufferedChannel(ByteBufAllocator allocator, FileChannel fc, int capacity, long unpersistedBytesBound)
+            throws IOException {
         // Use the same capacity for read and write buffers.
-        this(fc, capacity, capacity, unpersistedBytesBound);
+        this(allocator, fc, capacity, capacity, unpersistedBytesBound);
     }
 
-    public BufferedChannel(FileChannel fc, int writeCapacity, int readCapacity, long unpersistedBytesBound)
-            throws IOException {
+    public BufferedChannel(ByteBufAllocator allocator, FileChannel fc, int writeCapacity, int readCapacity,
+            long unpersistedBytesBound) throws IOException {
         super(fc, readCapacity);
         this.writeCapacity = writeCapacity;
         this.position = new AtomicLong(fc.position());
         this.writeBufferStartPosition.set(position.get());
-        this.writeBuffer = ByteBufAllocator.DEFAULT.directBuffer(writeCapacity);
+        this.writeBuffer = allocator.directBuffer(writeCapacity);
         this.unpersistedBytes = new AtomicLong(0);
         this.unpersistedBytesBound = unpersistedBytesBound;
         this.doRegularFlushes = unpersistedBytesBound > 0;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index af82620..1389370 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -88,9 +88,9 @@ public class EntryLogger {
         private final File logFile;
         private long ledgerIdAssigned = UNASSIGNED_LEDGERID;
 
-        public BufferedLogChannel(FileChannel fc, int writeCapacity, int readCapacity, long logId, File logFile,
-                long unpersistedBytesBound) throws IOException {
-            super(fc, writeCapacity, readCapacity, unpersistedBytesBound);
+        public BufferedLogChannel(ByteBufAllocator allocator, FileChannel fc, int writeCapacity, int readCapacity,
+                long logId, File logFile, long unpersistedBytesBound) throws IOException {
+            super(allocator, fc, writeCapacity, readCapacity, unpersistedBytesBound);
             this.logId = logId;
             this.entryLogMetadata = new EntryLogMetadata(logId);
             this.logFile = logFile;
@@ -283,6 +283,8 @@ public class EntryLogger {
 
     private final int maxSaneEntrySize;
 
+    private final ByteBufAllocator allocator;
+
     final ServerConfiguration conf;
     /**
      * Scan entries in a entry log file.
@@ -332,15 +334,16 @@ public class EntryLogger {
      */
     public EntryLogger(ServerConfiguration conf,
             LedgerDirsManager ledgerDirsManager) throws IOException {
-        this(conf, ledgerDirsManager, null, NullStatsLogger.INSTANCE);
+        this(conf, ledgerDirsManager, null, NullStatsLogger.INSTANCE, PooledByteBufAllocator.DEFAULT);
     }
 
     public EntryLogger(ServerConfiguration conf,
-            LedgerDirsManager ledgerDirsManager, EntryLogListener listener, StatsLogger statsLogger)
-                    throws IOException {
+            LedgerDirsManager ledgerDirsManager, EntryLogListener listener, StatsLogger statsLogger,
+            ByteBufAllocator allocator) throws IOException {
         //We reserve 500 bytes as overhead for the protocol.  This is not 100% accurate
         // but the protocol varies so an exact value is difficult to determine
         this.maxSaneEntrySize = conf.getNettyMaxFrameSizeBytes() - 500;
+        this.allocator = allocator;
         this.ledgerDirsManager = ledgerDirsManager;
         this.conf = conf;
         entryLogPerLedgerEnabled = conf.isEntryLogPerLedgerEnabled();
@@ -371,7 +374,7 @@ public class EntryLogger {
         }
         this.recentlyCreatedEntryLogsStatus = new RecentEntryLogsStatus(logId + 1);
         this.entryLoggerAllocator = new EntryLoggerAllocator(conf, ledgerDirsManager, recentlyCreatedEntryLogsStatus,
-                logId);
+                logId, allocator);
         if (entryLogPerLedgerEnabled) {
             this.entryLogManager = new EntryLogManagerForEntryLogPerLedger(conf, ledgerDirsManager,
                     entryLoggerAllocator, listeners, recentlyCreatedEntryLogsStatus, statsLogger);
@@ -840,7 +843,7 @@ public class EntryLogger {
             throw new IOException(e.toString());
         }
 
-        ByteBuf data = PooledByteBufAllocator.DEFAULT.directBuffer(entry.entrySize, entry.entrySize);
+        ByteBuf data = allocator.buffer(entry.entrySize, entry.entrySize);
         int rc = readFromLogChannel(entryLogId, entry.fc, data, pos);
         if (rc != entry.entrySize) {
             // Note that throwing NoEntryException here instead of IOException is not
@@ -872,7 +875,7 @@ public class EntryLogger {
         BufferedReadChannel bc = getChannelForLogId(entryLogId);
 
         // Allocate buffer to read (version, ledgersMapOffset, ledgerCount)
-        ByteBuf headers = PooledByteBufAllocator.DEFAULT.directBuffer(LOGFILE_HEADER_SIZE);
+        ByteBuf headers = allocator.directBuffer(LOGFILE_HEADER_SIZE);
         try {
             bc.read(headers, 0);
 
@@ -988,7 +991,7 @@ public class EntryLogger {
         long pos = LOGFILE_HEADER_SIZE;
 
         // Start with a reasonably sized buffer size
-        ByteBuf data = PooledByteBufAllocator.DEFAULT.directBuffer(1024 * 1024);
+        ByteBuf data = allocator.directBuffer(1024 * 1024);
 
         try {
 
@@ -1070,7 +1073,7 @@ public class EntryLogger {
         EntryLogMetadata meta = new EntryLogMetadata(entryLogId);
 
         final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE;
-        ByteBuf ledgersMap = ByteBufAllocator.DEFAULT.directBuffer(maxMapSize);
+        ByteBuf ledgersMap = allocator.directBuffer(maxMapSize);
 
         try {
             while (offset < bc.size()) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java
index 3ddd8e2..1c32b55 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java
@@ -25,6 +25,7 @@ import static com.google.common.base.Charsets.UTF_8;
 import static org.apache.bookkeeper.bookie.TransactionalEntryLogCompactor.COMPACTING_SUFFIX;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.Unpooled;
 
 import java.io.BufferedWriter;
@@ -61,11 +62,14 @@ class EntryLoggerAllocator {
     private final Object createCompactionLogLock = new Object();
     private final EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus;
     private final boolean entryLogPreAllocationEnabled;
+    private final ByteBufAllocator byteBufAllocator;
     final ByteBuf logfileHeader = Unpooled.buffer(EntryLogger.LOGFILE_HEADER_SIZE);
 
     EntryLoggerAllocator(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager,
-            EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus, long logId) {
+            EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus, long logId,
+            ByteBufAllocator byteBufAllocator) {
         this.conf = conf;
+        this.byteBufAllocator = byteBufAllocator;
         this.ledgerDirsManager = ledgerDirsManager;
         this.preallocatedLogId = logId;
         this.recentlyCreatedEntryLogsStatus = recentlyCreatedEntryLogsStatus;
@@ -161,7 +165,7 @@ class EntryLoggerAllocator {
         File newLogFile = new File(dirForNextEntryLog, logFileName);
         FileChannel channel = new RandomAccessFile(newLogFile, "rw").getChannel();
 
-        BufferedLogChannel logChannel = new BufferedLogChannel(channel, conf.getWriteBufferBytes(),
+        BufferedLogChannel logChannel = new BufferedLogChannel(byteBufAllocator, channel, conf.getWriteBufferBytes(),
                 conf.getReadBufferBytes(), preallocatedLogId, newLogFile, conf.getFlushIntervalInBytes());
         logfileHeader.readerIndex(0);
         logChannel.write(logfileHeader);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index d1287c3..3b5bf01 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -35,6 +35,8 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.RateLimiter;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -129,7 +131,8 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
                            StateManager stateManager,
                            CheckpointSource checkpointSource,
                            Checkpointer checkpointer,
-                           StatsLogger statsLogger)
+                           StatsLogger statsLogger,
+                           ByteBufAllocator allocator)
             throws IOException {
         initializeWithEntryLogListener(
             conf,
@@ -140,7 +143,8 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
             checkpointSource,
             checkpointer,
             this,
-            statsLogger);
+            statsLogger,
+            allocator);
     }
 
     void initializeWithEntryLogListener(ServerConfiguration conf,
@@ -151,7 +155,8 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
                                         CheckpointSource checkpointSource,
                                         Checkpointer checkpointer,
                                         EntryLogListener entryLogListener,
-                                        StatsLogger statsLogger) throws IOException {
+                                        StatsLogger statsLogger,
+                                        ByteBufAllocator allocator) throws IOException {
         initializeWithEntryLogger(
                 conf,
                 ledgerManager,
@@ -160,7 +165,8 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
                 stateManager,
                 checkpointSource,
                 checkpointer,
-                new EntryLogger(conf, ledgerDirsManager, entryLogListener, statsLogger.scope(ENTRYLOGGER_SCOPE)),
+                new EntryLogger(conf, ledgerDirsManager, entryLogListener, statsLogger.scope(ENTRYLOGGER_SCOPE),
+                        allocator),
                 statsLogger);
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index dd47da8..0a78fca 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -26,7 +26,9 @@ import com.google.common.base.Stopwatch;
 import com.google.common.util.concurrent.MoreExecutors;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
 import io.netty.util.concurrent.DefaultThreadFactory;
@@ -85,8 +87,8 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
      */
     @FunctionalInterface
     public interface BufferedChannelBuilder {
-        BufferedChannelBuilder DEFAULT_BCBUILDER =
-                (FileChannel fc, int capacity) -> new BufferedChannel(fc, capacity);
+        BufferedChannelBuilder DEFAULT_BCBUILDER = (FileChannel fc,
+                int capacity) -> new BufferedChannel(UnpooledByteBufAllocator.DEFAULT, fc, capacity);
 
         BufferedChannel create(FileChannel fc, int capacity) throws IOException;
     }
@@ -627,18 +629,21 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
 
     volatile boolean running = true;
     private final LedgerDirsManager ledgerDirsManager;
+    private final ByteBufAllocator allocator;
 
     // Expose Stats
     private final JournalStats journalStats;
 
     public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf,
             LedgerDirsManager ledgerDirsManager) {
-        this(journalIndex, journalDirectory, conf, ledgerDirsManager, NullStatsLogger.INSTANCE);
+        this(journalIndex, journalDirectory, conf, ledgerDirsManager, NullStatsLogger.INSTANCE,
+                UnpooledByteBufAllocator.DEFAULT);
     }
 
     public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf,
-            LedgerDirsManager ledgerDirsManager, StatsLogger statsLogger) {
+            LedgerDirsManager ledgerDirsManager, StatsLogger statsLogger, ByteBufAllocator allocator) {
         super("BookieJournal-" + conf.getBookiePort());
+        this.allocator = allocator;
 
         if (conf.isBusyWaitEnabled()) {
             // To achieve lower latency, use busy-wait blocking queue implementation
@@ -1161,7 +1166,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
     }
 
     public BufferedChannelBuilder getBufferedChannelBuilder() {
-        return BufferedChannelBuilder.DEFAULT_BCBUILDER;
+        return (FileChannel fc, int capacity) -> new BufferedChannel(allocator, fc, capacity);
     }
 
     /**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
index 111b8c2..1353e8b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
@@ -23,6 +23,8 @@ package org.apache.bookkeeper.bookie;
 
 import com.google.common.util.concurrent.RateLimiter;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -53,7 +55,8 @@ public interface LedgerStorage {
                     StateManager stateManager,
                     CheckpointSource checkpointSource,
                     Checkpointer checkpointer,
-                    StatsLogger statsLogger)
+                    StatsLogger statsLogger,
+                    ByteBufAllocator allocator)
             throws IOException;
 
     /**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java
index 5125c07..cb6e9f0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java
@@ -21,6 +21,8 @@
 
 package org.apache.bookkeeper.bookie;
 
+import io.netty.buffer.ByteBufAllocator;
+
 import java.io.IOException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -38,9 +40,9 @@ public class ReadOnlyBookie extends Bookie {
 
     private static final Logger LOG = LoggerFactory.getLogger(ReadOnlyBookie.class);
 
-    public ReadOnlyBookie(ServerConfiguration conf, StatsLogger statsLogger)
+    public ReadOnlyBookie(ServerConfiguration conf, StatsLogger statsLogger, ByteBufAllocator allocator)
             throws IOException, KeeperException, InterruptedException, BookieException {
-        super(conf, statsLogger);
+        super(conf, statsLogger, allocator);
         if (conf.isReadOnlyModeEnabled()) {
             stateManager.forceToReadOnly();
         } else {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SlowBufferedChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SlowBufferedChannel.java
index 9fdc34c..e9731a2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SlowBufferedChannel.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SlowBufferedChannel.java
@@ -22,6 +22,7 @@ package org.apache.bookkeeper.bookie;
  */
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 
 import java.io.IOException;
 import java.nio.channels.FileChannel;
@@ -36,12 +37,13 @@ public class SlowBufferedChannel extends BufferedChannel {
     public volatile long addDelay = 0;
     public volatile long flushDelay = 0;
 
-    public SlowBufferedChannel(FileChannel fc, int capacity) throws IOException {
-        super(fc, capacity);
+    public SlowBufferedChannel(ByteBufAllocator allocator, FileChannel fc, int capacity) throws IOException {
+        super(allocator, fc, capacity);
     }
 
-    public SlowBufferedChannel(FileChannel fc, int writeCapacity, int readCapacity) throws IOException {
-        super(fc, writeCapacity, readCapacity);
+    public SlowBufferedChannel(ByteBufAllocator allocator, FileChannel fc, int writeCapacity, int readCapacity)
+            throws IOException {
+        super(allocator, fc, writeCapacity, readCapacity);
     }
 
     public void setAddDelay(long delay) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
index 5e4dbad..77653db 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
@@ -24,6 +24,8 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.RateLimiter;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+
 import java.io.IOException;
 import java.util.List;
 import java.util.Optional;
@@ -72,7 +74,8 @@ public class SortedLedgerStorage
                            StateManager stateManager,
                            CheckpointSource checkpointSource,
                            Checkpointer checkpointer,
-                           StatsLogger statsLogger)
+                           StatsLogger statsLogger,
+                           ByteBufAllocator allocator)
             throws IOException {
 
         interleavedLedgerStorage.initializeWithEntryLogListener(
@@ -86,7 +89,8 @@ public class SortedLedgerStorage
             // uses sorted ledger storage's own entry log listener
             // since it manages entry log rotations and checkpoints.
             this,
-            statsLogger);
+            statsLogger,
+            allocator);
 
         if (conf.isEntryLogPerLedgerEnabled()) {
             this.memTable = new EntryMemTableWithParallelFlusher(conf, checkpointSource, statsLogger);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
index ccabaed..287f452 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.util.concurrent.DefaultThreadFactory;
 //CHECKSTYLE.OFF: IllegalImport
 import io.netty.util.internal.PlatformDependent;
@@ -63,7 +64,6 @@ import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.DiskChecker;
 
 
-
 /**
  * Implementation of LedgerStorage that uses RocksDB to keep the indexes for entries stored in EntryLogs.
  */
@@ -88,13 +88,16 @@ public class DbLedgerStorage implements LedgerStorage {
     private ScheduledExecutorService gcExecutor;
     private DbLedgerStorageStats stats;
 
+    protected ByteBufAllocator allocator;
+
     @Override
     public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager,
             LedgerDirsManager indexDirsManager, StateManager stateManager, CheckpointSource checkpointSource,
-            Checkpointer checkpointer, StatsLogger statsLogger) throws IOException {
+            Checkpointer checkpointer, StatsLogger statsLogger, ByteBufAllocator allocator) throws IOException {
         long writeCacheMaxSize = conf.getLong(WRITE_CACHE_MAX_SIZE_MB, DEFAULT_WRITE_CACHE_MAX_SIZE_MB) * MB;
         long readCacheMaxSize = conf.getLong(READ_AHEAD_CACHE_MAX_SIZE_MB, DEFAULT_READ_CACHE_MAX_SIZE_MB) * MB;
 
+        this.allocator = allocator;
         this.numberOfDirs = ledgerDirsManager.getAllLedgerDirs().size();
 
         log.info("Started Db Ledger Storage");
@@ -139,7 +142,8 @@ public class DbLedgerStorage implements LedgerStorage {
             StatsLogger statsLogger, ScheduledExecutorService gcExecutor, long writeCacheSize, long readCacheSize)
             throws IOException {
         return new SingleDirectoryDbLedgerStorage(conf, ledgerManager, ledgerDirsManager, indexDirsManager,
-                stateManager, checkpointSource, checkpointer, statsLogger, gcExecutor, writeCacheSize, readCacheSize);
+                stateManager, checkpointSource, checkpointer, statsLogger, allocator, gcExecutor, writeCacheSize,
+                readCacheSize);
     }
 
     @Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCache.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCache.java
index b14478f..986c741 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCache.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCache.java
@@ -57,13 +57,15 @@ public class ReadCache implements Closeable {
 
     private final int segmentSize;
 
+    private final ByteBufAllocator allocator;
     private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
-    public ReadCache(long maxCacheSize) {
-        this(maxCacheSize, DEFAULT_MAX_SEGMENT_SIZE);
+    public ReadCache(ByteBufAllocator allocator, long maxCacheSize) {
+        this(allocator, maxCacheSize, DEFAULT_MAX_SEGMENT_SIZE);
     }
 
-    public ReadCache(long maxCacheSize, int maxSegmentSize) {
+    public ReadCache(ByteBufAllocator allocator, long maxCacheSize, int maxSegmentSize) {
+        this.allocator = allocator;
         int segmentsCount = Math.max(2, (int) (maxCacheSize / maxSegmentSize));
         segmentSize = (int) (maxCacheSize / segmentsCount);
 
@@ -140,7 +142,7 @@ public class ReadCache implements Closeable {
                     int entryOffset = (int) res.first;
                     int entryLen = (int) res.second;
 
-                    ByteBuf entry = ByteBufAllocator.DEFAULT.directBuffer(entryLen, entryLen);
+                    ByteBuf entry = allocator.directBuffer(entryLen, entryLen);
                     entry.writeBytes(cacheSegments.get(segmentIdx), entryOffset, entryLen);
                     return entry;
                 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
index 197014a..3289b3d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.util.concurrent.DefaultThreadFactory;
 
 import java.io.IOException;
@@ -132,7 +133,8 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
     public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager,
             LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, StateManager stateManager,
             CheckpointSource checkpointSource, Checkpointer checkpointer, StatsLogger statsLogger,
-            ScheduledExecutorService gcExecutor, long writeCacheSize, long readCacheSize) throws IOException {
+            ByteBufAllocator allocator, ScheduledExecutorService gcExecutor, long writeCacheSize, long readCacheSize)
+            throws IOException {
 
         checkArgument(ledgerDirsManager.getAllLedgerDirs().size() == 1,
                 "Db implementation only allows for one storage dir");
@@ -141,8 +143,8 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
         log.info("Creating single directory db ledger storage on {}", baseDir);
 
         this.writeCacheMaxSize = writeCacheSize;
-        this.writeCache = new WriteCache(writeCacheMaxSize / 2);
-        this.writeCacheBeingFlushed = new WriteCache(writeCacheMaxSize / 2);
+        this.writeCache = new WriteCache(allocator, writeCacheMaxSize / 2);
+        this.writeCacheBeingFlushed = new WriteCache(allocator, writeCacheMaxSize / 2);
 
         this.checkpointSource = checkpointSource;
 
@@ -153,7 +155,7 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
                 DEFAULT_MAX_THROTTLE_TIME_MILLIS);
         maxThrottleTimeNanos = TimeUnit.MILLISECONDS.toNanos(maxThrottleTimeMillis);
 
-        readCache = new ReadCache(readCacheMaxSize);
+        readCache = new ReadCache(allocator, readCacheMaxSize);
 
         ledgerIndex = new LedgerMetadataIndex(conf, KeyValueStorageRocksDB.factory, baseDir, statsLogger);
         entryLocationIndex = new EntryLocationIndex(conf, KeyValueStorageRocksDB.factory, baseDir, statsLogger);
@@ -164,7 +166,7 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
                 TransientLedgerInfo.LEDGER_INFO_CACHING_TIME_MINUTES,
                 TransientLedgerInfo.LEDGER_INFO_CACHING_TIME_MINUTES, TimeUnit.MINUTES);
 
-        entryLogger = new EntryLogger(conf, ledgerDirsManager, null, statsLogger);
+        entryLogger = new EntryLogger(conf, ledgerDirsManager, null, statsLogger, allocator);
         gcThread = new GarbageCollectorThread(conf, ledgerManager, this, statsLogger);
 
         dbLedgerStorageStats = new DbLedgerStorageStats(
@@ -179,7 +181,8 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
     @Override
     public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager,
             LedgerDirsManager indexDirsManager, StateManager stateManager, CheckpointSource checkpointSource,
-            Checkpointer checkpointer, StatsLogger statsLogger) throws IOException {
+            Checkpointer checkpointer, StatsLogger statsLogger,
+            ByteBufAllocator allocator) throws IOException {
         /// Initialized in constructor
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java
index 08ffe67..ac58e8e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java
@@ -80,17 +80,20 @@ public class WriteCache implements Closeable {
 
     private final ConcurrentLongHashSet deletedLedgers = new ConcurrentLongHashSet();
 
-    public WriteCache(long maxCacheSize) {
+    private final ByteBufAllocator allocator;
+
+    public WriteCache(ByteBufAllocator allocator, long maxCacheSize) {
         // Default maxSegmentSize set to 1Gb
-        this(maxCacheSize, 1 * 1024 * 1024 * 1024);
+        this(allocator, maxCacheSize, 1 * 1024 * 1024 * 1024);
     }
 
-    public WriteCache(long maxCacheSize, int maxSegmentSize) {
+    public WriteCache(ByteBufAllocator allocator, long maxCacheSize, int maxSegmentSize) {
         checkArgument(maxSegmentSize > 0);
 
         long alignedMaxSegmentSize = alignToPowerOfTwo(maxSegmentSize);
         checkArgument(maxSegmentSize == alignedMaxSegmentSize, "Max segment size needs to be in form of 2^n");
 
+        this.allocator = allocator;
         this.maxCacheSize = maxCacheSize;
         this.maxSegmentSize = (int) maxSegmentSize;
         this.segmentOffsetMask = maxSegmentSize - 1;
@@ -185,7 +188,7 @@ public class WriteCache implements Closeable {
 
         long offset = result.first;
         int size = (int) result.second;
-        ByteBuf entry = ByteBufAllocator.DEFAULT.buffer(size, size);
+        ByteBuf entry = allocator.buffer(size, size);
 
         int localOffset = (int) (offset & segmentOffsetMask);
         int segmentIdx = (int) (offset >>> segmentOffsetBits);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 0acf16f..94591af 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -26,6 +26,8 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WATCHER_SCOPE;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
 import io.netty.channel.EventLoopGroup;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.concurrent.DefaultThreadFactory;
@@ -56,6 +58,7 @@ import org.apache.bookkeeper.client.api.CreateBuilder;
 import org.apache.bookkeeper.client.api.DeleteBuilder;
 import org.apache.bookkeeper.client.api.OpenBuilder;
 import org.apache.bookkeeper.client.api.WriteFlag;
+import org.apache.bookkeeper.common.allocator.ByteBufAllocatorBuilder;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.common.util.ReflectionUtils;
@@ -99,10 +102,11 @@ import org.slf4j.LoggerFactory;
  */
 public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
 
-    static final Logger LOG = LoggerFactory.getLogger(BookKeeper.class);
+    private static final Logger LOG = LoggerFactory.getLogger(BookKeeper.class);
 
 
     final EventLoopGroup eventLoopGroup;
+    private final ByteBufAllocator allocator;
 
     // The stats logger for this client.
     private final StatsLogger statsLogger;
@@ -149,6 +153,7 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
 
         ZooKeeper zk = null;
         EventLoopGroup eventLoopGroup = null;
+        ByteBufAllocator allocator = null;
         StatsLogger statsLogger = NullStatsLogger.INSTANCE;
         DNSToSwitchMapping dnsResolver = null;
         HashedWheelTimer requestTimer = null;
@@ -214,6 +219,18 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
         }
 
         /**
+         * Configure the bookkeeper client with a provided {@link ByteBufAllocator}.
+         *
+         * @param allocator an external {@link ByteBufAllocator} to be used by the bookkeeper client.
+         * @return client builder.
+         * @since 4.9
+         */
+        public Builder allocator(ByteBufAllocator allocator) {
+            this.allocator = allocator;
+            return this;
+        }
+
+        /**
          * Configure the bookkeeper client with a provided {@link ZooKeeper} client.
          *
          * @param zk an external {@link ZooKeeper} client to use by the bookkeeper client.
@@ -276,7 +293,8 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
 
         public BookKeeper build() throws IOException, InterruptedException, BKException {
             checkNotNull(statsLogger, "No stats logger provided");
-            return new BookKeeper(conf, zk, eventLoopGroup, statsLogger, dnsResolver, requestTimer, featureProvider);
+            return new BookKeeper(conf, zk, eventLoopGroup, allocator, statsLogger, dnsResolver, requestTimer,
+                    featureProvider);
         }
     }
 
@@ -313,7 +331,7 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
      */
     public BookKeeper(final ClientConfiguration conf)
             throws IOException, InterruptedException, BKException {
-        this(conf, null, null, NullStatsLogger.INSTANCE,
+        this(conf, null, null, null, NullStatsLogger.INSTANCE,
                 null, null, null);
     }
 
@@ -347,7 +365,7 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
      */
     public BookKeeper(ClientConfiguration conf, ZooKeeper zk)
             throws IOException, InterruptedException, BKException {
-        this(conf, validateZooKeeper(zk), null, NullStatsLogger.INSTANCE, null, null, null);
+        this(conf, validateZooKeeper(zk), null, null, NullStatsLogger.INSTANCE, null, null, null);
     }
 
     /**
@@ -369,17 +387,19 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
      */
     public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLoopGroup)
             throws IOException, InterruptedException, BKException {
-        this(conf, validateZooKeeper(zk), validateEventLoopGroup(eventLoopGroup), NullStatsLogger.INSTANCE,
+        this(conf, validateZooKeeper(zk), validateEventLoopGroup(eventLoopGroup), null, NullStatsLogger.INSTANCE,
                 null, null, null);
     }
 
     /**
      * Constructor for use with the builder. Other constructors also use it.
      */
+    @SuppressWarnings("deprecation")
     @VisibleForTesting
     BookKeeper(ClientConfiguration conf,
                        ZooKeeper zkc,
                        EventLoopGroup eventLoopGroup,
+                       ByteBufAllocator byteBufAllocator,
                        StatsLogger rootStatsLogger,
                        DNSToSwitchMapping dnsResolver,
                        HashedWheelTimer requestTimer,
@@ -443,8 +463,19 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
             this.ownEventLoopGroup = false;
         }
 
+        if (byteBufAllocator != null) {
+            this.allocator = byteBufAllocator;
+        } else {
+            this.allocator = ByteBufAllocatorBuilder.create()
+                    .poolingPolicy(conf.getAllocatorPoolingPolicy())
+                    .poolingConcurrency(conf.getAllocatorPoolingConcurrency())
+                    .outOfMemoryPolicy(conf.getAllocatorOutOfMemoryPolicy())
+                    .leakDetectionPolicy(conf.getAllocatorLeakDetectionPolicy())
+                    .build();
+        }
+
         // initialize bookie client
-        this.bookieClient = new BookieClientImpl(conf, this.eventLoopGroup, this.mainWorkerPool,
+        this.bookieClient = new BookieClientImpl(conf, this.eventLoopGroup, this.allocator, this.mainWorkerPool,
                 scheduler, rootStatsLogger);
 
         if (null == requestTimer) {
@@ -517,6 +548,7 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
         bookieWatcher = null;
         bookieInfoScheduler = null;
         bookieClient = null;
+        allocator = UnpooledByteBufAllocator.DEFAULT;
     }
 
     private EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfiguration conf,
@@ -1466,6 +1498,11 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
             public boolean isClientClosed() {
                 return BookKeeper.this.isClosed();
             }
+
+            @Override
+            public ByteBufAllocator getByteBufAllocator() {
+                return allocator;
+            }
         };
 
     ClientContext getClientCtx() {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientContext.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientContext.java
index d8803d0..da3abde 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientContext.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientContext.java
@@ -20,6 +20,8 @@
  */
 package org.apache.bookkeeper.client;
 
+import io.netty.buffer.ByteBufAllocator;
+
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.meta.LedgerManager;
@@ -37,6 +39,7 @@ interface ClientContext {
     BookieWatcher getBookieWatcher();
     EnsemblePlacementPolicy getPlacementPolicy();
     BookieClient getBookieClient();
+    ByteBufAllocator getByteBufAllocator();
     OrderedExecutor getMainWorkerPool();
     OrderedScheduler getScheduler();
     BookKeeperClientStats getClientStats();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 157a1b8..bd8ec68 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -188,7 +188,7 @@ public class LedgerHandle implements WriteHandle {
         }
 
         macManager = DigestManager.instantiate(ledgerId, password, BookKeeper.DigestType.toProtoDigestType(digestType),
-                                               clientCtx.getConf().useV2WireProtocol);
+                                               clientCtx.getByteBufAllocator(), clientCtx.getConf().useV2WireProtocol);
 
         // If the password is empty, pass the same random ledger key which is generated by the hash of the empty
         // password, so that the bookie can avoid processing the keys for each entry
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BookKeeperBuilder.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BookKeeperBuilder.java
index 0e147dd..ea30dc5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BookKeeperBuilder.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BookKeeperBuilder.java
@@ -20,9 +20,11 @@
  */
 package org.apache.bookkeeper.client.api;
 
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.EventLoopGroup;
 import io.netty.util.HashedWheelTimer;
 import java.io.IOException;
+
 import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public;
 import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
 import org.apache.bookkeeper.feature.FeatureProvider;
@@ -48,6 +50,15 @@ public interface BookKeeperBuilder {
     BookKeeperBuilder eventLoopGroup(EventLoopGroup eventLoopGroup);
 
     /**
+     * Configure the bookkeeper client with a provided {@link ByteBufAllocator}.
+     *
+     * @param allocator an external {@link ByteBufAllocator} to be used by the bookkeeper client.
+     * @return client builder.
+     * @since 4.9
+     */
+    BookKeeperBuilder allocator(ByteBufAllocator allocator);
+
+    /**
      * Configure the bookkeeper client with a provided {@link StatsLogger}.
      *
      * @param statsLogger an {@link StatsLogger} to use by the bookkeeper client to collect stats generated by the
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/BookKeeperBuilderImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/BookKeeperBuilderImpl.java
index 3a07d1b..6373ace 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/BookKeeperBuilderImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/BookKeeperBuilderImpl.java
@@ -20,6 +20,7 @@
  */
 package org.apache.bookkeeper.client.impl;
 
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.EventLoopGroup;
 import io.netty.util.HashedWheelTimer;
 import java.io.IOException;
@@ -51,6 +52,12 @@ public class BookKeeperBuilderImpl implements BookKeeperBuilder {
     }
 
     @Override
+    public BookKeeperBuilder allocator(ByteBufAllocator allocator) {
+        builder.allocator(allocator);
+        return this;
+    }
+
+    @Override
     public BookKeeperBuilder statsLogger(StatsLogger statsLogger) {
         builder.statsLogger(statsLogger);
         return this;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
index 65d702b..36cfa63 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
@@ -28,6 +28,10 @@ import java.util.Map;
 import javax.net.ssl.SSLEngine;
 
 import lombok.extern.slf4j.Slf4j;
+
+import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy;
+import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy;
+import org.apache.bookkeeper.common.allocator.PoolingPolicy;
 import org.apache.bookkeeper.common.util.JsonUtil;
 import org.apache.bookkeeper.common.util.JsonUtil.ParseJsonException;
 import org.apache.bookkeeper.common.util.ReflectionUtils;
@@ -155,6 +159,12 @@ public abstract class AbstractConfiguration<T extends AbstractConfiguration>
     // enforce minimum number of racks per write quorum
     public static final String ENFORCE_MIN_NUM_RACKS_PER_WRITE_QUORUM = "enforceMinNumRacksPerWriteQuorum";
 
+    // Allocator configuration
+    protected static final String ALLOCATOR_POOLING_POLICY = "allocatorPoolingPolicy";
+    protected static final String ALLOCATOR_POOLING_CONCURRENCY = "allocatorPoolingConcurrency";
+    protected static final String ALLOCATOR_OOM_POLICY = "allocatorOutOfMemoryPolicy";
+    protected static final String ALLOCATOR_LEAK_DETECTION_POLICY = "allocatorLeakDetectionPolicy";
+
     // option to limit stats logging
     public static final String LIMIT_STATS_LOGGING = "limitStatsLogging";
 
@@ -882,6 +892,95 @@ public abstract class AbstractConfiguration<T extends AbstractConfiguration>
     }
 
     /**
+     * @return the configured pooling policy for the allocator.
+     */
+    public PoolingPolicy getAllocatorPoolingPolicy() {
+        return PoolingPolicy.valueOf(this.getString(ALLOCATOR_POOLING_POLICY, PoolingPolicy.PooledDirect.toString()));
+    }
+
+    /**
+     * Define the memory pooling policy.
+     *
+     * <p>Default is {@link PoolingPolicy#PooledDirect}
+     *
+     * @param poolingPolicy
+     *            the memory pooling policy
+     * @return configuration object.
+     */
+    public T setAllocatorPoolingPolicy(PoolingPolicy poolingPolicy) {
+        this.setProperty(ALLOCATOR_POOLING_POLICY, poolingPolicy.toString());
+        return getThis();
+    }
+
+    /**
+     * @return the configured pooling concurrency for the allocator.
+     */
+    public int getAllocatorPoolingConcurrency() {
+        return this.getInteger(ALLOCATOR_POOLING_CONCURRENCY, 2 * Runtime.getRuntime().availableProcessors());
+    }
+
+    /**
+     * Controls the amount of concurrency for the memory pool.
+     *
+     * <p>Default is to have a number of allocator arenas equal to 2 * CPUS.
+     *
+     * <p>Decreasing this number will reduce the amount of memory overhead, at the
+     * expense of increased allocation contention.
+     *
+     * @param concurrency
+     *            the concurrency level to use for the allocator pool
+     * @return configuration object.
+     */
+    public T setAllocatorPoolingConcurrenncy(int concurrency) {
+        this.setProperty(ALLOCATOR_POOLING_CONCURRENCY, concurrency);
+        return getThis();
+    }
+
+    /**
+     * @return the configured out of memory policy for the allocator.
+     */
+    public OutOfMemoryPolicy getAllocatorOutOfMemoryPolicy() {
+        return OutOfMemoryPolicy
+                .valueOf(this.getString(ALLOCATOR_OOM_POLICY, OutOfMemoryPolicy.FallbackToHeap.toString()));
+    }
+
+    /**
+     * Define the memory allocator out of memory policy.
+     *
+     * <p>Default is {@link OutOfMemoryPolicy#FallbackToHeap}
+     *
+     * @param oomPolicy
+     *            the "out-of-memory" policy for the memory allocator
+     * @return configuration object.
+     */
+    public T setAllocatorOutOfMemoryPolicy(OutOfMemoryPolicy oomPolicy) {
+        this.setProperty(ALLOCATOR_OOM_POLICY, oomPolicy.toString());
+        return getThis();
+    }
+
+    /**
+     * Return the configured leak detection policy for the allocator.
+     */
+    public LeakDetectionPolicy getAllocatorLeakDetectionPolicy() {
+        return LeakDetectionPolicy
+                .valueOf(this.getString(ALLOCATOR_LEAK_DETECTION_POLICY, LeakDetectionPolicy.Disabled.toString()));
+    }
+
+    /**
+     * Enable the leak detection for the allocator.
+     *
+     * <p>Default is {@link LeakDetectionPolicy#Disabled}
+     *
+     * @param leakDetectionPolicy
+     *            the leak detection policy for the memory allocator
+     * @return configuration object.
+     */
+    public T setAllocatorLeakDetectionPolicy(LeakDetectionPolicy leakDetectionPolicy) {
+        this.setProperty(ALLOCATOR_LEAK_DETECTION_POLICY, leakDetectionPolicy.toString());
+        return getThis();
+    }
+
+    /**
      * Return whether the busy-wait is enabled for BookKeeper and Netty IO threads.
      *
      * <p>Default is false
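
Note on the allocator options in AbstractConfiguration.java: each option is stored as a string property, read back with a default, and (for the policies) parsed with Enum.valueOf. The following is a minimal sketch of that getter/setter pattern, assuming a plain java.util.Properties store in place of the real configuration class. The class AllocatorConfigSketch and the enum Policy are hypothetical stand-ins, not BookKeeper types; only the two property key names, the PooledDirect default and the 2 * CPUs default are taken from the diff.

```java
import java.util.Properties;

/** Hypothetical stand-in for the configuration class; not a BookKeeper type. */
class AllocatorConfigSketch {
    /** Placeholder enum: only PooledDirect appears in the diff, Other is invented. */
    enum Policy { PooledDirect, Other }

    // Key names copied from the diff.
    static final String POOLING_POLICY = "allocatorPoolingPolicy";
    static final String POOLING_CONCURRENCY = "allocatorPoolingConcurrency";

    private final Properties props = new Properties();

    Policy getPoolingPolicy() {
        // An unset key falls back to the default; an unknown name makes valueOf throw.
        return Policy.valueOf(props.getProperty(POOLING_POLICY, Policy.PooledDirect.toString()));
    }

    AllocatorConfigSketch setPoolingPolicy(Policy policy) {
        props.setProperty(POOLING_POLICY, policy.toString());
        return this;
    }

    int getPoolingConcurrency() {
        // Default mirrors the diff: twice the number of available processors.
        int fallback = 2 * Runtime.getRuntime().availableProcessors();
        return Integer.parseInt(props.getProperty(POOLING_CONCURRENCY, Integer.toString(fallback)));
    }

    AllocatorConfigSketch setPoolingConcurrency(int concurrency) {
        // Each setter writes the same key that its getter reads.
        props.setProperty(POOLING_CONCURRENCY, Integer.toString(concurrency));
        return this;
    }
}
```

In this sketch a value only round-trips if the setter and getter agree on the key, so a setter that writes a different key would leave the getter returning its default.
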
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 7d0d319..3b390d4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -21,11 +21,14 @@ import static com.google.common.base.Charsets.UTF_8;
 import static org.apache.bookkeeper.util.BookKeeperConstants.FEATURE_DISABLE_ENSEMBLE_CHANGE;
 
 import io.netty.buffer.ByteBuf;
+
 import java.util.concurrent.TimeUnit;
+
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
+import org.apache.bookkeeper.client.api.BookKeeperBuilder;
 import org.apache.bookkeeper.common.util.ReflectionUtils;
 import org.apache.bookkeeper.discover.RegistrationClient;
 import org.apache.bookkeeper.discover.ZKRegistrationClient;
@@ -1772,8 +1775,11 @@ public class ClientConfiguration extends AbstractConfiguration<ClientConfigurati
     /**
      * Option to use Netty Pooled ByteBufs.
      *
+     * @deprecated see {@link BookKeeperBuilder#allocator(io.netty.buffer.ByteBufAllocator)}
+     *
      * @return the value of the option
      */
+    @Deprecated
     public boolean isNettyUsePooledBuffers() {
         return getBoolean(NETTY_USE_POOLED_BUFFERS, true);
     }
@@ -1785,6 +1791,8 @@ public class ClientConfiguration extends AbstractConfiguration<ClientConfigurati
      * @param enabled
      *          if enabled BookKeeper will use default Pooled Netty Buffer allocator
      *
+     * @deprecated see {@link BookKeeperBuilder#allocator(io.netty.buffer.ByteBufAllocator)}
+     *
      * @see #setUseV2WireProtocol(boolean)
      * @see ByteBuf#release()
      * @see LedgerHandle#readEntries(long, long)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
index 18f48a2..8342821 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Lists;
 import com.google.protobuf.ExtensionRegistry;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
@@ -76,11 +77,12 @@ import org.slf4j.LoggerFactory;
 public class BookieClientImpl implements BookieClient, PerChannelBookieClientFactory {
     static final Logger LOG = LoggerFactory.getLogger(BookieClient.class);
 
-    OrderedExecutor executor;
-    ScheduledExecutorService scheduler;
-    ScheduledFuture<?> timeoutFuture;
+    private final OrderedExecutor executor;
+    private final ScheduledExecutorService scheduler;
+    private final ScheduledFuture<?> timeoutFuture;
 
-    EventLoopGroup eventLoopGroup;
+    private final EventLoopGroup eventLoopGroup;
+    private final ByteBufAllocator allocator;
     final ConcurrentHashMap<BookieSocketAddress, PerChannelBookieClientPool> channels =
             new ConcurrentHashMap<BookieSocketAddress, PerChannelBookieClientPool>();
 
@@ -96,10 +98,12 @@ public class BookieClientImpl implements BookieClient, PerChannelBookieClientFac
     private final long bookieErrorThresholdPerInterval;
 
     public BookieClientImpl(ClientConfiguration conf, EventLoopGroup eventLoopGroup,
+                            ByteBufAllocator allocator,
                             OrderedExecutor executor, ScheduledExecutorService scheduler,
                             StatsLogger statsLogger) throws IOException {
         this.conf = conf;
         this.eventLoopGroup = eventLoopGroup;
+        this.allocator = allocator;
         this.executor = executor;
         this.closed = false;
         this.closeLock = new ReentrantReadWriteLock();
@@ -120,6 +124,8 @@ public class BookieClientImpl implements BookieClient, PerChannelBookieClientFac
                                                                     conf.getTimeoutMonitorIntervalSec(),
                                                                     conf.getTimeoutMonitorIntervalSec(),
                                                                     TimeUnit.SECONDS);
+        } else {
+            this.timeoutFuture = null;
         }
     }
 
@@ -175,7 +181,7 @@ public class BookieClientImpl implements BookieClient, PerChannelBookieClientFac
         if (conf.getLimitStatsLogging()) {
             statsLoggerForPCBC = NullStatsLogger.INSTANCE;
         }
-        return new PerChannelBookieClient(conf, executor, eventLoopGroup, address, statsLoggerForPCBC,
+        return new PerChannelBookieClient(conf, executor, eventLoopGroup, allocator, address, statsLoggerForPCBC,
                 authProviderFactory, registry, pcbcPool, shFactory);
     }
 
@@ -596,8 +602,8 @@ public class BookieClientImpl implements BookieClient, PerChannelBookieClientFac
                 .build();
         ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(
                 new DefaultThreadFactory("BookKeeperClientScheduler"));
-        BookieClientImpl bc = new BookieClientImpl(new ClientConfiguration(), eventLoopGroup, executor,
-                                                   scheduler, NullStatsLogger.INSTANCE);
+        BookieClientImpl bc = new BookieClientImpl(new ClientConfiguration(), eventLoopGroup,
+                null, executor, scheduler, NullStatsLogger.INSTANCE);
         BookieSocketAddress addr = new BookieSocketAddress(args[0], Integer.parseInt(args[1]));
 
         for (int i = 0; i < 100000; i++) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
index add4537..5b5c288 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
@@ -24,6 +24,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ExtensionRegistry;
 
 import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.AdaptiveRecvByteBufAllocator;
 import io.netty.channel.Channel;
@@ -107,8 +108,11 @@ class BookieNettyServer {
     final BookieAuthProvider.Factory authProviderFactory;
     final ExtensionRegistry registry = ExtensionRegistry.newInstance();
 
-    BookieNettyServer(ServerConfiguration conf, RequestProcessor processor)
+    private final ByteBufAllocator allocator;
+
+    BookieNettyServer(ServerConfiguration conf, RequestProcessor processor, ByteBufAllocator allocator)
         throws IOException, KeeperException, InterruptedException, BookieException {
+        this.allocator = allocator;
         this.maxFrameSize = conf.getNettyMaxFrameSizeBytes();
         this.conf = conf;
         this.requestProcessor = processor;
@@ -296,7 +300,8 @@ class BookieNettyServer {
     private void listenOn(InetSocketAddress address, BookieSocketAddress bookieAddress) throws InterruptedException {
         if (!conf.isDisableServerSocketBind()) {
             ServerBootstrap bootstrap = new ServerBootstrap();
-            bootstrap.childOption(ChannelOption.ALLOCATOR, new PooledByteBufAllocator(true));
+            bootstrap.option(ChannelOption.ALLOCATOR, allocator);
+            bootstrap.childOption(ChannelOption.ALLOCATOR, allocator);
             bootstrap.group(eventLoopGroup, eventLoopGroup);
             bootstrap.childOption(ChannelOption.TCP_NODELAY, conf.getServerTcpNoDelay());
             bootstrap.childOption(ChannelOption.SO_LINGER, conf.getServerSockLinger());
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index b883f74..7d5e2e6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -29,6 +29,7 @@ import com.google.common.cache.CacheBuilder;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.protobuf.ByteString;
 
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.HashedWheelTimer;
@@ -44,6 +45,7 @@ import java.util.function.Consumer;
 
 import lombok.AccessLevel;
 import lombok.Getter;
+
 import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
 import org.apache.bookkeeper.auth.AuthToken;
 import org.apache.bookkeeper.bookie.Bookie;
@@ -124,9 +126,12 @@ public class BookieRequestProcessor implements RequestProcessor {
     final Optional<Cache<Channel, Boolean>> blacklistedChannels;
     final Consumer<Channel> onResponseTimeout;
 
-    public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie,
-            StatsLogger statsLogger, SecurityHandlerFactory shFactory) throws SecurityException {
+    private final ByteBufAllocator allocator;
+
+    public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie, StatsLogger statsLogger,
+            SecurityHandlerFactory shFactory, ByteBufAllocator allocator) throws SecurityException {
         this.serverCfg = serverCfg;
+        this.allocator = allocator;
         this.waitTimeoutOnBackpressureMillis = serverCfg.getWaitTimeoutOnResponseBackpressureMillis();
         this.preserveMdcForTaskExecution = serverCfg.getPreserveMdcForTaskExecution();
         this.bookie = bookie;
@@ -158,7 +163,7 @@ public class BookieRequestProcessor implements RequestProcessor {
                 OrderedExecutor.NO_TASK_LIMIT, statsLogger);
         this.shFactory = shFactory;
         if (shFactory != null) {
-            shFactory.init(NodeType.Server, serverCfg);
+            shFactory.init(NodeType.Server, serverCfg, allocator);
         }
 
         this.requestTimer = new HashedWheelTimer(
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
index 27e8ab2..b8c1adc 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
@@ -25,6 +25,8 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SERVER_SCOPE;
 import static org.apache.bookkeeper.conf.AbstractConfiguration.PERMITTED_STARTUP_USERS;
 
 import com.google.common.annotations.VisibleForTesting;
+import io.netty.buffer.ByteBufAllocator;
+
 import java.io.IOException;
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.net.UnknownHostException;
@@ -38,6 +40,7 @@ import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.bookie.ExitCode;
 import org.apache.bookkeeper.bookie.ReadOnlyBookie;
 import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.common.allocator.ByteBufAllocatorBuilder;
 import org.apache.bookkeeper.common.util.JsonUtil.ParseJsonException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -62,7 +65,7 @@ public class BookieServer {
     final ServerConfiguration conf;
     BookieNettyServer nettyServer;
     private volatile boolean running = false;
-    Bookie bookie;
+    private final Bookie bookie;
     DeathWatcher deathWatcher;
     private static final Logger LOG = LoggerFactory.getLogger(BookieServer.class);
 
@@ -96,10 +99,11 @@ public class BookieServer {
             LOG.error("Got ParseJsonException while converting Config to JSONString", pe);
         }
 
+        ByteBufAllocator allocator = getAllocator(conf);
         this.statsLogger = statsLogger;
-        this.nettyServer = new BookieNettyServer(this.conf, null);
+        this.nettyServer = new BookieNettyServer(this.conf, null, allocator);
         try {
-            this.bookie = newBookie(conf);
+            this.bookie = newBookie(conf, allocator);
         } catch (IOException | KeeperException | InterruptedException | BookieException e) {
             // interrupted on constructing a bookie
             this.nettyServer.shutdown();
@@ -110,7 +114,7 @@ public class BookieServer {
         shFactory = SecurityProviderFactoryFactory
                 .getSecurityProviderFactory(conf.getTLSProviderFactoryClass());
         this.requestProcessor = new BookieRequestProcessor(conf, bookie,
-                statsLogger.scope(SERVER_SCOPE), shFactory);
+                statsLogger.scope(SERVER_SCOPE), shFactory, bookie.getAllocator());
         this.nettyServer.setRequestProcessor(this.requestProcessor);
     }
 
@@ -126,11 +130,11 @@ public class BookieServer {
         this.uncaughtExceptionHandler = exceptionHandler;
     }
 
-    protected Bookie newBookie(ServerConfiguration conf)
+    protected Bookie newBookie(ServerConfiguration conf, ByteBufAllocator allocator)
         throws IOException, KeeperException, InterruptedException, BookieException {
         return conf.isForceReadOnlyBookie()
-            ? new ReadOnlyBookie(conf, statsLogger.scope(BOOKIE_SCOPE))
-            : new Bookie(conf, statsLogger.scope(BOOKIE_SCOPE));
+            ? new ReadOnlyBookie(conf, statsLogger.scope(BOOKIE_SCOPE), allocator)
+            : new Bookie(conf, statsLogger.scope(BOOKIE_SCOPE), allocator);
     }
 
     public void start() throws IOException, UnavailableException, InterruptedException, BKException {
@@ -285,6 +289,24 @@ public class BookieServer {
         }
     }
 
+    private ByteBufAllocator getAllocator(ServerConfiguration conf) {
+        return ByteBufAllocatorBuilder.create()
+                .poolingPolicy(conf.getAllocatorPoolingPolicy())
+                .poolingConcurrency(conf.getAllocatorPoolingConcurrency())
+                .outOfMemoryPolicy(conf.getAllocatorOutOfMemoryPolicy())
+                .outOfMemoryListener((ex) -> {
+                    try {
+                        LOG.error("Unable to allocate memory, exiting bookie", ex);
+                    } finally {
+                        if (uncaughtExceptionHandler != null) {
+                            uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), ex);
+                        }
+                    }
+                })
+                .leakDetectionPolicy(conf.getAllocatorLeakDetectionPolicy())
+                .build();
+    }
+
     /**
      * Legacy Method to run bookie server.
      */
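
Note on the out-of-memory listener registered in getAllocator: it logs the failure and then, in a finally block, forwards the error to the uncaught exception handler if one is set. The following is a minimal sketch of that shape using only the JDK. The class OomListenerSketch and the list-based log are hypothetical stand-ins for illustration. For simplicity the handler is passed in when the listener is built, whereas the code in the diff reads the uncaughtExceptionHandler field at the time the listener fires.

```java
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in; not part of BookKeeper. */
class OomListenerSketch {
    /** Builds a listener that records the error, then hands it to the handler if one is set. */
    static Consumer<OutOfMemoryError> listener(List<String> log, Thread.UncaughtExceptionHandler handler) {
        return ex -> {
            try {
                // Stand-in for LOG.error(...) in the diff.
                log.add("Unable to allocate memory: " + ex.getMessage());
            } finally {
                // Runs even if the logging step throws.
                if (handler != null) {
                    handler.uncaughtException(Thread.currentThread(), ex);
                }
            }
        };
    }
}
```

Putting the handler call in the finally block means a failure while logging cannot prevent the handler from being notified.
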
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 6839afc..d411e98 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -29,7 +29,6 @@ import com.google.protobuf.UnsafeByteOperations;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import io.netty.buffer.UnpooledByteBufAllocator;
 import io.netty.channel.Channel;
@@ -166,6 +165,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
 
     final BookieSocketAddress addr;
     final EventLoopGroup eventLoopGroup;
+    final ByteBufAllocator allocator;
     final OrderedExecutor executor;
     final long addEntryTimeoutNanos;
     final long readEntryTimeoutNanos;
@@ -345,12 +345,14 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
                                   StatsLogger parentStatsLogger, ClientAuthProvider.Factory authProviderFactory,
                                   ExtensionRegistry extRegistry,
                                   PerChannelBookieClientPool pcbcPool) throws SecurityException {
-       this(conf, executor, eventLoopGroup, addr, NullStatsLogger.INSTANCE,
+        this(conf, executor, eventLoopGroup, UnpooledByteBufAllocator.DEFAULT, addr, NullStatsLogger.INSTANCE,
                 authProviderFactory, extRegistry, pcbcPool, null);
     }
 
     public PerChannelBookieClient(ClientConfiguration conf, OrderedExecutor executor,
-                                  EventLoopGroup eventLoopGroup, BookieSocketAddress addr,
+                                  EventLoopGroup eventLoopGroup,
+                                  ByteBufAllocator allocator,
+                                  BookieSocketAddress addr,
                                   StatsLogger parentStatsLogger, ClientAuthProvider.Factory authProviderFactory,
                                   ExtensionRegistry extRegistry,
                                   PerChannelBookieClientPool pcbcPool,
@@ -364,6 +366,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         } else {
             this.eventLoopGroup = eventLoopGroup;
         }
+        this.allocator = allocator;
         this.state = ConnectionState.DISCONNECTED;
         this.addEntryTimeoutNanos = TimeUnit.SECONDS.toNanos(conf.getAddEntryTimeout());
         this.readEntryTimeoutNanos = TimeUnit.SECONDS.toNanos(conf.getReadEntryTimeout());
@@ -376,7 +379,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
         this.extRegistry = extRegistry;
         this.shFactory = shFactory;
         if (shFactory != null) {
-            shFactory.init(NodeType.Client, conf);
+            shFactory.init(NodeType.Client, conf, allocator);
         }
 
         StringBuilder nameBuilder = new StringBuilder();
@@ -515,14 +518,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
             bootstrap.channel(NioSocketChannel.class);
         }
 
-        ByteBufAllocator allocator;
-        if (this.conf.isNettyUsePooledBuffers()) {
-            allocator = PooledByteBufAllocator.DEFAULT;
-        } else {
-            allocator = UnpooledByteBufAllocator.DEFAULT;
-        }
-
-        bootstrap.option(ChannelOption.ALLOCATOR, allocator);
+        bootstrap.option(ChannelOption.ALLOCATOR, this.allocator);
         bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.getClientConnectTimeoutMillis());
         bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
                 conf.getClientWriteBufferLowWaterMark(), conf.getClientWriteBufferHighWaterMark()));
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32CDigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32CDigestManager.java
index d6d1949..59dec3a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32CDigestManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32CDigestManager.java
@@ -22,6 +22,7 @@ import com.scurrilous.circe.checksum.Crc32cIntChecksum;
 import com.scurrilous.circe.crc.Sse42Crc32C;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.util.concurrent.FastThreadLocal;
 
 import lombok.extern.slf4j.Slf4j;
@@ -38,10 +39,10 @@ class CRC32CDigestManager extends DigestManager {
         }
     };
 
-    public CRC32CDigestManager(long ledgerId, boolean useV2Protocol) {
-        super(ledgerId, useV2Protocol);
+    public CRC32CDigestManager(long ledgerId, boolean useV2Protocol, ByteBufAllocator allocator) {
+        super(ledgerId, useV2Protocol, allocator);
         if (!Sse42Crc32C.isSupported()) {
-            log.error("Sse42Crc32C is not supported, will use less slower CRC32C implementation.");
+            log.error("Sse42Crc32C is not supported, will use a slower CRC32C implementation.");
         }
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java
index b71ab59..d06bc80 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java
@@ -19,6 +19,7 @@ package org.apache.bookkeeper.proto.checksum;
 */
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.util.concurrent.FastThreadLocal;
 
 /**
@@ -46,8 +47,8 @@ class CRC32DigestManager extends DigestManager {
         }
     };
 
-    public CRC32DigestManager(long ledgerId, boolean useV2Protocol) {
-        super(ledgerId, useV2Protocol);
+    public CRC32DigestManager(long ledgerId, boolean useV2Protocol, ByteBufAllocator allocator) {
+        super(ledgerId, useV2Protocol, allocator);
     }
 
     @Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
index 1928637..2dabf82 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
@@ -18,7 +18,7 @@
 package org.apache.bookkeeper.proto.checksum;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import io.netty.util.ReferenceCountUtil;
 
@@ -47,6 +47,7 @@ public abstract class DigestManager {
 
     final long ledgerId;
     final boolean useV2Protocol;
+    private final ByteBufAllocator allocator;
 
     abstract int getMacCodeLength();
 
@@ -60,28 +61,24 @@ public abstract class DigestManager {
 
     final int macCodeLength;
 
-    public DigestManager(long ledgerId, boolean useV2Protocol) {
+    public DigestManager(long ledgerId, boolean useV2Protocol, ByteBufAllocator allocator) {
         this.ledgerId = ledgerId;
         this.useV2Protocol = useV2Protocol;
-        macCodeLength = getMacCodeLength();
-    }
-
-    public static DigestManager instantiate(long ledgerId, byte[] passwd, DigestType digestType)
-            throws GeneralSecurityException {
-        return instantiate(ledgerId, passwd, digestType, false);
+        this.macCodeLength = getMacCodeLength();
+        this.allocator = allocator;
     }
 
     public static DigestManager instantiate(long ledgerId, byte[] passwd, DigestType digestType,
-            boolean useV2Protocol) throws GeneralSecurityException {
+            ByteBufAllocator allocator, boolean useV2Protocol) throws GeneralSecurityException {
         switch(digestType) {
         case HMAC:
-            return new MacDigestManager(ledgerId, passwd, useV2Protocol);
+            return new MacDigestManager(ledgerId, passwd, useV2Protocol, allocator);
         case CRC32:
-            return new CRC32DigestManager(ledgerId, useV2Protocol);
+            return new CRC32DigestManager(ledgerId, useV2Protocol, allocator);
         case CRC32C:
-            return new CRC32CDigestManager(ledgerId, useV2Protocol);
+            return new CRC32CDigestManager(ledgerId, useV2Protocol, allocator);
         case DUMMY:
-            return new DummyDigestManager(ledgerId, useV2Protocol);
+            return new DummyDigestManager(ledgerId, useV2Protocol, allocator);
         default:
             throw new GeneralSecurityException("Unknown checksum type: " + digestType);
         }
@@ -106,7 +103,7 @@ public abstract class DigestManager {
             /*
              * For V2 protocol, use pooled direct ByteBuf's to avoid object allocation in DigestManager.
              */
-            ByteBuf headersBuffer = PooledByteBufAllocator.DEFAULT.buffer(METADATA_LENGTH + macCodeLength);
+            ByteBuf headersBuffer = allocator.buffer(METADATA_LENGTH + macCodeLength);
             headersBuffer.writeLong(ledgerId);
             headersBuffer.writeLong(entryId);
             headersBuffer.writeLong(lastAddConfirmed);
@@ -149,7 +146,7 @@ public abstract class DigestManager {
     public ByteBufList computeDigestAndPackageForSendingLac(long lac) {
         ByteBuf headersBuffer;
         if (this.useV2Protocol) {
-            headersBuffer = PooledByteBufAllocator.DEFAULT.buffer(LAC_METADATA_LENGTH + macCodeLength);
+            headersBuffer = allocator.buffer(LAC_METADATA_LENGTH + macCodeLength);
         } else {
             headersBuffer = Unpooled.buffer(LAC_METADATA_LENGTH + macCodeLength);
         }
@@ -185,7 +182,7 @@ public abstract class DigestManager {
         int offset = METADATA_LENGTH + macCodeLength;
         update(dataReceived.slice(offset, dataReceived.readableBytes() - offset));
 
-        ByteBuf digest = PooledByteBufAllocator.DEFAULT.buffer(macCodeLength);
+        ByteBuf digest = allocator.buffer(macCodeLength);
         populateValueAndReset(digest);
 
         try {
@@ -225,7 +222,7 @@ public abstract class DigestManager {
 
         update(dataReceived.slice(0, LAC_METADATA_LENGTH));
 
-        ByteBuf digest = PooledByteBufAllocator.DEFAULT.buffer(macCodeLength);
+        ByteBuf digest = allocator.buffer(macCodeLength);
         try {
             populateValueAndReset(digest);
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java
index 1b771f0..aeb0d5b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java
@@ -21,14 +21,15 @@
 package org.apache.bookkeeper.proto.checksum;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 
 
 /**
  * This class provides a noop digest implementation.
  */
 public class DummyDigestManager extends DigestManager {
-    public DummyDigestManager(long ledgerId, boolean useV2Protocol) {
-        super(ledgerId, useV2Protocol);
+    public DummyDigestManager(long ledgerId, boolean useV2Protocol, ByteBufAllocator allocator) {
+        super(ledgerId, useV2Protocol, allocator);
     }
 
     @Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java
index e71c077..92b93f1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java
@@ -21,6 +21,7 @@ package org.apache.bookkeeper.proto.checksum;
 import static com.google.common.base.Charsets.UTF_8;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 
 import java.security.GeneralSecurityException;
 import java.security.MessageDigest;
@@ -71,9 +72,9 @@ public class MacDigestManager extends DigestManager {
         }
     };
 
-    public MacDigestManager(long ledgerId, byte[] passwd, boolean useV2Protocol)
+    public MacDigestManager(long ledgerId, byte[] passwd, boolean useV2Protocol, ByteBufAllocator allocator)
             throws GeneralSecurityException {
-        super(ledgerId, useV2Protocol);
+        super(ledgerId, useV2Protocol, allocator);
         this.passwd = Arrays.copyOf(passwd, passwd.length);
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/SecurityHandlerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/SecurityHandlerFactory.java
index 59be884..5b43744 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/SecurityHandlerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/SecurityHandlerFactory.java
@@ -17,6 +17,7 @@
  */
 package org.apache.bookkeeper.tls;
 
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.handler.ssl.SslHandler;
 
 import org.apache.bookkeeper.conf.AbstractConfiguration;
@@ -37,7 +38,7 @@ public interface SecurityHandlerFactory {
 
     String getHandlerName();
 
-    void init(NodeType type, AbstractConfiguration conf) throws SecurityException;
+    void init(NodeType type, AbstractConfiguration conf, ByteBufAllocator allocator) throws SecurityException;
 
     SslHandler newTLSHandler();
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/TLSContextFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/TLSContextFactory.java
index 17aea85..32e2426 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/TLSContextFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/TLSContextFactory.java
@@ -20,7 +20,8 @@ package org.apache.bookkeeper.tls;
 import com.google.common.base.Strings;
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import io.netty.buffer.PooledByteBufAllocator;
+
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.handler.ssl.ClientAuth;
 import io.netty.handler.ssl.OpenSsl;
 import io.netty.handler.ssl.SslContext;
@@ -79,6 +80,7 @@ public class TLSContextFactory implements SecurityHandlerFactory {
     private String[] protocols;
     private String[] ciphers;
     private SslContext sslContext;
+    private ByteBufAllocator allocator;
 
     private String getPasswordFromFile(String path) throws IOException {
         byte[] pwd;
@@ -350,7 +352,9 @@ public class TLSContextFactory implements SecurityHandlerFactory {
     }
 
     @Override
-    public synchronized void init(NodeType type, AbstractConfiguration conf) throws SecurityException {
+    public synchronized void init(NodeType type, AbstractConfiguration conf, ByteBufAllocator allocator)
+            throws SecurityException {
+        this.allocator = allocator;
         final String enabledProtocols;
         final String enabledCiphers;
 
@@ -397,7 +401,7 @@ public class TLSContextFactory implements SecurityHandlerFactory {
 
     @Override
     public SslHandler newTLSHandler() {
-        SslHandler sslHandler = sslContext.newHandler(PooledByteBufAllocator.DEFAULT);
+        SslHandler sslHandler = sslContext.newHandler(allocator);
 
         if (protocols != null && protocols.length != 0) {
             sslHandler.engine().setEnabledProtocols(protocols);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
index f9e4cff..355cf3f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
@@ -23,7 +23,6 @@ package org.apache.bookkeeper.util;
 import com.google.common.annotations.VisibleForTesting;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandler.Sharable;
 import io.netty.channel.ChannelHandlerContext;
@@ -307,7 +306,7 @@ public class ByteBufList extends AbstractReferenceCounted {
                     if (prependSize) {
                         // Prepend the frame size before writing the buffer list, so that we only have 1 single size
                         // header
-                        ByteBuf sizeBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(4, 4);
+                        ByteBuf sizeBuffer = ctx.alloc().directBuffer(4, 4);
                         sizeBuffer.writeInt(b.readableBytes());
                         ctx.write(sizeBuffer, ctx.voidPromise());
                     }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
index 530969b..7dde6fd 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
@@ -34,6 +34,8 @@ import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
 
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
@@ -153,7 +155,8 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
 
         // simulating ZooKeeper exception by assigning a closed zk client to bk
         BookieServer bkServer = new BookieServer(conf) {
-            protected Bookie newBookie(ServerConfiguration conf)
+            @Override
+            protected Bookie newBookie(ServerConfiguration conf, ByteBufAllocator allocator)
                     throws IOException, KeeperException, InterruptedException,
                     BookieException {
                 Bookie bookie = new Bookie(conf);
@@ -708,7 +711,7 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
         }
 
         @Override
-        protected Bookie newBookie(ServerConfiguration conf)
+        protected Bookie newBookie(ServerConfiguration conf, ByteBufAllocator allocator)
                 throws IOException, KeeperException, InterruptedException, BookieException {
             return new MockBookieWithNoopShutdown(conf, NullStatsLogger.INSTANCE);
         }
@@ -717,7 +720,7 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
     class MockBookieWithNoopShutdown extends Bookie {
         public MockBookieWithNoopShutdown(ServerConfiguration conf, StatsLogger statsLogger)
                 throws IOException, KeeperException, InterruptedException, BookieException {
-            super(conf, statsLogger);
+            super(conf, statsLogger, UnpooledByteBufAllocator.DEFAULT);
         }
 
         // making Bookie Shutdown no-op. Ideally for this testcase we need to
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java
index 86f3a86..c98663d 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java
@@ -23,6 +23,8 @@ package org.apache.bookkeeper.bookie;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
 import java.io.File;
 import java.io.RandomAccessFile;
 import java.nio.channels.FileChannel;
@@ -76,8 +78,8 @@ public class BufferedChannelTest {
         newLogFile.deleteOnExit();
         FileChannel fileChannel = new RandomAccessFile(newLogFile, "rw").getChannel();
 
-        BufferedChannel logChannel = new BufferedChannel(fileChannel, INTERNAL_BUFFER_WRITE_CAPACITY,
-                INTERNAL_BUFFER_READ_CAPACITY, unpersistedBytesBound);
+        BufferedChannel logChannel = new BufferedChannel(UnpooledByteBufAllocator.DEFAULT, fileChannel,
+                INTERNAL_BUFFER_WRITE_CAPACITY, INTERNAL_BUFFER_READ_CAPACITY, unpersistedBytesBound);
 
         ByteBuf dataBuf = generateEntry(byteBufLength);
         dataBuf.markReaderIndex();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
index bbe9c10..556556e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -37,6 +37,7 @@ import static org.junit.Assert.fail;
 import com.google.common.util.concurrent.UncheckedExecutionException;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
 
 import java.io.File;
 import java.io.IOException;
@@ -265,7 +266,8 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
                     null,
                     cp,
                     Checkpointer.NULL,
-                    NullStatsLogger.INSTANCE);
+                    NullStatsLogger.INSTANCE,
+                    UnpooledByteBufAllocator.DEFAULT);
                 storage.start();
                 long startTime = System.currentTimeMillis();
                 storage.gcThread.enableForceGC();
@@ -616,7 +618,8 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
         Bookie newbookie = new Bookie(newBookieConf);
 
         DigestManager digestManager = DigestManager.instantiate(ledgerId, passwdBytes,
-                BookKeeper.DigestType.toProtoDigestType(digestType), baseClientConf.getUseV2WireProtocol());
+                BookKeeper.DigestType.toProtoDigestType(digestType), UnpooledByteBufAllocator.DEFAULT,
+                baseClientConf.getUseV2WireProtocol());
 
         for (long entryId = 0; entryId <= lastAddConfirmed; entryId++) {
             ByteBuf readEntryBufWithChecksum = newbookie.readEntry(ledgerId, entryId);
@@ -860,7 +863,8 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
             null,
             checkpointSource,
             Checkpointer.NULL,
-            NullStatsLogger.INSTANCE);
+            NullStatsLogger.INSTANCE,
+            UnpooledByteBufAllocator.DEFAULT);
         ledgers.add(1L);
         ledgers.add(2L);
         ledgers.add(3L);
@@ -885,7 +889,8 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
             dirs, dirs, null,
             checkpointSource,
             Checkpointer.NULL,
-            NullStatsLogger.INSTANCE);
+            NullStatsLogger.INSTANCE,
+            UnpooledByteBufAllocator.DEFAULT);
         storage.start();
         for (int i = 0; i < 10; i++) {
             if (!log0.exists()) {
@@ -910,7 +915,8 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
             null,
             checkpointSource,
             Checkpointer.NULL,
-            NullStatsLogger.INSTANCE);
+            NullStatsLogger.INSTANCE,
+            UnpooledByteBufAllocator.DEFAULT);
         storage.getEntry(1, 1); // entry should exist
     }
 
@@ -1021,7 +1027,8 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
             null,
             checkpointSource,
             Checkpointer.NULL,
-            NullStatsLogger.INSTANCE);
+            NullStatsLogger.INSTANCE,
+            UnpooledByteBufAllocator.DEFAULT);
 
         double threshold = 0.1;
         // shouldn't throw exception
@@ -1063,7 +1070,7 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
         };
         InterleavedLedgerStorage storage = new InterleavedLedgerStorage();
         storage.initialize(conf, manager, dirs, dirs, null, checkpointSource,
-            Checkpointer.NULL, NullStatsLogger.INSTANCE);
+            Checkpointer.NULL, NullStatsLogger.INSTANCE, UnpooledByteBufAllocator.DEFAULT);
 
         for (long ledger = 0; ledger <= 10; ledger++) {
             ledgers.add(ledger);
@@ -1081,7 +1088,7 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
 
         storage = new InterleavedLedgerStorage();
         storage.initialize(conf, manager, dirs, dirs, null, checkpointSource,
-                           Checkpointer.NULL, NullStatsLogger.INSTANCE);
+                           Checkpointer.NULL, NullStatsLogger.INSTANCE, UnpooledByteBufAllocator.DEFAULT);
 
         long startingEntriesCount = storage.gcThread.entryLogger.getLeastUnflushedLogId()
             - storage.gcThread.scannedLogId;
@@ -1158,7 +1165,8 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
             null,
             cp,
             Checkpointer.NULL,
-            stats.getStatsLogger("storage"));
+            stats.getStatsLogger("storage"),
+            UnpooledByteBufAllocator.DEFAULT);
         storage.start();
 
         int majorCompactions = stats.getCounter("storage.gc." + MAJOR_COMPACTION_COUNT).get().intValue();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
index f5d4edc..0b24db2 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertTrue;
 
 import com.google.common.util.concurrent.MoreExecutors;
 
+import io.netty.buffer.UnpooledByteBufAllocator;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
@@ -606,7 +608,8 @@ public class CreateNewLogTest {
         conf.setEntryLogPerLedgerCounterLimitsMultFactor(entryLogPerLedgerCounterLimitsMultFactor);
         LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
-        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager, null, statsLogger);
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager, null, statsLogger,
+                UnpooledByteBufAllocator.DEFAULT);
         EntryLogManagerForEntryLogPerLedger entrylogManager = (EntryLogManagerForEntryLogPerLedger) entryLogger
                 .getEntryLogManager();
         // set same thread executor for entryLoggerAllocator's allocatorExecutor
@@ -731,7 +734,8 @@ public class CreateNewLogTest {
         conf.setEntryLogPerLedgerCounterLimitsMultFactor(entryLogPerLedgerCounterLimitsMultFactor);
         LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
-        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager, null, statsLogger);
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager, null, statsLogger,
+                UnpooledByteBufAllocator.DEFAULT);
         EntryLogManagerForEntryLogPerLedger entrylogManager = (EntryLogManagerForEntryLogPerLedger) entryLogger
                 .getEntryLogManager();
         // set same thread executor for entryLoggerAllocator's allocatorExecutor
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
index 9062261..e1f3582 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
@@ -30,6 +30,8 @@ import com.google.common.collect.Sets;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -963,8 +965,8 @@ public class EntryLogTest {
         File tmpFile = File.createTempFile("entrylog", logid + "");
         tmpFile.deleteOnExit();
         FileChannel fc = new RandomAccessFile(tmpFile, "rw").getChannel();
-        EntryLogger.BufferedLogChannel logChannel = new BufferedLogChannel(fc, 10, 10, logid, tmpFile,
-                servConf.getFlushIntervalInBytes());
+        EntryLogger.BufferedLogChannel logChannel = new BufferedLogChannel(UnpooledByteBufAllocator.DEFAULT, fc, 10, 10,
+                logid, tmpFile, servConf.getFlushIntervalInBytes());
         return logChannel;
     }
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
index 70f2a0e..909e646 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
@@ -29,6 +29,7 @@ import static org.junit.Assert.fail;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.UnpooledByteBufAllocator;
 
 import java.io.File;
 import java.io.IOException;
@@ -342,7 +343,8 @@ public class IndexPersistenceMgrTest {
 
         preCreateFileInfoForLedger(ledgerId, headerVersion);
         DigestManager digestManager = DigestManager.instantiate(ledgerId, masterKey,
-                BookKeeper.DigestType.toProtoDigestType(digestType), getUseV2WireProtocol);
+                BookKeeper.DigestType.toProtoDigestType(digestType), UnpooledByteBufAllocator.DEFAULT,
+                getUseV2WireProtocol);
 
         CachedFileInfo fileInfo = indexPersistenceMgr.getFileInfo(ledgerId, masterKey);
         fileInfo.readHeader();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java
index 4fde8e7..c12ed91 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java
@@ -26,6 +26,8 @@ import static org.junit.Assert.assertEquals;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -110,7 +112,7 @@ public class InterleavedLedgerStorageTest {
                 LedgerDirsManager ledgerDirsManager,
                 EntryLogListener listener,
                 StatsLogger statsLogger) throws IOException {
-            super(conf, ledgerDirsManager, listener, statsLogger);
+            super(conf, ledgerDirsManager, listener, statsLogger, UnpooledByteBufAllocator.DEFAULT);
         }
 
         void setCheckEntryTestPoint(CheckEntryListener testPoint) throws InterruptedException {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
index 2321735..a606f9b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
@@ -28,6 +28,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.Unpooled;
 
 import java.io.File;
@@ -522,7 +523,8 @@ public class LedgerCacheTest {
                                StateManager stateManager,
                                CheckpointSource checkpointSource,
                                Checkpointer checkpointer,
-                               StatsLogger statsLogger) throws IOException {
+                               StatsLogger statsLogger,
+                               ByteBufAllocator allocator) throws IOException {
             super.initialize(
                 conf,
                 ledgerManager,
@@ -531,7 +533,8 @@ public class LedgerCacheTest {
                 stateManager,
                 checkpointSource,
                 checkpointer,
-                statsLogger);
+                statsLogger,
+                allocator);
             if (this.memTable instanceof EntryMemTableWithParallelFlusher) {
                 this.memTable = new EntryMemTableWithParallelFlusher(conf, checkpointSource, statsLogger) {
                     @Override
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageTest.java
index 697d7c0..d028f70 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageTest.java
@@ -23,6 +23,8 @@ package org.apache.bookkeeper.bookie;
 import static org.junit.Assert.assertEquals;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -145,7 +147,8 @@ public class LedgerStorageTest extends BookKeeperClusterTestCase {
 
         if ((journalFormatVersionToWrite >= 6) && (fileInfoFormatVersionToWrite >= 1)) {
             DigestManager digestManager = DigestManager.instantiate(ledgerId, passwdBytes,
-                    BookKeeper.DigestType.toProtoDigestType(digestType), confWithExplicitLAC.getUseV2WireProtocol());
+                    BookKeeper.DigestType.toProtoDigestType(digestType), UnpooledByteBufAllocator.DEFAULT,
+                    confWithExplicitLAC.getUseV2WireProtocol());
             long explicitLacPersistedInJournal = digestManager.verifyDigestAndReturnLac(explicitLacBuf);
             assertEquals("explicitLac persisted in journal", (numOfEntries - 1), explicitLacPersistedInJournal);
         } else {
@@ -226,7 +229,8 @@ public class LedgerStorageTest extends BookKeeperClusterTestCase {
 
         if ((journalFormatVersionToWrite >= 6) && (fileInfoFormatVersionToWrite >= 1)) {
             DigestManager digestManager = DigestManager.instantiate(ledgerId, passwdBytes,
-                    BookKeeper.DigestType.toProtoDigestType(digestType), confWithExplicitLAC.getUseV2WireProtocol());
+                    BookKeeper.DigestType.toProtoDigestType(digestType), UnpooledByteBufAllocator.DEFAULT,
+                    confWithExplicitLAC.getUseV2WireProtocol());
             long explicitLacReadFromFileInfo = digestManager.verifyDigestAndReturnLac(explicitLacBufReadFromFileInfo);
             assertEquals("explicitLac persisted in FileInfo", (numOfEntries - 1), explicitLacReadFromFileInfo);
         } else {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowInterleavedLedgerStorage.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowInterleavedLedgerStorage.java
index 645af9c..0b429ad 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowInterleavedLedgerStorage.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowInterleavedLedgerStorage.java
@@ -22,6 +22,9 @@ package org.apache.bookkeeper.bookie;
  */
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
 import java.io.IOException;
 
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -48,7 +51,7 @@ public class SlowInterleavedLedgerStorage extends InterleavedLedgerStorage {
 
         public SlowEntryLogger(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager, EntryLogListener listener,
                 StatsLogger statsLogger) throws IOException {
-            super(conf, ledgerDirsManager, listener, statsLogger);
+            super(conf, ledgerDirsManager, listener, statsLogger, UnpooledByteBufAllocator.DEFAULT);
         }
 
         public SlowEntryLogger setAddDelay(long delay) {
@@ -110,10 +113,11 @@ public class SlowInterleavedLedgerStorage extends InterleavedLedgerStorage {
                            StateManager stateManager,
                            CheckpointSource checkpointSource,
                            Checkpointer checkpointer,
-                           StatsLogger statsLogger)
+                           StatsLogger statsLogger,
+                           ByteBufAllocator allocator)
             throws IOException {
         super.initialize(conf, ledgerManager, ledgerDirsManager, indexDirsManager,
-                stateManager, checkpointSource, checkpointer, statsLogger);
+                stateManager, checkpointSource, checkpointer, statsLogger, allocator);
         // do not want to add these to config class, reading throw "raw" interface
         long getDelay = conf.getLong(PROP_SLOW_STORAGE_GET_DELAY, 0);
         long addDelay = conf.getLong(PROP_SLOW_STORAGE_ADD_DELAY, 0);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
index 44f20e6..48d1038 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
@@ -26,6 +26,8 @@ import static org.mockito.Mockito.mock;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
 import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -141,7 +143,8 @@ public class SortedLedgerStorageCheckpointTest extends LedgerStorageTestBase {
             null,
             checkpointSrc,
             checkpointer,
-            NullStatsLogger.INSTANCE);
+            NullStatsLogger.INSTANCE,
+            UnpooledByteBufAllocator.DEFAULT);
     }
 
     @After
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/StateManagerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/StateManagerTest.java
index 8da2ffa..71658e7 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/StateManagerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/StateManagerTest.java
@@ -22,6 +22,9 @@ package org.apache.bookkeeper.bookie;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+
+import io.netty.buffer.UnpooledByteBufAllocator;
+
 import java.io.File;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
@@ -152,7 +155,8 @@ public class StateManagerTest extends BookKeeperClusterTestCase {
                 .setJournalDirName(tmpDir.toString())
                 .setMetadataServiceUri(zkUtil.getMetadataServiceUri())
                 .setForceReadOnlyBookie(true);
-        ReadOnlyBookie readOnlyBookie = new ReadOnlyBookie(readOnlyConf, NullStatsLogger.INSTANCE);
+        ReadOnlyBookie readOnlyBookie = new ReadOnlyBookie(readOnlyConf, NullStatsLogger.INSTANCE,
+                UnpooledByteBufAllocator.DEFAULT);
         readOnlyBookie.start();
         assertTrue(readOnlyBookie.isRunning());
         assertTrue(readOnlyBookie.isReadOnly());
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
index d9fa8cc..707eb81 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
@@ -26,6 +26,8 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+
 import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -273,7 +275,8 @@ public class TestSyncThread {
             StateManager stateManager,
             CheckpointSource checkpointSource,
             Checkpointer checkpointer,
-            StatsLogger statsLogger)
+            StatsLogger statsLogger,
+            ByteBufAllocator allocator)
                 throws IOException {
         }
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionRollbackTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionRollbackTest.java
index bfd7a4d..5c24633 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionRollbackTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionRollbackTest.java
@@ -25,6 +25,7 @@ import com.google.common.collect.Sets;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
 
 import java.io.File;
 import java.io.IOException;
@@ -94,7 +95,7 @@ public class ConversionRollbackTest {
 
         DbLedgerStorage dbStorage = new DbLedgerStorage();
         dbStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager, null, checkpointSource, checkpointer,
-                NullStatsLogger.INSTANCE);
+                NullStatsLogger.INSTANCE, UnpooledByteBufAllocator.DEFAULT);
 
         // Insert some ledger & entries in the dbStorage
         for (long ledgerId = 0; ledgerId < 5; ledgerId++) {
@@ -124,7 +125,7 @@ public class ConversionRollbackTest {
         // Verify that interleaved storage index has the same entries
         InterleavedLedgerStorage interleavedStorage = new InterleavedLedgerStorage();
         interleavedStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager,
-                null, checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
+                null, checkpointSource, checkpointer, NullStatsLogger.INSTANCE, UnpooledByteBufAllocator.DEFAULT);
 
         Set<Long> ledgers = Sets.newTreeSet(interleavedStorage.getActiveLedgersInRange(0, Long.MAX_VALUE));
         Assert.assertEquals(Sets.newTreeSet(Lists.newArrayList(0L, 1L, 2L, 3L, 4L)), ledgers);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionTest.java
index b2afe4c..780b8ec 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionTest.java
@@ -25,6 +25,7 @@ import com.google.common.collect.Sets;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
 
 import java.io.File;
 import java.io.IOException;
@@ -91,7 +92,7 @@ public class ConversionTest {
 
         InterleavedLedgerStorage interleavedStorage = new InterleavedLedgerStorage();
         interleavedStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager,
-                null, checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
+                null, checkpointSource, checkpointer, NullStatsLogger.INSTANCE, UnpooledByteBufAllocator.DEFAULT);
 
         // Insert some ledger & entries in the interleaved storage
         for (long ledgerId = 0; ledgerId < 5; ledgerId++) {
@@ -121,11 +122,12 @@ public class ConversionTest {
         // Verify that db index has the same entries
         DbLedgerStorage dbStorage = new DbLedgerStorage();
         dbStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager,
-                null, checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
+                null, checkpointSource, checkpointer, NullStatsLogger.INSTANCE, UnpooledByteBufAllocator.DEFAULT);
 
         interleavedStorage = new InterleavedLedgerStorage();
         interleavedStorage.initialize(conf, null, ledgerDirsManager,
-                ledgerDirsManager, null, checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
+                ledgerDirsManager, null, checkpointSource, checkpointer, NullStatsLogger.INSTANCE,
+                UnpooledByteBufAllocator.DEFAULT);
 
         Set<Long> ledgers = Sets.newTreeSet(dbStorage.getActiveLedgersInRange(0, Long.MAX_VALUE));
         Assert.assertEquals(Sets.newTreeSet(Lists.newArrayList(0L, 1L, 2L, 3L, 4L)), ledgers);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
index 7a45ee7..e5feef3 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.Unpooled;
 
 import java.io.File;
@@ -58,10 +59,11 @@ public class DbLedgerStorageWriteCacheTest {
         protected SingleDirectoryDbLedgerStorage newSingleDirectoryDbLedgerStorage(ServerConfiguration conf,
                 LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager,
                 StateManager stateManager, CheckpointSource checkpointSource, Checkpointer checkpointer,
-                StatsLogger statsLogger, ScheduledExecutorService gcExecutor, long writeCacheSize, long readCacheSize)
+                StatsLogger statsLogger, ScheduledExecutorService gcExecutor,
+                long writeCacheSize, long readCacheSize)
                 throws IOException {
             return new MockedSingleDirectoryDbLedgerStorage(conf, ledgerManager, ledgerDirsManager, indexDirsManager,
-                    stateManager, checkpointSource, checkpointer, statsLogger, gcExecutor, writeCacheSize,
+                    stateManager, checkpointSource, checkpointer, statsLogger, allocator, gcExecutor, writeCacheSize,
                     readCacheSize);
         }
 
@@ -69,9 +71,10 @@ public class DbLedgerStorageWriteCacheTest {
             public MockedSingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager,
                     LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, StateManager stateManager,
                     CheckpointSource checkpointSource, Checkpointer checkpointer, StatsLogger statsLogger,
-                    ScheduledExecutorService gcExecutor, long writeCacheSize, long readCacheSize) throws IOException {
+                    ByteBufAllocator allocator, ScheduledExecutorService gcExecutor, long writeCacheSize,
+                    long readCacheSize) throws IOException {
                 super(conf, ledgerManager, ledgerDirsManager, indexDirsManager, stateManager, checkpointSource,
-                        checkpointer, statsLogger, gcExecutor, writeCacheSize, readCacheSize);
+                        checkpointer, statsLogger, allocator, gcExecutor, writeCacheSize, readCacheSize);
             }
 
           @Override
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildTest.java
index 629a238..7bdbcd5 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildTest.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Sets;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
 
 import java.io.File;
 import java.io.IOException;
@@ -92,7 +93,7 @@ public class LocationsIndexRebuildTest {
 
         DbLedgerStorage ledgerStorage = new DbLedgerStorage();
         ledgerStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager, null, checkpointSource, checkpointer,
-                NullStatsLogger.INSTANCE);
+                NullStatsLogger.INSTANCE, UnpooledByteBufAllocator.DEFAULT);
 
         // Insert some ledger & entries in the storage
         for (long ledgerId = 0; ledgerId < 5; ledgerId++) {
@@ -122,7 +123,7 @@ public class LocationsIndexRebuildTest {
         // Verify that db index has the same entries
         ledgerStorage = new DbLedgerStorage();
         ledgerStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager, null, checkpointSource, checkpointer,
-                NullStatsLogger.INSTANCE);
+                NullStatsLogger.INSTANCE, UnpooledByteBufAllocator.DEFAULT);
 
         Set<Long> ledgers = Sets.newTreeSet(ledgerStorage.getActiveLedgersInRange(0, Long.MAX_VALUE));
         Assert.assertEquals(Sets.newTreeSet(Lists.newArrayList(0L, 1L, 2L, 3L, 4L)), ledgers);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCacheTest.java
index 42e5099..337140c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCacheTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCacheTest.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertNull;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
 
 import org.junit.Test;
 
@@ -35,7 +36,7 @@ public class ReadCacheTest {
 
     @Test
     public void simple() {
-        ReadCache cache = new ReadCache(10 * 1024);
+        ReadCache cache = new ReadCache(UnpooledByteBufAllocator.DEFAULT, 10 * 1024);
 
         assertEquals(0, cache.count());
         assertEquals(0, cache.size());
@@ -72,7 +73,7 @@ public class ReadCacheTest {
 
     @Test
     public void emptyCache() {
-        ReadCache cache = new ReadCache(10 * 1024);
+        ReadCache cache = new ReadCache(UnpooledByteBufAllocator.DEFAULT, 10 * 1024);
 
         assertEquals(0, cache.count());
         assertEquals(0, cache.size());
@@ -84,7 +85,7 @@ public class ReadCacheTest {
     @Test
     public void multipleSegments() {
         // Test with multiple smaller segments
-        ReadCache cache = new ReadCache(10 * 1024, 2 * 1024);
+        ReadCache cache = new ReadCache(UnpooledByteBufAllocator.DEFAULT, 10 * 1024, 2 * 1024);
 
         assertEquals(0, cache.count());
         assertEquals(0, cache.size());
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCacheTest.java
index f8b2bba..5726bbb 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCacheTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCacheTest.java
@@ -26,9 +26,10 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.ByteBufUtil;
-import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
 
 import java.nio.charset.Charset;
 import java.util.concurrent.BrokenBarrierException;
@@ -46,11 +47,13 @@ import org.junit.Test;
  */
 public class WriteCacheTest {
 
+    private static final ByteBufAllocator allocator = UnpooledByteBufAllocator.DEFAULT;
+
     @Test
     public void simple() throws Exception {
-        WriteCache cache = new WriteCache(10 * 1024);
+        WriteCache cache = new WriteCache(allocator, 10 * 1024);
 
-        ByteBuf entry1 = PooledByteBufAllocator.DEFAULT.buffer(1024);
+        ByteBuf entry1 = allocator.buffer(1024);
         ByteBufUtil.writeUtf8(entry1, "entry-1");
         entry1.writerIndex(entry1.capacity());
 
@@ -87,9 +90,9 @@ public class WriteCacheTest {
         int entrySize = 1024;
         int entriesCount = cacheSize / entrySize;
 
-        WriteCache cache = new WriteCache(cacheSize);
+        WriteCache cache = new WriteCache(allocator, cacheSize);
 
-        ByteBuf entry = PooledByteBufAllocator.DEFAULT.buffer(entrySize);
+        ByteBuf entry = allocator.buffer(entrySize);
         entry.writerIndex(entry.capacity());
 
         for (int i = 0; i < entriesCount; i++) {
@@ -125,7 +128,7 @@ public class WriteCacheTest {
     @Test
     public void testMultipleSegments() {
         // Create cache with max size 1Mb and each segment is 16Kb
-        WriteCache cache = new WriteCache(1024 * 1024, 16 * 1024);
+        WriteCache cache = new WriteCache(allocator, 1024 * 1024, 16 * 1024);
 
         ByteBuf entry = Unpooled.buffer(1024);
         entry.writerIndex(entry.capacity());
@@ -142,7 +145,7 @@ public class WriteCacheTest {
 
     @Test
     public void testEmptyCache() {
-        WriteCache cache = new WriteCache(1024 * 1024, 16 * 1024);
+        WriteCache cache = new WriteCache(allocator, 1024 * 1024, 16 * 1024);
 
         assertEquals(0, cache.count());
         assertEquals(0, cache.size());
@@ -160,7 +163,7 @@ public class WriteCacheTest {
     @Test
     public void testMultipleWriters() throws Exception {
         // Create cache with max size 1Mb and each segment is 16Kb
-        WriteCache cache = new WriteCache(10 * 1024 * 1024, 16 * 1024);
+        WriteCache cache = new WriteCache(allocator, 10 * 1024 * 1024, 16 * 1024);
 
         ExecutorService executor = Executors.newCachedThreadPool();
 
@@ -220,7 +223,7 @@ public class WriteCacheTest {
 
     @Test
     public void testLedgerDeletion() {
-        WriteCache cache = new WriteCache(1024 * 1024, 16 * 1024);
+        WriteCache cache = new WriteCache(allocator, 1024 * 1024, 16 * 1024);
 
         ByteBuf entry = Unpooled.buffer(1024);
         entry.writerIndex(entry.capacity());
@@ -265,7 +268,7 @@ public class WriteCacheTest {
     @Test
     public void testWriteReadsInMultipleSegments() {
         // Create cache with max size 4 KB and each segment is 128 bytes
-        WriteCache cache = new WriteCache(4 * 1024, 128);
+        WriteCache cache = new WriteCache(allocator, 4 * 1024, 128);
 
         for (int i = 0; i < 48; i++) {
             boolean inserted = cache.put(1, i, Unpooled.wrappedBuffer(("test-" + i).getBytes()));
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
index 7be404d..cb45ba7 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
@@ -587,6 +587,7 @@ public class BookKeeperTest extends BookKeeperClusterTestCase {
         }
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void testReadEntryReleaseByteBufs() throws Exception {
         ClientConfiguration confWriter = new ClientConfiguration();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
index 253f0d9..e1d32af 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
@@ -20,6 +20,8 @@
  */
 package org.apache.bookkeeper.client;
 
+import io.netty.buffer.UnpooledByteBufAllocator;
+
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
@@ -45,7 +47,7 @@ public class BookKeeperTestClient extends BookKeeper {
 
     public BookKeeperTestClient(ClientConfiguration conf, TestStatsProvider statsProvider)
             throws IOException, InterruptedException, BKException {
-        super(conf, null, null,
+        super(conf, null, null, new UnpooledByteBufAllocator(false),
               statsProvider == null ? NullStatsLogger.INSTANCE : statsProvider.getStatsLogger(""),
               null, null, null);
         this.statsProvider = statsProvider;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
index 68a6846..a6b873e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
@@ -21,6 +21,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
 
 import java.security.GeneralSecurityException;
 import java.util.function.Function;
@@ -47,7 +48,8 @@ public class ClientUtil {
 
     public static ByteBuf generatePacket(long ledgerId, long entryId, long lastAddConfirmed, long length, byte[] data,
             int offset, int len) throws GeneralSecurityException {
-        DigestManager dm = DigestManager.instantiate(ledgerId, new byte[2], DigestType.CRC32);
+        DigestManager dm = DigestManager.instantiate(ledgerId, new byte[2], DigestType.CRC32,
+                UnpooledByteBufAllocator.DEFAULT, true);
         return ByteBufList.coalesce(dm.computeDigestAndPackageForSending(entryId, lastAddConfirmed, length,
                 Unpooled.wrappedBuffer(data, offset, len)));
     }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
index 6d6e4d1..69b5635 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
@@ -31,7 +31,9 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
 
 import java.security.GeneralSecurityException;
 import java.util.ArrayList;
@@ -210,6 +212,11 @@ public abstract class MockBookKeeperTestCase {
                 public boolean isClientClosed() {
                     return bk.isClosed();
                 }
+
+                @Override
+                public ByteBufAllocator getByteBufAllocator() {
+                    return UnpooledByteBufAllocator.DEFAULT;
+                }
             };
         when(bk.getClientCtx()).thenReturn(clientCtx);
         when(bk.getLedgerManager()).thenReturn(ledgerManager);
@@ -241,7 +248,8 @@ public abstract class MockBookKeeperTestCase {
                 metadata.getPassword(),
                 org.apache.bookkeeper.client.BookKeeper.DigestType.toProtoDigestType(
                         org.apache.bookkeeper.client.BookKeeper.DigestType.fromApiDigestType(
-                                metadata.getDigestType())));
+                                metadata.getDigestType())),
+                UnpooledByteBufAllocator.DEFAULT, false);
     }
 
     @After
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockClientContext.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockClientContext.java
index f36c008..f0be8d0 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockClientContext.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockClientContext.java
@@ -22,6 +22,9 @@ package org.apache.bookkeeper.client;
 
 import static com.google.common.base.Preconditions.checkState;
 
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
 import java.util.function.BooleanSupplier;
 
 import org.apache.bookkeeper.common.util.OrderedExecutor;
@@ -50,6 +53,7 @@ public class MockClientContext implements ClientContext {
     private BookKeeperClientStats clientStats;
     private BooleanSupplier isClientClosed;
     private MockRegistrationClient regClient;
+    private ByteBufAllocator allocator;
 
     static MockClientContext create() throws Exception {
         ClientConfiguration conf = new ClientConfiguration();
@@ -67,6 +71,7 @@ public class MockClientContext implements ClientContext {
             .setPlacementPolicy(placementPolicy)
             .setRegistrationClient(regClient)
             .setBookieClient(new MockBookieClient(scheduler))
+            .setByteBufAllocator(UnpooledByteBufAllocator.DEFAULT)
             .setMainWorkerPool(scheduler)
             .setScheduler(scheduler)
             .setClientStats(BookKeeperClientStats.newInstance(NullStatsLogger.INSTANCE))
@@ -83,6 +88,7 @@ public class MockClientContext implements ClientContext {
             .setMainWorkerPool(other.getMainWorkerPool())
             .setScheduler(other.getScheduler())
             .setClientStats(other.getClientStats())
+            .setByteBufAllocator(other.getByteBufAllocator())
             .setIsClientClosed(other::isClientClosed);
     }
 
@@ -151,6 +157,11 @@ public class MockClientContext implements ClientContext {
         return this;
     }
 
+    public MockClientContext setByteBufAllocator(ByteBufAllocator allocator) {
+        this.allocator = allocator;
+        return this;
+    }
+
     private static <T> T maybeSpy(T orig) {
         if (Mockito.mockingDetails(orig).isSpy()) {
             return orig;
@@ -204,4 +215,8 @@ public class MockClientContext implements ClientContext {
         return isClientClosed.getAsBoolean();
     }
 
+    @Override
+    public ByteBufAllocator getByteBufAllocator() {
+        return allocator;
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java
index a1032a1..e1c203d 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java
@@ -32,6 +32,8 @@ import static org.mockito.Mockito.when;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -132,7 +134,7 @@ public class ReadLastConfirmedAndEntryOpTest {
         when(mockLh.getCurrentEnsemble()).thenReturn(ensemble);
         when(mockLh.getLedgerMetadata()).thenReturn(ledgerMetadata);
         when(mockLh.getDistributionSchedule()).thenReturn(distributionSchedule);
-        digestManager = new DummyDigestManager(LEDGERID, false);
+        digestManager = new DummyDigestManager(LEDGERID, false, UnpooledByteBufAllocator.DEFAULT);
         when(mockLh.getDigestManager()).thenReturn(digestManager);
     }
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
index 4ec5992..39c615a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
@@ -23,6 +23,7 @@ package org.apache.bookkeeper.client;
 
 import static org.junit.Assert.assertTrue;
 
+import io.netty.buffer.UnpooledByteBufAllocator;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.util.concurrent.DefaultThreadFactory;
@@ -108,7 +109,8 @@ public class TestGetBookieInfoTimeout extends BookKeeperClusterTestCase {
         // try to get bookie info from the sleeping bookie. It should fail with timeout error
         BookieSocketAddress addr = new BookieSocketAddress(bookieToSleep.getSocketAddress().getHostString(),
                 bookieToSleep.getPort());
-        BookieClient bc = new BookieClientImpl(cConf, eventLoopGroup, executor, scheduler, NullStatsLogger.INSTANCE);
+        BookieClient bc = new BookieClientImpl(cConf, eventLoopGroup, UnpooledByteBufAllocator.DEFAULT, executor,
+                scheduler, NullStatsLogger.INSTANCE);
         long flags = BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE
                 | BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE;
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/TestBKConfiguration.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/TestBKConfiguration.java
index 9073b01..f993e20 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/TestBKConfiguration.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/TestBKConfiguration.java
@@ -27,6 +27,7 @@ import java.util.Collections;
 import java.util.Enumeration;
 
 import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
+import org.apache.bookkeeper.common.allocator.PoolingPolicy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,6 +55,7 @@ public class TestBKConfiguration {
         confReturn.setGcWaitTime(1000);
         confReturn.setDiskUsageThreshold(0.999f);
         confReturn.setDiskUsageWarnThreshold(0.99f);
+        confReturn.setAllocatorPoolingPolicy(PoolingPolicy.UnpooledHeap);
         confReturn.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 4);
         confReturn.setProperty(DbLedgerStorage.READ_AHEAD_CACHE_MAX_SIZE_MB, 4);
         setLoopbackInterfaceAndAllowLoopback(confReturn);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
index 828956d..31bd406 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
@@ -30,6 +30,8 @@ import static org.junit.Assert.fail;
 
 import com.google.common.collect.Lists;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -48,6 +50,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.bookie.CheckpointSource;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
@@ -566,7 +569,8 @@ public class GcLedgersTest extends LedgerManagerTestCase {
             StateManager stateManager,
             CheckpointSource checkpointSource,
             Checkpointer checkpointer,
-            StatsLogger statsLogger) throws IOException {
+            StatsLogger statsLogger,
+            ByteBufAllocator allocator) throws IOException {
         }
 
         @Override
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
index 3aa2b8a..cf9b3dd 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
@@ -22,6 +22,8 @@
 package org.apache.bookkeeper.meta;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+
 import java.io.IOException;
 import java.net.URI;
 import java.util.Arrays;
@@ -29,6 +31,7 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Optional;
+
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.bookie.CheckpointSource;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
@@ -173,7 +176,8 @@ public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase {
             StateManager stateManager,
             CheckpointSource checkpointSource,
             Checkpointer checkpointer,
-            StatsLogger statsLogger) throws IOException {
+            StatsLogger statsLogger,
+            ByteBufAllocator allocator) throws IOException {
         }
 
         @Override
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureTest.java
index 5454a70..4b13ebe 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureTest.java
@@ -25,6 +25,8 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
+import io.netty.buffer.UnpooledByteBufAllocator;
+
 import java.lang.reflect.Field;
 import java.nio.channels.FileChannel;
 import java.util.Enumeration;
@@ -130,7 +132,7 @@ public class BookieBackpressureTest extends BookKeeperClusterTestCase
         for (int i = 0; i < journals.size(); i++) {
             Journal mock = spy(journals.get(i));
             when(mock.getBufferedChannelBuilder()).thenReturn((FileChannel fc, int capacity) ->  {
-                SlowBufferedChannel sbc = new SlowBufferedChannel(fc, capacity);
+                SlowBufferedChannel sbc = new SlowBufferedChannel(UnpooledByteBufAllocator.DEFAULT, fc, capacity);
                 sbc.setAddDelay(addDelay);
                 sbc.setGetDelay(getDelay);
                 sbc.setFlushDelay(flushDelay);
@@ -306,7 +308,7 @@ public class BookieBackpressureTest extends BookKeeperClusterTestCase
         BookieServer bks = bs.get(bkId);
         bks.shutdown();
         bks = new BookieServer(bsConfs.get(bkId));
-        mockJournal(bks.bookie, getDelay, addDelay, flushDelay);
+        mockJournal(bks.getBookie(), getDelay, addDelay, flushDelay);
         bks.start();
         bs.set(bkId, bks);
 
@@ -347,7 +349,7 @@ public class BookieBackpressureTest extends BookKeeperClusterTestCase
         BookieServer bks = bs.get(bkId);
         bks.shutdown();
         bks = new BookieServer(bsConfs.get(bkId));
-        mockJournal(bks.bookie, getDelay, addDelay, flushDelay);
+        mockJournal(bks.getBookie(), getDelay, addDelay, flushDelay);
         bks.start();
         bs.set(bkId, bks);
 
@@ -392,7 +394,7 @@ public class BookieBackpressureTest extends BookKeeperClusterTestCase
         BookieServer bks = bs.get(bkId);
         bks.shutdown();
         bks = new BookieServer(bsConfs.get(bkId));
-        mockJournal(bks.bookie, getDelay, addDelay, flushDelay);
+        mockJournal(bks.getBookie(), getDelay, addDelay, flushDelay);
         bks.start();
         bs.set(bkId, bks);
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
index 04119d5..2c349a0 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
@@ -24,6 +24,7 @@ import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
 
 import java.util.Collections;
 import java.util.EnumSet;
@@ -109,7 +110,8 @@ public class MockBookieClient implements BookieClient {
     }
 
     public void seedEntries(BookieSocketAddress bookie, long ledgerId, long entryId, long lac) throws Exception {
-        DigestManager digestManager = DigestManager.instantiate(ledgerId, new byte[0], DigestType.CRC32C);
+        DigestManager digestManager = DigestManager.instantiate(ledgerId, new byte[0], DigestType.CRC32C,
+                UnpooledByteBufAllocator.DEFAULT, false);
         ByteBuf entry = ByteBufList.coalesce(digestManager.computeDigestAndPackageForSending(
                                                      entryId, lac, 0, Unpooled.buffer(10)));
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java
index 4ebe01c..4cf8a7c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java
@@ -29,6 +29,9 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
 import com.google.protobuf.ByteString;
+
+import io.netty.buffer.UnpooledByteBufAllocator;
+
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
@@ -54,7 +57,7 @@ public class TestBookieRequestProcessor {
         // long poll threads == read threads
         ServerConfiguration conf = new ServerConfiguration();
         try (BookieRequestProcessor processor = new BookieRequestProcessor(
-            conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null)) {
+            conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT)) {
             assertSame(processor.getReadThreadPool(), processor.getLongPollThreadPool());
         }
 
@@ -62,7 +65,7 @@ public class TestBookieRequestProcessor {
         conf = new ServerConfiguration();
         conf.setNumReadWorkerThreads(0);
         try (BookieRequestProcessor processor = new BookieRequestProcessor(
-            conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null)) {
+            conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT)) {
             assertNull(processor.getReadThreadPool());
             assertNotNull(processor.getLongPollThreadPool());
         }
@@ -72,7 +75,7 @@ public class TestBookieRequestProcessor {
         conf.setNumReadWorkerThreads(2);
         conf.setNumLongPollWorkerThreads(2);
         try (BookieRequestProcessor processor = new BookieRequestProcessor(
-            conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null)) {
+            conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT)) {
             assertNotNull(processor.getReadThreadPool());
             assertNotNull(processor.getLongPollThreadPool());
             assertNotSame(processor.getReadThreadPool(), processor.getLongPollThreadPool());
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index b1feef0..97b7488 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -21,10 +21,14 @@
 
 package org.apache.bookkeeper.test;
 
+
+
 import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
 import static org.junit.Assert.assertTrue;
 
 import com.google.common.base.Stopwatch;
+import io.netty.buffer.ByteBufAllocator;
+
 import java.io.File;
 import java.io.IOException;
 import java.net.UnknownHostException;
@@ -41,6 +45,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.client.BookKeeperTestClient;
+import org.apache.bookkeeper.common.allocator.PoolingPolicy;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -237,6 +242,7 @@ public abstract class BookKeeperClusterTestCase {
     protected void startBKCluster(String metadataServiceUri) throws Exception {
         baseConf.setMetadataServiceUri(metadataServiceUri);
         baseClientConf.setMetadataServiceUri(metadataServiceUri);
+        baseClientConf.setAllocatorPoolingPolicy(PoolingPolicy.UnpooledHeap);
         if (numBookies > 0) {
             bkc = new BookKeeperTestClient(baseClientConf, new TestStatsProvider());
         }
@@ -303,6 +309,7 @@ public abstract class BookKeeperClusterTestCase {
         }
         conf.setLedgerDirNames(ledgerDirNames);
         conf.setEnableTaskExecutionStats(true);
+        conf.setAllocatorPoolingPolicy(PoolingPolicy.UnpooledHeap);
         return conf;
     }
 
@@ -677,7 +684,7 @@ public abstract class BookKeeperClusterTestCase {
         TestStatsProvider provider = new TestStatsProvider();
         BookieServer server = new BookieServer(conf, provider.getStatsLogger("")) {
             @Override
-            protected Bookie newBookie(ServerConfiguration conf) {
+            protected Bookie newBookie(ServerConfiguration conf, ByteBufAllocator allocator) {
                 return b;
             }
         };
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
index c6cc72b..d38a178 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.util.concurrent.DefaultThreadFactory;
@@ -157,8 +158,8 @@ public class BookieClientTest {
         BookieSocketAddress addr = bs.getLocalAddress();
         ResultStruct arc = new ResultStruct();
 
-        BookieClient bc = new BookieClientImpl(new ClientConfiguration(), eventLoopGroup, executor,
-                                               scheduler, NullStatsLogger.INSTANCE);
+        BookieClient bc = new BookieClientImpl(new ClientConfiguration(), eventLoopGroup,
+                UnpooledByteBufAllocator.DEFAULT, executor, scheduler, NullStatsLogger.INSTANCE);
         ByteBufList bb = createByteBuffer(1, 1, 1);
         bc.addEntry(addr, 1, passwd, 1, bb, wrcb, arc, BookieProtocol.FLAG_NONE, false, WriteFlag.NONE);
         synchronized (arc) {
@@ -258,8 +259,8 @@ public class BookieClientTest {
     public void testNoLedger() throws Exception {
         ResultStruct arc = new ResultStruct();
         BookieSocketAddress addr = bs.getLocalAddress();
-        BookieClient bc = new BookieClientImpl(new ClientConfiguration(), eventLoopGroup, executor,
-                                               scheduler, NullStatsLogger.INSTANCE);
+        BookieClient bc = new BookieClientImpl(new ClientConfiguration(), eventLoopGroup,
+                UnpooledByteBufAllocator.DEFAULT, executor, scheduler, NullStatsLogger.INSTANCE);
         synchronized (arc) {
             bc.readEntry(addr, 2, 13, recb, arc, BookieProtocol.FLAG_NONE);
             arc.wait(1000);
@@ -270,8 +271,8 @@ public class BookieClientTest {
     @Test
     public void testGetBookieInfo() throws IOException, InterruptedException {
         BookieSocketAddress addr = bs.getLocalAddress();
-        BookieClient bc = new BookieClientImpl(new ClientConfiguration(), new NioEventLoopGroup(), executor,
-                                               scheduler, NullStatsLogger.INSTANCE);
+        BookieClient bc = new BookieClientImpl(new ClientConfiguration(), new NioEventLoopGroup(),
+                UnpooledByteBufAllocator.DEFAULT, executor, scheduler, NullStatsLogger.INSTANCE);
         long flags = BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE
                 | BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE;
 
diff --git a/conf/bk_server.conf b/conf/bk_server.conf
index 23012dc..41798a6 100755
--- a/conf/bk_server.conf
+++ b/conf/bk_server.conf
@@ -965,3 +965,53 @@ storage.serve.readonly.tables=false
 
 # the cluster controller schedule interval, in milliseconds. default is 30 seconds.
 storage.cluster.controller.schedule.interval.ms=30000
+
+
+#############################################################################
+## Netty Allocator Settings
+#############################################################################
+
+# Define the memory pooling policy.
+# Available options are:
+#   - PooledDirect: Use Direct memory for all buffers and pool the memory.
+#                   Direct memory will avoid the overhead of JVM GC and most
+#                   memory copies when reading from and writing to socket channels.
+#                   Pooling will add memory space overhead due to the fact that
+#                   there will be fragmentation in the allocator and that threads
+#                   will keep a portion of memory as thread-local to avoid
+#                   contention when possible.
+#   - UnpooledHeap: Allocate memory from JVM heap without any pooling.
+#                   This option has the least overhead in terms of memory usage
+#                   since the memory will be automatically reclaimed by the
+#                   JVM GC but might impose a performance penalty at high
+#                   throughput.
+# Default is: PooledDirect
+# allocatorPoolingPolicy=PooledDirect
+  
+# Controls the amount of concurrency for the memory pool.
+# Default is to have a number of allocator arenas equal to 2 * CPUS.
+# Decreasing this number will reduce the amount of memory overhead, at the
+# expense of increased allocation contention.
+# allocatorPoolingConcurrency=8
+
+# Define the memory allocator out of memory policy.
+# Available options are: 
+#   - FallbackToHeap: If it's not possible to allocate a buffer from direct memory,
+#                     fall back to allocating an unpooled buffer from JVM heap.
+#                     This will help absorb memory allocation spikes because the heap
+#                     allocations will naturally slow down the process and will result
+#                     in a full GC cleanup if the heap itself is full.
+#   - ThrowException: Throw regular OOM exception without taking additional actions.
+# Default is: FallbackToHeap
+# allocatorOutOfMemoryPolicy=FallbackToHeap
+
+# Define the memory allocator leak detection policy. Available options are:
+#   - Disabled: No leak detection and no overhead.
+#   - Simple: Instruments 1% of the allocated buffers to track leaks.
+#   - Advanced: Instruments 1% of the allocated buffers to track leaks, reporting
+#               stack traces of places where the buffer was used.
+#   - Paranoid: Instruments 100% of the allocated buffers to track leaks, reporting
+#               stack traces of places where the buffer was used. Introduces very
+#               significant overhead.
+# Default is: Disabled
+# allocatorLeakDetectionPolicy=Disabled
diff --git a/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestTypeBenchmark.java b/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestTypeBenchmark.java
index 3995ea8..2319b2e 100644
--- a/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestTypeBenchmark.java
+++ b/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestTypeBenchmark.java
@@ -24,6 +24,7 @@ package org.apache.bookkeeper.proto.checksum;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
@@ -106,13 +107,13 @@ public class DigestTypeBenchmark {
         public void doSetup() throws Exception {
             final byte[] password = "password".getBytes("UTF-8");
             crc32 = DigestManager.instantiate(ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE),
-                    password, DigestType.CRC32);
+                    password, DigestType.CRC32, PooledByteBufAllocator.DEFAULT, true);
 
             crc32c = DigestManager.instantiate(ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE),
-                    password, DigestType.CRC32C);
+                    password, DigestType.CRC32C, PooledByteBufAllocator.DEFAULT, true);
 
             mac = DigestManager.instantiate(ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE),
-                    password, DigestType.HMAC);
+                    password, DigestType.HMAC, PooledByteBufAllocator.DEFAULT, true);
 
             digestBuf = Unpooled.buffer(getDigestManager(digest).getMacCodeLength());
 
diff --git a/shaded/bookkeeper-server-shaded/pom.xml b/shaded/bookkeeper-server-shaded/pom.xml
index a39227b..85c76be 100644
--- a/shaded/bookkeeper-server-shaded/pom.xml
+++ b/shaded/bookkeeper-server-shaded/pom.xml
@@ -66,6 +66,7 @@
                   <include>com.google.guava:guava</include>
                   <include>com.google.protobuf:protobuf-java</include>
                   <include>org.apache.bookkeeper:bookkeeper-common</include>
+                  <include>org.apache.bookkeeper:bookkeeper-common-allocator</include>
                   <include>org.apache.bookkeeper:cpu-affinity</include>
                   <include>org.apache.bookkeeper:bookkeeper-tools-framework</include>
                   <include>org.apache.bookkeeper:bookkeeper-proto</include>
@@ -83,7 +84,7 @@
             </configuration>
           </execution>
         </executions>
-      </plugin> 
+      </plugin>
       <plugin>
         <groupId>org.codehaus.mojo</groupId>
         <artifactId>license-maven-plugin</artifactId>
diff --git a/shaded/distributedlog-core-shaded/pom.xml b/shaded/distributedlog-core-shaded/pom.xml
index e5ef59f..85ad8f0 100644
--- a/shaded/distributedlog-core-shaded/pom.xml
+++ b/shaded/distributedlog-core-shaded/pom.xml
@@ -87,6 +87,7 @@
                   <include>net.java.dev.jna:jna</include>
                   <include>net.jpountz.lz4:lz4</include>
                   <include>org.apache.bookkeeper:bookkeeper-common</include>
+                  <include>org.apache.bookkeeper:bookkeeper-common-allocator</include>
                   <include>org.apache.bookkeeper:cpu-affinity</include>
                   <include>org.apache.bookkeeper:bookkeeper-tools-framework</include>
                   <include>org.apache.bookkeeper:bookkeeper-proto</include>
@@ -207,7 +208,7 @@
             </configuration>
           </execution>
         </executions>
-      </plugin> 
+      </plugin>
       <plugin>
         <groupId>org.codehaus.mojo</groupId>
         <artifactId>license-maven-plugin</artifactId>
diff --git a/site/_data/config/bk_server.yaml b/site/_data/config/bk_server.yaml
index a42e12e..d83a8f1 100644
--- a/site/_data/config/bk_server.yaml
+++ b/site/_data/config/bk_server.yaml
@@ -137,7 +137,7 @@ groups:
     default: 8080
 
 - name: Security settings
-  params: 
+  params:
   - param: bookieAuthProviderFactoryClass
     description: The bookie authentication provider factory class name. If this is null, no authentication will take place.
     default: null
@@ -207,7 +207,7 @@ groups:
        5: expanding header to 512 and padding writes to align sector size configured by `journalAlignmentSize`
        6: persisting explicitLac is introduced
 
-      By default, it is `6`. 
+      By default, it is `6`.
       If you'd like to disable persisting ExplicitLac, you can set this config to < `6` and also fileInfoFormatVersionToWrite should be set to 0. If there is mismatch then the serverconfig is considered invalid.
       You can disable `padding-writes` by setting journal version back to `4`. This feature is available in 4.5.0 and onward versions.
     default: 6
@@ -618,7 +618,7 @@ groups:
   - param: ensemblePlacementPolicy
     description: |
       The ensemble placement policy used for finding bookie for re-replicating entries.
-    
+
       Options:
         - org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy
         - org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy
@@ -678,4 +678,55 @@ groups:
     description: The time to backoff when replication worker encounters exceptions on replicating a ledger, in milliseconds.
     default: 5000
 
-
+- name: Memory allocator settings
+  params:
+  - param: allocatorPoolingPolicy
+    description: |
+      Define the memory pooling policy.
+
+      Available options are:
+        - PooledDirect: Use Direct memory for all buffers and pool the memory.
+                        Direct memory will avoid the overhead of JVM GC and most
+                        memory copies when reading from and writing to socket channels.
+                        Pooling will add memory space overhead due to the fact that
+                        there will be fragmentation in the allocator and that threads
+                        will keep a portion of memory as thread-local to avoid
+                        contention when possible.
+        - UnpooledHeap: Allocate memory from JVM heap without any pooling.
+                        This option has the least overhead in terms of memory usage
+                        since the memory will be automatically reclaimed by the
+                        JVM GC but might impose a performance penalty at high
+                        throughput.
+    default: PooledDirect
+  - param: allocatorPoolingConcurrency
+    description: |
+      Controls the amount of concurrency for the memory pool.
+      Default is to have a number of allocator arenas equal to 2 * CPUS.
+      Decreasing this number will reduce the amount of memory overhead, at the
+      expense of increased allocation contention.
+    default: 2 * CPUS
+  - param: allocatorOutOfMemoryPolicy
+    description: |
+      Define the memory allocator out of memory policy.
+
+      Available options are:
+        - FallbackToHeap: If it's not possible to allocate a buffer from direct memory,
+                          fall back to allocating an unpooled buffer from JVM heap.
+                          This will help absorb memory allocation spikes because the heap
+                          allocations will naturally slow down the process and will result
+                          in a full GC cleanup if the heap itself is full.
+        - ThrowException: Throw regular OOM exception without taking additional actions.
+    default: FallbackToHeap
+  - param: allocatorLeakDetectionPolicy
+    description: |
+      Define the memory allocator leak detection policy.
+
+      Available options are:
+        - Disabled: No leak detection and no overhead.
+        - Simple: Instruments 1% of the allocated buffers to track leaks.
+        - Advanced: Instruments 1% of the allocated buffers to track leaks, reporting
+                    stack traces of places where the buffer was used.
+        - Paranoid: Instruments 100% of the allocated buffers to track leaks, reporting
+                    stack traces of places where the buffer was used. Introduces very
+                    significant overhead.
+    default: Disabled
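
For illustration only, the four allocator settings documented above could be read from a properties-style file roughly as follows. This is a minimal sketch using only the JDK: the class `AllocatorSettingsSketch` and its nested enums are hypothetical stand-ins that mirror the documented option names, not the classes this commit adds, and the defaults are the ones stated in the descriptions above (PooledDirect, 2 * CPUS, FallbackToHeap, Disabled).

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical holder for the four allocator settings; not a BookKeeper class.
final class AllocatorSettingsSketch {
    // Local stand-ins mirroring the option names documented in the config.
    enum PoolingPolicy { PooledDirect, UnpooledHeap }
    enum OutOfMemoryPolicy { FallbackToHeap, ThrowException }
    enum LeakDetectionPolicy { Disabled, Simple, Advanced, Paranoid }

    final PoolingPolicy poolingPolicy;
    final int poolingConcurrency;
    final OutOfMemoryPolicy outOfMemoryPolicy;
    final LeakDetectionPolicy leakDetectionPolicy;

    private AllocatorSettingsSketch(Properties p) {
        // Each key falls back to the documented default when it is absent or commented out.
        poolingPolicy = PoolingPolicy.valueOf(
                p.getProperty("allocatorPoolingPolicy", "PooledDirect").trim());
        int defaultConcurrency = 2 * Runtime.getRuntime().availableProcessors();
        poolingConcurrency = Integer.parseInt(
                p.getProperty("allocatorPoolingConcurrency",
                        String.valueOf(defaultConcurrency)).trim());
        outOfMemoryPolicy = OutOfMemoryPolicy.valueOf(
                p.getProperty("allocatorOutOfMemoryPolicy", "FallbackToHeap").trim());
        leakDetectionPolicy = LeakDetectionPolicy.valueOf(
                p.getProperty("allocatorLeakDetectionPolicy", "Disabled").trim());
    }

    // Parses properties-style text; lines starting with '#' are treated as comments.
    static AllocatorSettingsSketch parse(String confText) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(confText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new AllocatorSettingsSketch(p);
    }

    public static void main(String[] args) {
        AllocatorSettingsSketch s = parse(
                "# allocatorPoolingPolicy=PooledDirect\n"
                + "allocatorPoolingConcurrency=8\n"
                + "allocatorLeakDetectionPolicy=Simple\n");
        System.out.println(s.poolingPolicy + " " + s.poolingConcurrency + " "
                + s.outOfMemoryPolicy + " " + s.leakDetectionPolicy);
    }
}
```

An unrecognized option name makes `valueOf` throw `IllegalArgumentException`, which is one simple way to reject a misspelled policy at startup; whether the real configuration classes behave the same way is not shown in this diff.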