You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by yo...@apache.org on 2023/06/19 07:42:55 UTC
[bookkeeper] 31/31: Unify ByteBufAllocator for the DirectIO component (#3985)
This is an automated email from the ASF dual-hosted git repository.
yong pushed a commit to branch branch-4.16
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit 306eaef60df9ae43ea9e2db4ed769cb3e823e71b
Author: Hang Chen <ch...@apache.org>
AuthorDate: Mon Jun 19 15:08:31 2023 +0800
Unify ByteBufAllocator for the DirectIO component (#3985)
### Motivation
Some classes in the DirectIO component use `PooledByteBufAllocator` to create ByteBuf instead of using the BookKeeper-initiated allocator. When we configured the BookKeeper allocator OOM policy to shut down, the OOM policy can't apply to those classes which use `PooledByteBufAllocator` to create ByteBuf.
### Modifications
- Unify ByteBufAllocator for the DirectIO component.
(cherry picked from commit bf06642c820f84978aaaf8e162d7a6e0a992c402)
---
.../bookie/storage/directentrylogger/Buffer.java | 8 +++++---
.../storage/directentrylogger/BufferPool.java | 5 +++--
.../DirectCompactionEntryLog.java | 2 +-
.../directentrylogger/DirectEntryLogger.java | 4 ++--
.../storage/directentrylogger/DirectReader.java | 2 +-
.../storage/directentrylogger/LogReaderScan.java | 6 +++---
.../storage/directentrylogger/TestBuffer.java | 17 ++++++++--------
.../directentrylogger/TestDirectReader.java | 15 +++++++-------
.../directentrylogger/TestDirectWriter.java | 23 +++++++++++-----------
.../storage/directentrylogger/TestMetadata.java | 2 +-
10 files changed, 45 insertions(+), 39 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/Buffer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/Buffer.java
index 2af6a17eb0..1b3a393ad0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/Buffer.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/Buffer.java
@@ -24,7 +24,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.bookkeeper.common.util.ExceptionMessageHelper.exMsg;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -60,12 +60,14 @@ class Buffer {
final int bufferSize;
ByteBuf buffer;
ByteBuffer byteBuffer;
+ ByteBufAllocator allocator;
long pointer = 0;
- Buffer(NativeIO nativeIO, int bufferSize) throws IOException {
+ Buffer(NativeIO nativeIO, ByteBufAllocator allocator, int bufferSize) throws IOException {
checkArgument(isAligned(bufferSize),
"Buffer size not aligned %d", bufferSize);
+ this.allocator = allocator;
this.buffer = allocateAligned(ALIGNMENT, bufferSize);
this.nativeIO = nativeIO;
this.bufferSize = bufferSize;
@@ -74,7 +76,7 @@ class Buffer {
}
private ByteBuf allocateAligned(int alignment, int bufferSize) {
- ByteBuf buf = PooledByteBufAllocator.DEFAULT.directBuffer(bufferSize + alignment);
+ ByteBuf buf = allocator.directBuffer(bufferSize + alignment);
long addr = buf.memoryAddress();
if ((addr & (alignment - 1)) == 0) {
// The address is already aligned
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/BufferPool.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/BufferPool.java
index 669e132d5a..3614c65c0f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/BufferPool.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/BufferPool.java
@@ -20,6 +20,7 @@
*/
package org.apache.bookkeeper.bookie.storage.directentrylogger;
+import io.netty.buffer.ByteBufAllocator;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import org.apache.bookkeeper.common.util.nativeio.NativeIO;
@@ -30,10 +31,10 @@ import org.apache.bookkeeper.common.util.nativeio.NativeIO;
public class BufferPool implements AutoCloseable {
private final ArrayBlockingQueue<Buffer> pool;
- BufferPool(NativeIO nativeIO, int bufferSize, int maxPoolSize) throws IOException {
+ BufferPool(NativeIO nativeIO, ByteBufAllocator allocator, int bufferSize, int maxPoolSize) throws IOException {
pool = new ArrayBlockingQueue<>(maxPoolSize);
for (int i = 0; i < maxPoolSize; i++) {
- pool.add(new Buffer(nativeIO, bufferSize));
+ pool.add(new Buffer(nativeIO, allocator, bufferSize));
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectCompactionEntryLog.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectCompactionEntryLog.java
index e6403f2214..fadde648de 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectCompactionEntryLog.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectCompactionEntryLog.java
@@ -207,7 +207,7 @@ public abstract class DirectCompactionEntryLog implements CompactionEntryLog {
public void scan(EntryLogScanner scanner) throws IOException {
try (LogReader reader = new DirectReader(dstLogId, compactedFile.toString(), allocator, nativeIO,
readBufferSize, maxSaneEntrySize, readBlockStats)) {
- LogReaderScan.scan(reader, scanner);
+ LogReaderScan.scan(allocator, reader, scanner);
}
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java
index 3b211b54e8..f5b74d1265 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectEntryLogger.java
@@ -119,7 +119,7 @@ public class DirectEntryLogger implements EntryLogger {
this.allocator = allocator;
int singleWriteBufferSize = Buffer.nextAlignment((int) (totalWriteBufferSize / NUMBER_OF_WRITE_BUFFERS));
- this.writeBuffers = new BufferPool(nativeIO, singleWriteBufferSize, NUMBER_OF_WRITE_BUFFERS);
+ this.writeBuffers = new BufferPool(nativeIO, allocator, singleWriteBufferSize, NUMBER_OF_WRITE_BUFFERS);
// The total read buffer memory needs to get split across all the read threads, since the caches
// are thread-specific and we want to ensure we don't pass the total memory limit.
@@ -385,7 +385,7 @@ public class DirectEntryLogger implements EntryLogger {
public void scanEntryLog(long entryLogId, EntryLogScanner scanner) throws IOException {
checkArgument(entryLogId < Integer.MAX_VALUE, "Entry log id must be an int [%d]", entryLogId);
try (LogReader reader = newDirectReader((int) entryLogId)) {
- LogReaderScan.scan(reader, scanner);
+ LogReaderScan.scan(allocator, reader, scanner);
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectReader.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectReader.java
index f5076f3484..707bf307c0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectReader.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/DirectReader.java
@@ -69,7 +69,7 @@ class DirectReader implements LogReader {
.kv("errno", ne.getErrno()).toString());
}
refreshMaxOffset();
- nativeBuffer = new Buffer(nativeIO, bufferSize);
+ nativeBuffer = new Buffer(nativeIO, allocator, bufferSize);
}
@Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/LogReaderScan.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/LogReaderScan.java
index fdc9a0662f..9718795143 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/LogReaderScan.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/directentrylogger/LogReaderScan.java
@@ -21,16 +21,16 @@
package org.apache.bookkeeper.bookie.storage.directentrylogger;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import org.apache.bookkeeper.bookie.storage.EntryLogScanner;
class LogReaderScan {
- static void scan(LogReader reader, EntryLogScanner scanner) throws IOException {
+ static void scan(ByteBufAllocator allocator, LogReader reader, EntryLogScanner scanner) throws IOException {
int offset = Header.LOGFILE_LEGACY_HEADER_SIZE;
- ByteBuf entry = PooledByteBufAllocator.DEFAULT.directBuffer(16 * 1024 * 1024);
+ ByteBuf entry = allocator.directBuffer(16 * 1024 * 1024);
try {
while (offset < reader.maxOffset()) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/TestBuffer.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/TestBuffer.java
index 6d9b4957e9..a4d9554166 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/TestBuffer.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/TestBuffer.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue;
// CHECKSTYLE.OFF: IllegalImport
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.util.internal.PlatformDependent;
import java.io.IOException;
@@ -70,13 +71,13 @@ public class TestBuffer {
@Test(expected = IllegalArgumentException.class)
public void testCreateUnaligned() throws Exception {
- new Buffer(new NativeIOImpl(), 1234);
+ new Buffer(new NativeIOImpl(), ByteBufAllocator.DEFAULT, 1234);
}
@Test
public void testWriteInt() throws Exception {
int bufferSize = 1 << 20;
- Buffer b = new Buffer(new NativeIOImpl(), bufferSize);
+ Buffer b = new Buffer(new NativeIOImpl(), ByteBufAllocator.DEFAULT, bufferSize);
assertTrue(b.hasSpace(bufferSize));
assertEquals(0, b.position());
b.writeInt(0xdeadbeef);
@@ -111,7 +112,7 @@ public class TestBuffer {
ByteBuf bb = Unpooled.buffer(1021);
fillByteBuf(bb, 0xdeadbeef);
int bufferSize = 1 << 20;
- Buffer b = new Buffer(new NativeIOImpl(), bufferSize);
+ Buffer b = new Buffer(new NativeIOImpl(), ByteBufAllocator.DEFAULT, bufferSize);
assertEquals(0, b.position());
b.writeByteBuf(bb);
assertEquals(1021, b.position());
@@ -138,7 +139,7 @@ public class TestBuffer {
public void testPartialRead() throws Exception {
ByteBuf bb = Unpooled.buffer(5000);
- Buffer b = new Buffer(new NativeIOImpl(), 4096);
+ Buffer b = new Buffer(new NativeIOImpl(), ByteBufAllocator.DEFAULT, 4096);
for (int i = 0; i < 4096 / Integer.BYTES; i++) {
b.writeInt(0xdeadbeef);
}
@@ -149,7 +150,7 @@ public class TestBuffer {
@Test(expected = IOException.class)
public void testReadIntAtBoundary() throws Exception {
- Buffer b = new Buffer(new NativeIOImpl(), 4096);
+ Buffer b = new Buffer(new NativeIOImpl(), ByteBufAllocator.DEFAULT, 4096);
for (int i = 0; i < 4096 / Integer.BYTES; i++) {
b.writeInt(0xdeadbeef);
@@ -163,7 +164,7 @@ public class TestBuffer {
@Test(expected = IOException.class)
public void testReadLongAtBoundary() throws Exception {
- Buffer b = new Buffer(new NativeIOImpl(), 4096);
+ Buffer b = new Buffer(new NativeIOImpl(), ByteBufAllocator.DEFAULT, 4096);
for (int i = 0; i < 4096 / Integer.BYTES; i++) {
b.writeInt(0xdeadbeef);
@@ -177,7 +178,7 @@ public class TestBuffer {
@Test
public void testPadToAlignment() throws Exception {
- Buffer b = new Buffer(new NativeIOImpl(), 1 << 23);
+ Buffer b = new Buffer(new NativeIOImpl(), ByteBufAllocator.DEFAULT, 1 << 23);
for (int i = 0; i < 1025; i++) {
b.writeInt(0xdededede);
@@ -194,7 +195,7 @@ public class TestBuffer {
@Test
public void testFree() throws Exception {
- Buffer b = new Buffer(new NativeIOImpl(), 1 << 23);
+ Buffer b = new Buffer(new NativeIOImpl(), ByteBufAllocator.DEFAULT, 1 << 23);
b.free(); // success if process doesn't explode
b.free();
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/TestDirectReader.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/TestDirectReader.java
index 7ebf2709b9..75ad4437e9 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/TestDirectReader.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/TestDirectReader.java
@@ -189,7 +189,7 @@ public class TestDirectReader {
File ledgerDir = tmpDirs.createNew("readBuffer", "logs");
writeFileWithPattern(ledgerDir, 1234, 0xbeefcafe, 1, 1 << 20);
- BufferPool buffers = new BufferPool(new NativeIOImpl(), Buffer.ALIGNMENT * 4, 8);
+ BufferPool buffers = new BufferPool(new NativeIOImpl(), ByteBufAllocator.DEFAULT, Buffer.ALIGNMENT * 4, 8);
try (LogReader reader = new DirectReader(1234, logFilename(ledgerDir, 1234),
ByteBufAllocator.DEFAULT,
@@ -268,7 +268,7 @@ public class TestDirectReader {
int entrySize = Buffer.ALIGNMENT / 4 + 100;
Map<Integer, Integer> offset2Pattern = new HashMap<>();
- try (BufferPool buffers = new BufferPool(new NativeIOImpl(), Buffer.ALIGNMENT, 8);
+ try (BufferPool buffers = new BufferPool(new NativeIOImpl(), ByteBufAllocator.DEFAULT, Buffer.ALIGNMENT, 8);
LogWriter writer = new DirectWriter(1234, logFilename(ledgerDir, 1234),
1 << 20, MoreExecutors.newDirectExecutorService(),
buffers, new NativeIOImpl(), Slogger.CONSOLE)) {
@@ -315,7 +315,7 @@ public class TestDirectReader {
return 0;
}
};
- try (BufferPool buffers = new BufferPool(new NativeIOImpl(), Buffer.ALIGNMENT, 8);
+ try (BufferPool buffers = new BufferPool(new NativeIOImpl(), ByteBufAllocator.DEFAULT, Buffer.ALIGNMENT, 8);
LogWriter writer = new DirectWriter(1234, logFilename(ledgerDir, 1234),
1 << 20, MoreExecutors.newDirectExecutorService(),
buffers, new NativeIOImpl(), Slogger.CONSOLE);
@@ -353,7 +353,7 @@ public class TestDirectReader {
int entrySize = Buffer.ALIGNMENT / 2 + 8;
- try (BufferPool buffers = new BufferPool(new NativeIOImpl(), Buffer.ALIGNMENT, 8);
+ try (BufferPool buffers = new BufferPool(new NativeIOImpl(), ByteBufAllocator.DEFAULT, Buffer.ALIGNMENT, 8);
LogWriter writer = new DirectWriter(1234, logFilename(ledgerDir, 1234),
1 << 20, MoreExecutors.newDirectExecutorService(),
buffers, new NativeIOImpl(), Slogger.CONSOLE);
@@ -403,7 +403,8 @@ public class TestDirectReader {
return 0; // don't preallocate
}
};
- try (BufferPool buffers = new BufferPool(new NativeIOImpl(), Buffer.ALIGNMENT * 10, 8);
+ try (BufferPool buffers = new BufferPool(new NativeIOImpl(),
+ ByteBufAllocator.DEFAULT, Buffer.ALIGNMENT * 10, 8);
LogWriter writer = new DirectWriter(1234, logFilename(ledgerDir, 1234), 1 << 20,
MoreExecutors.newDirectExecutorService(),
buffers, new NativeIOImpl(), Slogger.CONSOLE)) {
@@ -452,7 +453,7 @@ public class TestDirectReader {
int entrySize = Buffer.ALIGNMENT * 4;
int offset1, offset2;
- try (BufferPool buffers = new BufferPool(new NativeIOImpl(), Buffer.ALIGNMENT * 8, 8);
+ try (BufferPool buffers = new BufferPool(new NativeIOImpl(), ByteBufAllocator.DEFAULT, Buffer.ALIGNMENT * 8, 8);
LogWriter writer = new DirectWriter(1234, logFilename(ledgerDir, 1234), 1 << 20,
MoreExecutors.newDirectExecutorService(), buffers, new NativeIOImpl(),
Slogger.CONSOLE)) {
@@ -496,7 +497,7 @@ public class TestDirectReader {
private static void writeFileWithPattern(File directory, int logId,
int pattern, int blockIncrement, int fileSize) throws Exception {
- try (BufferPool buffers = new BufferPool(new NativeIOImpl(), Buffer.ALIGNMENT, 8);
+ try (BufferPool buffers = new BufferPool(new NativeIOImpl(), ByteBufAllocator.DEFAULT, Buffer.ALIGNMENT, 8);
LogWriter writer = new DirectWriter(logId, logFilename(directory, logId),
fileSize, MoreExecutors.newDirectExecutorService(),
buffers, new NativeIOImpl(), Slogger.CONSOLE)) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/TestDirectWriter.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/TestDirectWriter.java
index b2868c907d..d897414774 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/TestDirectWriter.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/TestDirectWriter.java
@@ -26,6 +26,7 @@ import static org.hamcrest.Matchers.equalTo;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import java.io.File;
import java.io.FileInputStream;
@@ -61,7 +62,7 @@ public class TestDirectWriter {
@Test(expected = IllegalArgumentException.class)
public void testWriteAtAlignment() throws Exception {
File ledgerDir = tmpDirs.createNew("writeAlignment", "logs");
- try (BufferPool buffers = new BufferPool(new NativeIOImpl(), Buffer.ALIGNMENT, 8);
+ try (BufferPool buffers = new BufferPool(new NativeIOImpl(), ByteBufAllocator.DEFAULT, Buffer.ALIGNMENT, 8);
LogWriter writer = new DirectWriter(5678, logFilename(ledgerDir, 5678),
1 << 24, writeExecutor,
buffers, new NativeIOImpl(), Slogger.CONSOLE)) {
@@ -75,7 +76,7 @@ public class TestDirectWriter {
@Test(expected = IllegalArgumentException.class)
public void testWriteAlignmentSize() throws Exception {
File ledgerDir = tmpDirs.createNew("writeAlignment", "logs");
- try (BufferPool buffers = new BufferPool(new NativeIOImpl(), Buffer.ALIGNMENT, 8);
+ try (BufferPool buffers = new BufferPool(new NativeIOImpl(), ByteBufAllocator.DEFAULT, Buffer.ALIGNMENT, 8);
LogWriter writer = new DirectWriter(5678, logFilename(ledgerDir, 5678), 1 << 24, writeExecutor,
buffers, new NativeIOImpl(), Slogger.CONSOLE)) {
ByteBuf bb = Unpooled.buffer(123);
@@ -88,7 +89,7 @@ public class TestDirectWriter {
@Test
public void testWriteAlignedNotAtStart() throws Exception {
File ledgerDir = tmpDirs.createNew("writeAlignment", "logs");
- try (BufferPool buffers = new BufferPool(new NativeIOImpl(), Buffer.ALIGNMENT, 8);
+ try (BufferPool buffers = new BufferPool(new NativeIOImpl(), ByteBufAllocator.DEFAULT, Buffer.ALIGNMENT, 8);
LogWriter writer = new DirectWriter(5678, logFilename(ledgerDir, 5678), 1 << 24, writeExecutor,
buffers, new NativeIOImpl(), Slogger.CONSOLE)) {
ByteBuf bb = Unpooled.buffer(Buffer.ALIGNMENT);
@@ -102,7 +103,7 @@ public class TestDirectWriter {
@Test(timeout = 10000)
public void testFlushingWillWaitForBuffer() throws Exception {
File ledgerDir = tmpDirs.createNew("writeFailFailsFlush", "logs");
- try (BufferPool buffers = new BufferPool(new NativeIOImpl(),
+ try (BufferPool buffers = new BufferPool(new NativeIOImpl(), ByteBufAllocator.DEFAULT,
Buffer.ALIGNMENT, 1); // only one buffer available, so we can't flush in bg
LogWriter writer = new DirectWriter(5678, logFilename(ledgerDir, 5678), 1 << 24, writeExecutor,
buffers, new NativeIOImpl(), Slogger.CONSOLE)) {
@@ -129,7 +130,7 @@ public class TestDirectWriter {
return super.pwrite(fd, pointer, count, offset);
}
};
- try (BufferPool buffers = new BufferPool(new NativeIOImpl(), Buffer.ALIGNMENT, 8);
+ try (BufferPool buffers = new BufferPool(new NativeIOImpl(), ByteBufAllocator.DEFAULT, Buffer.ALIGNMENT, 8);
LogWriter writer = new DirectWriter(5678, logFilename(ledgerDir, 5678), 1 << 24, writeExecutor,
buffers, io, Slogger.CONSOLE)) {
for (int i = 0; i < 10; i++) {
@@ -158,7 +159,7 @@ public class TestDirectWriter {
}
};
- try (BufferPool buffers = new BufferPool(new NativeIOImpl(), 1 << 14, 8);
+ try (BufferPool buffers = new BufferPool(new NativeIOImpl(), ByteBufAllocator.DEFAULT, 1 << 14, 8);
LogWriter writer = new DirectWriter(5678, logFilename(ledgerDir, 5678), 1 << 24, writeExecutor,
buffers, io, Slogger.CONSOLE)) {
ByteBuf bb = Unpooled.buffer(Buffer.ALIGNMENT);
@@ -171,7 +172,7 @@ public class TestDirectWriter {
@Test
public void testWriteWithPadding() throws Exception {
File ledgerDir = tmpDirs.createNew("paddingWrite", "logs");
- try (BufferPool buffers = new BufferPool(new NativeIOImpl(), 1 << 14, 8);
+ try (BufferPool buffers = new BufferPool(new NativeIOImpl(), ByteBufAllocator.DEFAULT, 1 << 14, 8);
LogWriter writer = new DirectWriter(5678, logFilename(ledgerDir, 5678), 1 << 24, writeExecutor,
buffers, new NativeIOImpl(), Slogger.CONSOLE)) {
ByteBuf bb = Unpooled.buffer(Buffer.ALIGNMENT);
@@ -199,7 +200,7 @@ public class TestDirectWriter {
ExecutorService flushExecutor = Executors.newSingleThreadExecutor();
try {
File ledgerDir = tmpDirs.createNew("blockWrite", "logs");
- try (BufferPool buffers = new BufferPool(new NativeIOImpl(), 1 << 14, 8);
+ try (BufferPool buffers = new BufferPool(new NativeIOImpl(), ByteBufAllocator.DEFAULT, 1 << 14, 8);
LogWriter writer = new DirectWriter(1234, logFilename(ledgerDir, 1234),
1 << 24, writeExecutor,
buffers, new NativeIOImpl(), Slogger.CONSOLE)) {
@@ -239,7 +240,7 @@ public class TestDirectWriter {
File ledgerDir = tmpDirs.createNew("failOpen", "logs");
ledgerDir.delete();
- BufferPool buffers = new BufferPool(new NativeIOImpl(), 1 << 14, 8);
+ BufferPool buffers = new BufferPool(new NativeIOImpl(), ByteBufAllocator.DEFAULT, 1 << 14, 8);
try {
new DirectWriter(1234, logFilename(ledgerDir, 1234),
1 << 30, MoreExecutors.newDirectExecutorService(),
@@ -259,7 +260,7 @@ public class TestDirectWriter {
throw new NativeIOException("pretending I'm a mac");
}
};
- try (BufferPool buffers = new BufferPool(new NativeIOImpl(), 1 << 14, 8);
+ try (BufferPool buffers = new BufferPool(new NativeIOImpl(), ByteBufAllocator.DEFAULT, 1 << 14, 8);
LogWriter writer = new DirectWriter(3456, logFilename(ledgerDir, 3456),
1 << 24, writeExecutor,
buffers, nativeIO, Slogger.CONSOLE)) {
@@ -282,7 +283,7 @@ public class TestDirectWriter {
public void testWriteAtIntLimit() throws Exception {
File ledgerDir = tmpDirs.createNew("intLimit", "logs");
- try (BufferPool buffers = new BufferPool(new NativeIOImpl(), 1 << 14, 8);
+ try (BufferPool buffers = new BufferPool(new NativeIOImpl(), ByteBufAllocator.DEFAULT, 1 << 14, 8);
LogWriter writer = new DirectWriter(3456, logFilename(ledgerDir, 3456),
(long) Integer.MAX_VALUE + (Buffer.ALIGNMENT * 100),
writeExecutor,
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/TestMetadata.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/TestMetadata.java
index 953e69b591..4aafd18ba4 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/TestMetadata.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/directentrylogger/TestMetadata.java
@@ -55,7 +55,7 @@ public class TestMetadata {
public void testReadMetaFromHeader() throws Exception {
File ledgerDir = tmpDirs.createNew("writeMetadataBeforeFsync", "logs");
int logId = 5678;
- try (BufferPool buffers = new BufferPool(new NativeIOImpl(), Buffer.ALIGNMENT, 8);
+ try (BufferPool buffers = new BufferPool(new NativeIOImpl(), ByteBufAllocator.DEFAULT, Buffer.ALIGNMENT, 8);
LogWriter writer = new DirectWriter(logId, logFilename(ledgerDir, logId),
1 << 24, writeExecutor,
buffers, new NativeIOImpl(), Slogger.CONSOLE)) {