You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by yo...@apache.org on 2023/06/19 07:42:55 UTC

[bookkeeper] 31/31: Unify ByteBufAllocator for the DirectIO component (#3985)

This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-4.16
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 306eaef60df9ae43ea9e2db4ed769cb3e823e71b
Author: Hang Chen <ch...@apache.org>
AuthorDate: Mon Jun 19 15:08:31 2023 +0800

    Unify ByteBufAllocator for the DirectIO component (#3985)
    
    ### Motivation
    Some classes in the DirectIO component create `ByteBuf` instances with `PooledByteBufAllocator` instead of the allocator that BookKeeper initializes. As a result, when the BookKeeper allocator's OOM policy is configured to shut down, that policy is not applied to the classes that allocate through `PooledByteBufAllocator`.
    
    ### Modifications
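    - Unify ByteBufAllocator for the DirectIO component: `Buffer`, `BufferPool` and `LogReaderScan.scan` now take a `ByteBufAllocator` argument instead of calling `PooledByteBufAllocator.DEFAULT` directly.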
    
    (cherry picked from commit bf06642c820f84978aaaf8e162d7a6e0a992c402)
---
 .../bookie/storage/directentrylogger/Buffer.java   |  8 +++++---
 .../storage/directentrylogger/BufferPool.java      |  5 +++--
 .../DirectCompactionEntryLog.java                  |  2 +-
 .../directentrylogger/DirectEntryLogger.java       |  4 ++--
 .../storage/directentrylogger/DirectReader.java    |  2 +-
 .../storage/directentrylogger/LogReaderScan.java   |  6 +++---
 .../storage/directentrylogger/TestBuffer.java      | 17 ++++++++--------
 .../directentrylogger/TestDirectReader.java        | 15 +++++++-------
 .../directentrylogger/TestDirectWriter.java        | 23 +++++++++++-----------
 .../storage/directentrylogger/TestMetadata.java    |  2 +-
 10 files changed, 45 insertions(+), 39 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/Buffer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/Buffer.java
index 2af6a17eb0..1b3a393ad0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/Buffer.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/Buffer.java
@@ -24,7 +24,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.bookkeeper.common.util.ExceptionMessageHelper.exMsg;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.util.ReferenceCountUtil;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -60,12 +60,14 @@ class Buffer {
     final int bufferSize;
     ByteBuf buffer;
     ByteBuffer byteBuffer;
+    ByteBufAllocator allocator;
     long pointer = 0;
 
-    Buffer(NativeIO nativeIO, int bufferSize) throws IOException {
+    Buffer(NativeIO nativeIO, ByteBufAllocator allocator, int bufferSize) throws IOException {
         checkArgument(isAligned(bufferSize),
                       "Buffer size not aligned %d", bufferSize);
 
+        this.allocator = allocator;
         this.buffer = allocateAligned(ALIGNMENT, bufferSize);
         this.nativeIO = nativeIO;
         this.bufferSize = bufferSize;
@@ -74,7 +76,7 @@ class Buffer {
     }
 
     private ByteBuf allocateAligned(int alignment, int bufferSize) {
-        ByteBuf buf = PooledByteBufAllocator.DEFAULT.directBuffer(bufferSize + alignment);
+        ByteBuf buf = allocator.directBuffer(bufferSize + alignment);
         long addr = buf.memoryAddress();
         if ((addr & (alignment - 1)) == 0) {
             // The address is already aligned
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/BufferPool.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/BufferPool.java
index 669e132d5a..3614c65c0f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/BufferPool.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/BufferPool.java
@@ -20,6 +20,7 @@
  */
 package org.apache.bookkeeper.bookie.storage.directentrylogger;
 
+import io.netty.buffer.ByteBufAllocator;
 import java.io.IOException;
 import java.util.concurrent.ArrayBlockingQueue;
 import org.apache.bookkeeper.common.util.nativeio.NativeIO;
@@ -30,10 +31,10 @@ import org.apache.bookkeeper.common.util.nativeio.NativeIO;
 public class BufferPool implements AutoCloseable {
     private final ArrayBlockingQueue<Buffer> pool;
 
-    BufferPool(NativeIO nativeIO, int bufferSize, int maxPoolSize) throws IOException {
+    BufferPool(NativeIO nativeIO, ByteBufAllocator allocator, int bufferSize, int maxPoolSize) throws IOException {
         pool = new ArrayBlockingQueue<>(maxPoolSize);
         for (int i = 0; i < maxPoolSize; i++) {
-            pool.add(new Buffer(nativeIO, bufferSize));
+            pool.add(new Buffer(nativeIO, allocator, bufferSize));
         }
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectCompactionEntryLog.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectCompactionEntryLog.java
index e6403f2214..fadde648de 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectCompactionEntryLog.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectCompactionEntryLog.java
@@ -207,7 +207,7 @@ public abstract class DirectCompactionEntryLog implements CompactionEntryLog {
         public void scan(EntryLogScanner scanner) throws IOException {
             try (LogReader reader = new DirectReader(dstLogId, compactedFile.toString(), allocator, nativeIO,
                                                      readBufferSize, maxSaneEntrySize, readBlockStats)) {
-                LogReaderScan.scan(reader, scanner);
+                LogReaderScan.scan(allocator, reader, scanner);
             }
         }
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java
index 3b211b54e8..f5b74d1265 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java
@@ -119,7 +119,7 @@ public class DirectEntryLogger implements EntryLogger {
         this.allocator = allocator;
 
         int singleWriteBufferSize = Buffer.nextAlignment((int) (totalWriteBufferSize / NUMBER_OF_WRITE_BUFFERS));
-        this.writeBuffers = new BufferPool(nativeIO, singleWriteBufferSize, NUMBER_OF_WRITE_BUFFERS);
+        this.writeBuffers = new BufferPool(nativeIO, allocator, singleWriteBufferSize, NUMBER_OF_WRITE_BUFFERS);
 
         // The total read buffer memory needs to get split across all the read threads, since the caches
         // are thread-specific and we want to ensure we don't pass the total memory limit.
@@ -385,7 +385,7 @@ public class DirectEntryLogger implements EntryLogger {
     public void scanEntryLog(long entryLogId, EntryLogScanner scanner) throws IOException {
         checkArgument(entryLogId < Integer.MAX_VALUE, "Entry log id must be an int [%d]", entryLogId);
         try (LogReader reader = newDirectReader((int) entryLogId)) {
-            LogReaderScan.scan(reader, scanner);
+            LogReaderScan.scan(allocator, reader, scanner);
         }
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectReader.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectReader.java
index f5076f3484..707bf307c0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectReader.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectReader.java
@@ -69,7 +69,7 @@ class DirectReader implements LogReader {
                                   .kv("errno", ne.getErrno()).toString());
         }
         refreshMaxOffset();
-        nativeBuffer = new Buffer(nativeIO, bufferSize);
+        nativeBuffer = new Buffer(nativeIO, allocator, bufferSize);
     }
 
     @Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/LogReaderScan.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/LogReaderScan.java
index fdc9a0662f..9718795143 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/LogReaderScan.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/LogReaderScan.java
@@ -21,16 +21,16 @@
 package org.apache.bookkeeper.bookie.storage.directentrylogger;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.util.ReferenceCountUtil;
 import java.io.IOException;
 import org.apache.bookkeeper.bookie.storage.EntryLogScanner;
 
 class LogReaderScan {
-    static void scan(LogReader reader, EntryLogScanner scanner) throws IOException {
+    static void scan(ByteBufAllocator allocator, LogReader reader, EntryLogScanner scanner) throws IOException {
         int offset = Header.LOGFILE_LEGACY_HEADER_SIZE;
 
-        ByteBuf entry = PooledByteBufAllocator.DEFAULT.directBuffer(16 * 1024 * 1024);
+        ByteBuf entry = allocator.directBuffer(16 * 1024 * 1024);
 
         try {
             while (offset < reader.maxOffset()) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/TestBuffer.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/TestBuffer.java
index 6d9b4957e9..a4d9554166 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/TestBuffer.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/TestBuffer.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue;
 
 // CHECKSTYLE.OFF: IllegalImport
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import io.netty.util.internal.PlatformDependent;
 import java.io.IOException;
@@ -70,13 +71,13 @@ public class TestBuffer {
 
     @Test(expected = IllegalArgumentException.class)
     public void testCreateUnaligned() throws Exception {
-        new Buffer(new NativeIOImpl(), 1234);
+        new Buffer(new NativeIOImpl(), ByteBufAllocator.DEFAULT, 1234);
     }
 
     @Test
     public void testWriteInt() throws Exception {
         int bufferSize = 1 << 20;
-        Buffer b = new Buffer(new NativeIOImpl(), bufferSize);
+        Buffer b = new Buffer(new NativeIOImpl(), ByteBufAllocator.DEFAULT, bufferSize);
         assertTrue(b.hasSpace(bufferSize));
         assertEquals(0, b.position());
         b.writeInt(0xdeadbeef);
@@ -111,7 +112,7 @@ public class TestBuffer {
         ByteBuf bb = Unpooled.buffer(1021);
         fillByteBuf(bb, 0xdeadbeef);
         int bufferSize = 1 << 20;
-        Buffer b = new Buffer(new NativeIOImpl(), bufferSize);
+        Buffer b = new Buffer(new NativeIOImpl(), ByteBufAllocator.DEFAULT, bufferSize);
         assertEquals(0, b.position());
         b.writeByteBuf(bb);
         assertEquals(1021, b.position());
@@ -138,7 +139,7 @@ public class TestBuffer {
     public void testPartialRead() throws Exception {
         ByteBuf bb = Unpooled.buffer(5000);
 
-        Buffer b = new Buffer(new NativeIOImpl(), 4096);
+        Buffer b = new Buffer(new NativeIOImpl(), ByteBufAllocator.DEFAULT, 4096);
         for (int i = 0; i < 4096 / Integer.BYTES; i++) {
             b.writeInt(0xdeadbeef);
         }
@@ -149,7 +150,7 @@ public class TestBuffer {
 
     @Test(expected = IOException.class)
     public void testReadIntAtBoundary() throws Exception {
-        Buffer b = new Buffer(new NativeIOImpl(), 4096);
+        Buffer b = new Buffer(new NativeIOImpl(), ByteBufAllocator.DEFAULT, 4096);
 
         for (int i = 0; i < 4096 / Integer.BYTES; i++) {
             b.writeInt(0xdeadbeef);
@@ -163,7 +164,7 @@ public class TestBuffer {
 
     @Test(expected = IOException.class)
     public void testReadLongAtBoundary() throws Exception {
-        Buffer b = new Buffer(new NativeIOImpl(), 4096);
+        Buffer b = new Buffer(new NativeIOImpl(), ByteBufAllocator.DEFAULT, 4096);
 
         for (int i = 0; i < 4096 / Integer.BYTES; i++) {
             b.writeInt(0xdeadbeef);
@@ -177,7 +178,7 @@ public class TestBuffer {
 
     @Test
     public void testPadToAlignment() throws Exception {
-        Buffer b = new Buffer(new NativeIOImpl(), 1 << 23);
+        Buffer b = new Buffer(new NativeIOImpl(), ByteBufAllocator.DEFAULT, 1 << 23);
 
         for (int i = 0; i < 1025; i++) {
             b.writeInt(0xdededede);
@@ -194,7 +195,7 @@ public class TestBuffer {
 
     @Test
     public void testFree() throws Exception {
-        Buffer b = new Buffer(new NativeIOImpl(), 1 << 23);
+        Buffer b = new Buffer(new NativeIOImpl(), ByteBufAllocator.DEFAULT, 1 << 23);
         b.free(); // success if process doesn't explode
         b.free();
     }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/TestDirectReader.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/TestDirectReader.java
index 7ebf2709b9..75ad4437e9 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/TestDirectReader.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/TestDirectReader.java
@@ -189,7 +189,7 @@ public class TestDirectReader {
         File ledgerDir = tmpDirs.createNew("readBuffer", "logs");
 
         writeFileWithPattern(ledgerDir, 1234, 0xbeefcafe, 1, 1 << 20);
-        BufferPool buffers = new BufferPool(new NativeIOImpl(), Buffer.ALIGNMENT * 4, 8);
+        BufferPool buffers = new BufferPool(new NativeIOImpl(), ByteBufAllocator.DEFAULT, Buffer.ALIGNMENT * 4, 8);
 
         try (LogReader reader = new DirectReader(1234, logFilename(ledgerDir, 1234),
                                                  ByteBufAllocator.DEFAULT,
@@ -268,7 +268,7 @@ public class TestDirectReader {
 
         int entrySize = Buffer.ALIGNMENT / 4 + 100;
         Map<Integer, Integer> offset2Pattern = new HashMap<>();
-        try (BufferPool buffers = new BufferPool(new NativeIOImpl(), Buffer.ALIGNMENT, 8);
+        try (BufferPool buffers = new BufferPool(new NativeIOImpl(), ByteBufAllocator.DEFAULT, Buffer.ALIGNMENT, 8);
              LogWriter writer = new DirectWriter(1234, logFilename(ledgerDir, 1234),
                                                  1 << 20, MoreExecutors.newDirectExecutorService(),
                                                  buffers, new NativeIOImpl(), Slogger.CONSOLE)) {
@@ -315,7 +315,7 @@ public class TestDirectReader {
                     return 0;
                 }
             };
-        try (BufferPool buffers = new BufferPool(new NativeIOImpl(), Buffer.ALIGNMENT, 8);
+        try (BufferPool buffers = new BufferPool(new NativeIOImpl(), ByteBufAllocator.DEFAULT, Buffer.ALIGNMENT, 8);
              LogWriter writer = new DirectWriter(1234, logFilename(ledgerDir, 1234),
                                                  1 << 20, MoreExecutors.newDirectExecutorService(),
                                                  buffers, new NativeIOImpl(), Slogger.CONSOLE);
@@ -353,7 +353,7 @@ public class TestDirectReader {
 
         int entrySize = Buffer.ALIGNMENT / 2 + 8;
 
-        try (BufferPool buffers = new BufferPool(new NativeIOImpl(), Buffer.ALIGNMENT, 8);
+        try (BufferPool buffers = new BufferPool(new NativeIOImpl(), ByteBufAllocator.DEFAULT, Buffer.ALIGNMENT, 8);
              LogWriter writer = new DirectWriter(1234, logFilename(ledgerDir, 1234),
                                                  1 << 20, MoreExecutors.newDirectExecutorService(),
                                                  buffers, new NativeIOImpl(), Slogger.CONSOLE);
@@ -403,7 +403,8 @@ public class TestDirectReader {
                     return 0; // don't preallocate
                 }
             };
-        try (BufferPool buffers = new BufferPool(new NativeIOImpl(), Buffer.ALIGNMENT * 10, 8);
+        try (BufferPool buffers = new BufferPool(new NativeIOImpl(),
+            ByteBufAllocator.DEFAULT, Buffer.ALIGNMENT * 10, 8);
              LogWriter writer = new DirectWriter(1234, logFilename(ledgerDir, 1234), 1 << 20,
                                                  MoreExecutors.newDirectExecutorService(),
                                                  buffers, new NativeIOImpl(), Slogger.CONSOLE)) {
@@ -452,7 +453,7 @@ public class TestDirectReader {
         int entrySize = Buffer.ALIGNMENT * 4;
 
         int offset1, offset2;
-        try (BufferPool buffers = new BufferPool(new NativeIOImpl(), Buffer.ALIGNMENT * 8, 8);
+        try (BufferPool buffers = new BufferPool(new NativeIOImpl(), ByteBufAllocator.DEFAULT, Buffer.ALIGNMENT * 8, 8);
              LogWriter writer = new DirectWriter(1234, logFilename(ledgerDir, 1234), 1 << 20,
                                                  MoreExecutors.newDirectExecutorService(), buffers, new NativeIOImpl(),
                                                  Slogger.CONSOLE)) {
@@ -496,7 +497,7 @@ public class TestDirectReader {
 
     private static void writeFileWithPattern(File directory, int logId,
                                              int pattern, int blockIncrement, int fileSize) throws Exception {
-        try (BufferPool buffers = new BufferPool(new NativeIOImpl(), Buffer.ALIGNMENT, 8);
+        try (BufferPool buffers = new BufferPool(new NativeIOImpl(), ByteBufAllocator.DEFAULT, Buffer.ALIGNMENT, 8);
              LogWriter writer = new DirectWriter(logId, logFilename(directory, logId),
                                                  fileSize, MoreExecutors.newDirectExecutorService(),
                                                  buffers, new NativeIOImpl(), Slogger.CONSOLE)) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/TestDirectWriter.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/TestDirectWriter.java
index b2868c907d..d897414774 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/TestDirectWriter.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/TestDirectWriter.java
@@ -26,6 +26,7 @@ import static org.hamcrest.Matchers.equalTo;
 
 import com.google.common.util.concurrent.MoreExecutors;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import java.io.File;
 import java.io.FileInputStream;
@@ -61,7 +62,7 @@ public class TestDirectWriter {
     @Test(expected = IllegalArgumentException.class)
     public void testWriteAtAlignment() throws Exception {
         File ledgerDir = tmpDirs.createNew("writeAlignment", "logs");
-        try (BufferPool buffers = new BufferPool(new NativeIOImpl(), Buffer.ALIGNMENT, 8);
+        try (BufferPool buffers = new BufferPool(new NativeIOImpl(), ByteBufAllocator.DEFAULT, Buffer.ALIGNMENT, 8);
              LogWriter writer = new DirectWriter(5678, logFilename(ledgerDir, 5678),
                                                  1 << 24, writeExecutor,
                                                  buffers, new NativeIOImpl(), Slogger.CONSOLE)) {
@@ -75,7 +76,7 @@ public class TestDirectWriter {
     @Test(expected = IllegalArgumentException.class)
     public void testWriteAlignmentSize() throws Exception {
         File ledgerDir = tmpDirs.createNew("writeAlignment", "logs");
-        try (BufferPool buffers = new BufferPool(new NativeIOImpl(), Buffer.ALIGNMENT, 8);
+        try (BufferPool buffers = new BufferPool(new NativeIOImpl(), ByteBufAllocator.DEFAULT, Buffer.ALIGNMENT, 8);
              LogWriter writer = new DirectWriter(5678, logFilename(ledgerDir, 5678), 1 << 24, writeExecutor,
                                                  buffers, new NativeIOImpl(), Slogger.CONSOLE)) {
             ByteBuf bb = Unpooled.buffer(123);
@@ -88,7 +89,7 @@ public class TestDirectWriter {
     @Test
     public void testWriteAlignedNotAtStart() throws Exception {
         File ledgerDir = tmpDirs.createNew("writeAlignment", "logs");
-        try (BufferPool buffers = new BufferPool(new NativeIOImpl(), Buffer.ALIGNMENT, 8);
+        try (BufferPool buffers = new BufferPool(new NativeIOImpl(), ByteBufAllocator.DEFAULT, Buffer.ALIGNMENT, 8);
              LogWriter writer = new DirectWriter(5678, logFilename(ledgerDir, 5678), 1 << 24, writeExecutor,
                                                  buffers, new NativeIOImpl(), Slogger.CONSOLE)) {
             ByteBuf bb = Unpooled.buffer(Buffer.ALIGNMENT);
@@ -102,7 +103,7 @@ public class TestDirectWriter {
     @Test(timeout = 10000)
     public void testFlushingWillWaitForBuffer() throws Exception {
         File ledgerDir = tmpDirs.createNew("writeFailFailsFlush", "logs");
-        try (BufferPool buffers = new BufferPool(new NativeIOImpl(),
+        try (BufferPool buffers = new BufferPool(new NativeIOImpl(), ByteBufAllocator.DEFAULT,
                 Buffer.ALIGNMENT, 1); // only one buffer available, so we can't flush in bg
              LogWriter writer = new DirectWriter(5678, logFilename(ledgerDir, 5678), 1 << 24, writeExecutor,
                                                  buffers, new NativeIOImpl(), Slogger.CONSOLE)) {
@@ -129,7 +130,7 @@ public class TestDirectWriter {
                     return super.pwrite(fd, pointer, count, offset);
                 }
             };
-        try (BufferPool buffers = new BufferPool(new NativeIOImpl(), Buffer.ALIGNMENT, 8);
+        try (BufferPool buffers = new BufferPool(new NativeIOImpl(), ByteBufAllocator.DEFAULT, Buffer.ALIGNMENT, 8);
              LogWriter writer = new DirectWriter(5678, logFilename(ledgerDir, 5678), 1 << 24, writeExecutor,
                                                  buffers, io, Slogger.CONSOLE)) {
             for (int i = 0; i < 10; i++) {
@@ -158,7 +159,7 @@ public class TestDirectWriter {
                 }
             };
 
-        try (BufferPool buffers = new BufferPool(new NativeIOImpl(), 1 << 14, 8);
+        try (BufferPool buffers = new BufferPool(new NativeIOImpl(), ByteBufAllocator.DEFAULT, 1 << 14, 8);
              LogWriter writer = new DirectWriter(5678, logFilename(ledgerDir, 5678), 1 << 24, writeExecutor,
                                                  buffers, io, Slogger.CONSOLE)) {
             ByteBuf bb = Unpooled.buffer(Buffer.ALIGNMENT);
@@ -171,7 +172,7 @@ public class TestDirectWriter {
     @Test
     public void testWriteWithPadding() throws Exception {
         File ledgerDir = tmpDirs.createNew("paddingWrite", "logs");
-        try (BufferPool buffers = new BufferPool(new NativeIOImpl(), 1 << 14, 8);
+        try (BufferPool buffers = new BufferPool(new NativeIOImpl(), ByteBufAllocator.DEFAULT, 1 << 14, 8);
              LogWriter writer = new DirectWriter(5678, logFilename(ledgerDir, 5678), 1 << 24, writeExecutor,
                                                  buffers, new NativeIOImpl(), Slogger.CONSOLE)) {
             ByteBuf bb = Unpooled.buffer(Buffer.ALIGNMENT);
@@ -199,7 +200,7 @@ public class TestDirectWriter {
         ExecutorService flushExecutor = Executors.newSingleThreadExecutor();
         try {
             File ledgerDir = tmpDirs.createNew("blockWrite", "logs");
-            try (BufferPool buffers = new BufferPool(new NativeIOImpl(), 1 << 14, 8);
+            try (BufferPool buffers = new BufferPool(new NativeIOImpl(), ByteBufAllocator.DEFAULT, 1 << 14, 8);
                  LogWriter writer = new DirectWriter(1234, logFilename(ledgerDir, 1234),
                                                      1 << 24, writeExecutor,
                                                      buffers, new NativeIOImpl(), Slogger.CONSOLE)) {
@@ -239,7 +240,7 @@ public class TestDirectWriter {
         File ledgerDir = tmpDirs.createNew("failOpen", "logs");
         ledgerDir.delete();
 
-        BufferPool buffers = new BufferPool(new NativeIOImpl(), 1 << 14, 8);
+        BufferPool buffers = new BufferPool(new NativeIOImpl(), ByteBufAllocator.DEFAULT, 1 << 14, 8);
         try {
             new DirectWriter(1234, logFilename(ledgerDir, 1234),
                     1 << 30, MoreExecutors.newDirectExecutorService(),
@@ -259,7 +260,7 @@ public class TestDirectWriter {
                     throw new NativeIOException("pretending I'm a mac");
                 }
             };
-        try (BufferPool buffers = new BufferPool(new NativeIOImpl(), 1 << 14, 8);
+        try (BufferPool buffers = new BufferPool(new NativeIOImpl(), ByteBufAllocator.DEFAULT, 1 << 14, 8);
              LogWriter writer = new DirectWriter(3456, logFilename(ledgerDir, 3456),
                                                  1 << 24, writeExecutor,
                                                  buffers, nativeIO, Slogger.CONSOLE)) {
@@ -282,7 +283,7 @@ public class TestDirectWriter {
     public void testWriteAtIntLimit() throws Exception {
         File ledgerDir = tmpDirs.createNew("intLimit", "logs");
 
-        try (BufferPool buffers = new BufferPool(new NativeIOImpl(), 1 << 14, 8);
+        try (BufferPool buffers = new BufferPool(new NativeIOImpl(), ByteBufAllocator.DEFAULT, 1 << 14, 8);
              LogWriter writer = new DirectWriter(3456, logFilename(ledgerDir, 3456),
                                                  (long) Integer.MAX_VALUE + (Buffer.ALIGNMENT * 100),
                                                  writeExecutor,
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/TestMetadata.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/TestMetadata.java
index 953e69b591..4aafd18ba4 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/TestMetadata.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/TestMetadata.java
@@ -55,7 +55,7 @@ public class TestMetadata {
     public void testReadMetaFromHeader() throws Exception {
         File ledgerDir = tmpDirs.createNew("writeMetadataBeforeFsync", "logs");
         int logId = 5678;
-        try (BufferPool buffers = new BufferPool(new NativeIOImpl(), Buffer.ALIGNMENT, 8);
+        try (BufferPool buffers = new BufferPool(new NativeIOImpl(), ByteBufAllocator.DEFAULT, Buffer.ALIGNMENT, 8);
              LogWriter writer = new DirectWriter(logId, logFilename(ledgerDir, logId),
                      1 << 24, writeExecutor,
                      buffers, new NativeIOImpl(), Slogger.CONSOLE)) {