Posted to commits@bookkeeper.apache.org by mm...@apache.org on 2019/01/08 20:16:15 UTC
[bookkeeper] branch master updated: Configure Netty allocator in bookie and client
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 1a66de0 Configure Netty allocator in bookie and client
1a66de0 is described below
commit 1a66de0f1841309390261e90d8d0d0af7aa2ede6
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Jan 8 12:16:09 2019 -0800
Configure Netty allocator in bookie and client
### Motivation
This is based on #1754. It adds the code to configure and use the allocator wrapper in the bookie and the client.
(I'll rebase once the first PR is merged.)
Reviewers: Ivan Kelly <iv...@apache.org>, Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>
This closes #1755 from merlimat/use-allocator
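
The out-of-memory handling this change introduces for the pooled-direct policy can be hard to read from the diff alone, so here is a minimal sketch of it. It is not the BookKeeper implementation: `FallbackAllocatorSketch` and its `directSource`/`heapSource` functions are hypothetical stand-ins built on `java.nio.ByteBuffer` so the example runs without Netty. Only the names `OutOfMemoryPolicy`, `outOfMemoryListener` and `canFallbackToHeap` are taken from the patch. The idea, as far as the diff shows, is that a generic `buffer()` request may fall back to heap memory, while an explicit `directBuffer()` request may not.

```java
import java.nio.ByteBuffer;
import java.util.function.Consumer;
import java.util.function.IntFunction;

// Hypothetical stand-in for the allocator wrapper; NOT the BookKeeper class.
class FallbackAllocatorSketch {
    enum OutOfMemoryPolicy { ThrowException, FallbackToHeap }

    // Assumed injection points so that an out-of-memory condition can be simulated.
    private final IntFunction<ByteBuffer> directSource;
    private final IntFunction<ByteBuffer> heapSource;
    private final OutOfMemoryPolicy outOfMemoryPolicy;
    private final Consumer<OutOfMemoryError> outOfMemoryListener;

    FallbackAllocatorSketch(IntFunction<ByteBuffer> directSource,
                            IntFunction<ByteBuffer> heapSource,
                            OutOfMemoryPolicy outOfMemoryPolicy,
                            Consumer<OutOfMemoryError> outOfMemoryListener) {
        this.directSource = directSource;
        this.heapSource = heapSource;
        this.outOfMemoryPolicy = outOfMemoryPolicy;
        this.outOfMemoryListener = outOfMemoryListener;
    }

    // Generic request: the caller did not insist on direct memory,
    // so falling back to heap is acceptable.
    ByteBuffer buffer(int capacity) {
        return newDirectBuffer(capacity, true);
    }

    // Explicit direct request: never silently hand back a heap buffer.
    ByteBuffer directBuffer(int capacity) {
        return newDirectBuffer(capacity, false);
    }

    private ByteBuffer newDirectBuffer(int capacity, boolean canFallbackToHeap) {
        try {
            return directSource.apply(capacity);
        } catch (OutOfMemoryError e) {
            if (canFallbackToHeap && outOfMemoryPolicy == OutOfMemoryPolicy.FallbackToHeap) {
                try {
                    return heapSource.apply(capacity);
                } catch (OutOfMemoryError e2) {
                    // Heap is exhausted as well: notify, then propagate.
                    outOfMemoryListener.accept(e2);
                    throw e2;
                }
            }
            // ThrowException policy, or fallback not allowed for this request.
            outOfMemoryListener.accept(e);
            throw e;
        }
    }

    public static void main(String[] args) {
        FallbackAllocatorSketch alloc = new FallbackAllocatorSketch(
                capacity -> { throw new OutOfMemoryError("simulated: no direct memory"); },
                ByteBuffer::allocate,
                OutOfMemoryPolicy.FallbackToHeap,
                e -> System.out.println("listener notified: " + e.getMessage()));

        System.out.println("buffer() is direct: " + alloc.buffer(64).isDirect());
        try {
            alloc.directBuffer(64);
        } catch (OutOfMemoryError e) {
            System.out.println("directBuffer() rethrew: " + e.getMessage());
        }
    }
}
```

In this sketch the listener is invoked only when the error is actually propagated to the caller, which matches the branches visible in the patch. A successful heap fallback stays silent.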
---
.../apache/bookkeeper/benchmark/BenchBookie.java | 4 +-
.../allocator/impl/ByteBufAllocatorImpl.java | 68 +++++++++++----
.../impl/ByteBufAllocatorBuilderTest.java | 12 +--
bookkeeper-server/pom.xml | 5 ++
.../java/org/apache/bookkeeper/bookie/Bookie.java | 28 ++++--
.../org/apache/bookkeeper/bookie/BookieShell.java | 10 ++-
.../apache/bookkeeper/bookie/BufferedChannel.java | 15 ++--
.../org/apache/bookkeeper/bookie/EntryLogger.java | 25 +++---
.../bookkeeper/bookie/EntryLoggerAllocator.java | 8 +-
.../bookie/InterleavedLedgerStorage.java | 14 ++-
.../java/org/apache/bookkeeper/bookie/Journal.java | 15 ++--
.../apache/bookkeeper/bookie/LedgerStorage.java | 5 +-
.../apache/bookkeeper/bookie/ReadOnlyBookie.java | 6 +-
.../bookkeeper/bookie/SlowBufferedChannel.java | 10 ++-
.../bookkeeper/bookie/SortedLedgerStorage.java | 8 +-
.../bookie/storage/ldb/DbLedgerStorage.java | 10 ++-
.../bookkeeper/bookie/storage/ldb/ReadCache.java | 10 ++-
.../ldb/SingleDirectoryDbLedgerStorage.java | 15 ++--
.../bookkeeper/bookie/storage/ldb/WriteCache.java | 11 ++-
.../org/apache/bookkeeper/client/BookKeeper.java | 49 +++++++++--
.../apache/bookkeeper/client/ClientContext.java | 3 +
.../org/apache/bookkeeper/client/LedgerHandle.java | 2 +-
.../bookkeeper/client/api/BookKeeperBuilder.java | 11 +++
.../client/impl/BookKeeperBuilderImpl.java | 7 ++
.../bookkeeper/conf/AbstractConfiguration.java | 99 ++++++++++++++++++++++
.../bookkeeper/conf/ClientConfiguration.java | 8 ++
.../apache/bookkeeper/proto/BookieClientImpl.java | 20 +++--
.../apache/bookkeeper/proto/BookieNettyServer.java | 9 +-
.../bookkeeper/proto/BookieRequestProcessor.java | 11 ++-
.../org/apache/bookkeeper/proto/BookieServer.java | 36 ++++++--
.../bookkeeper/proto/PerChannelBookieClient.java | 20 ++---
.../proto/checksum/CRC32CDigestManager.java | 7 +-
.../proto/checksum/CRC32DigestManager.java | 5 +-
.../bookkeeper/proto/checksum/DigestManager.java | 31 +++----
.../proto/checksum/DummyDigestManager.java | 5 +-
.../proto/checksum/MacDigestManager.java | 5 +-
.../bookkeeper/tls/SecurityHandlerFactory.java | 3 +-
.../apache/bookkeeper/tls/TLSContextFactory.java | 10 ++-
.../org/apache/bookkeeper/util/ByteBufList.java | 3 +-
.../bookie/BookieInitializationTest.java | 9 +-
.../bookkeeper/bookie/BufferedChannelTest.java | 6 +-
.../apache/bookkeeper/bookie/CompactionTest.java | 26 ++++--
.../apache/bookkeeper/bookie/CreateNewLogTest.java | 8 +-
.../org/apache/bookkeeper/bookie/EntryLogTest.java | 6 +-
.../bookkeeper/bookie/IndexPersistenceMgrTest.java | 4 +-
.../bookie/InterleavedLedgerStorageTest.java | 4 +-
.../apache/bookkeeper/bookie/LedgerCacheTest.java | 7 +-
.../bookkeeper/bookie/LedgerStorageTest.java | 8 +-
.../bookie/SlowInterleavedLedgerStorage.java | 10 ++-
.../bookie/SortedLedgerStorageCheckpointTest.java | 5 +-
.../apache/bookkeeper/bookie/StateManagerTest.java | 6 +-
.../apache/bookkeeper/bookie/TestSyncThread.java | 5 +-
.../bookie/storage/ldb/ConversionRollbackTest.java | 5 +-
.../bookie/storage/ldb/ConversionTest.java | 8 +-
.../storage/ldb/DbLedgerStorageWriteCacheTest.java | 11 ++-
.../storage/ldb/LocationsIndexRebuildTest.java | 5 +-
.../bookie/storage/ldb/ReadCacheTest.java | 7 +-
.../bookie/storage/ldb/WriteCacheTest.java | 23 ++---
.../apache/bookkeeper/client/BookKeeperTest.java | 1 +
.../bookkeeper/client/BookKeeperTestClient.java | 4 +-
.../org/apache/bookkeeper/client/ClientUtil.java | 4 +-
.../bookkeeper/client/MockBookKeeperTestCase.java | 10 ++-
.../bookkeeper/client/MockClientContext.java | 15 ++++
.../client/ReadLastConfirmedAndEntryOpTest.java | 4 +-
.../client/TestGetBookieInfoTimeout.java | 4 +-
.../bookkeeper/conf/TestBKConfiguration.java | 2 +
.../org/apache/bookkeeper/meta/GcLedgersTest.java | 6 +-
.../bookkeeper/meta/LedgerManagerTestCase.java | 6 +-
.../bookkeeper/proto/BookieBackpressureTest.java | 10 ++-
.../apache/bookkeeper/proto/MockBookieClient.java | 4 +-
.../proto/TestBookieRequestProcessor.java | 9 +-
.../bookkeeper/test/BookKeeperClusterTestCase.java | 9 +-
.../apache/bookkeeper/test/BookieClientTest.java | 13 +--
conf/bk_server.conf | 50 +++++++++++
.../proto/checksum/DigestTypeBenchmark.java | 7 +-
shaded/bookkeeper-server-shaded/pom.xml | 3 +-
shaded/distributedlog-core-shaded/pom.xml | 3 +-
site/_data/config/bk_server.yaml | 59 ++++++++++++-
78 files changed, 763 insertions(+), 246 deletions(-)
diff --git a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
index 216aff9..94776f0 100644
--- a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
+++ b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
@@ -20,6 +20,7 @@
package org.apache.bookkeeper.benchmark;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
@@ -175,7 +176,8 @@ public class BenchBookie {
new DefaultThreadFactory("BookKeeperClientScheduler"));
ClientConfiguration conf = new ClientConfiguration();
- BookieClient bc = new BookieClientImpl(conf, eventLoop, executor, scheduler, NullStatsLogger.INSTANCE);
+ BookieClient bc = new BookieClientImpl(conf, eventLoop, PooledByteBufAllocator.DEFAULT, executor, scheduler,
+ NullStatsLogger.INSTANCE);
LatencyCallback lc = new LatencyCallback();
ThroughputCallback tc = new ThroughputCallback();
diff --git a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorImpl.java b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorImpl.java
index 3544165..1889eb9 100644
--- a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorImpl.java
+++ b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorImpl.java
@@ -39,6 +39,10 @@ public class ByteBufAllocatorImpl extends AbstractByteBufAllocator implements By
private static final Logger log = LoggerFactory.getLogger(ByteBufAllocatorImpl.class);
+ // Same as AbstractByteBufAllocator, but copied here since it's not visible
+ private static final int DEFAULT_INITIAL_CAPACITY = 256;
+ private static final int DEFAULT_MAX_CAPACITY = Integer.MAX_VALUE;
+
private final ByteBufAllocator pooledAllocator;
private final ByteBufAllocator unpooledAllocator;
private final PoolingPolicy poolingPolicy;
@@ -63,16 +67,22 @@ public class ByteBufAllocatorImpl extends AbstractByteBufAllocator implements By
if (poolingPolicy == PoolingPolicy.PooledDirect) {
if (pooledAllocator == null) {
- this.pooledAllocator = new PooledByteBufAllocator(
- true /* preferDirect */,
- poolingConcurrency /* nHeapArena */,
- poolingConcurrency /* nDirectArena */,
- PooledByteBufAllocator.defaultPageSize(),
- PooledByteBufAllocator.defaultMaxOrder(),
- PooledByteBufAllocator.defaultTinyCacheSize(),
- PooledByteBufAllocator.defaultSmallCacheSize(),
- PooledByteBufAllocator.defaultNormalCacheSize(),
- PooledByteBufAllocator.defaultUseCacheForAllThreads());
+ if (poolingConcurrency == PooledByteBufAllocator.defaultNumDirectArena()) {
+ // If all the parameters are the same as in the default Netty pool,
+ // just reuse the static instance as the underlying allocator.
+ this.pooledAllocator = PooledByteBufAllocator.DEFAULT;
+ } else {
+ this.pooledAllocator = new PooledByteBufAllocator(
+ true /* preferDirect */,
+ poolingConcurrency /* nHeapArena */,
+ poolingConcurrency /* nDirectArena */,
+ PooledByteBufAllocator.defaultPageSize(),
+ PooledByteBufAllocator.defaultMaxOrder(),
+ PooledByteBufAllocator.defaultTinyCacheSize(),
+ PooledByteBufAllocator.defaultSmallCacheSize(),
+ PooledByteBufAllocator.defaultNormalCacheSize(),
+ PooledByteBufAllocator.defaultUseCacheForAllThreads());
+ }
} else {
this.pooledAllocator = pooledAllocator;
}
@@ -110,6 +120,25 @@ public class ByteBufAllocatorImpl extends AbstractByteBufAllocator implements By
}
@Override
+ public ByteBuf buffer() {
+ return buffer(DEFAULT_INITIAL_CAPACITY);
+ }
+
+ @Override
+ public ByteBuf buffer(int initialCapacity) {
+ return buffer(initialCapacity, DEFAULT_MAX_CAPACITY);
+ }
+
+ @Override
+ public ByteBuf buffer(int initialCapacity, int maxCapacity) {
+ if (poolingPolicy == PoolingPolicy.PooledDirect) {
+ return newDirectBuffer(initialCapacity, maxCapacity, true /* can fallback to heap if needed */);
+ } else {
+ return newHeapBuffer(initialCapacity, maxCapacity);
+ }
+ }
+
+ @Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
try {
// There are few cases in which we ask explicitly for a pooled
@@ -125,30 +154,33 @@ public class ByteBufAllocatorImpl extends AbstractByteBufAllocator implements By
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
+ // If caller asked specifically for a direct buffer, we cannot fallback to heap
+ return newDirectBuffer(initialCapacity, maxCapacity, false);
+ }
+
+ private ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity, boolean canFallbackToHeap) {
if (poolingPolicy == PoolingPolicy.PooledDirect) {
try {
return pooledAllocator.directBuffer(initialCapacity, maxCapacity);
} catch (OutOfMemoryError e) {
- switch (outOfMemoryPolicy) {
- case ThrowException:
- outOfMemoryListener.accept(e);
- throw e;
-
- case FallbackToHeap:
+ if (canFallbackToHeap && outOfMemoryPolicy == OutOfMemoryPolicy.FallbackToHeap) {
try {
return unpooledAllocator.heapBuffer(initialCapacity, maxCapacity);
} catch (OutOfMemoryError e2) {
outOfMemoryListener.accept(e2);
throw e2;
}
+ } else {
+ // ThrowException
+ outOfMemoryListener.accept(e);
+ throw e;
}
- return null;
}
} else {
// Unpooled heap buffer. Force heap buffers because unpooled direct
// buffers have very high overhead of allocation/reclaiming
try {
- return unpooledAllocator.heapBuffer(initialCapacity, maxCapacity);
+ return unpooledAllocator.directBuffer(initialCapacity, maxCapacity);
} catch (OutOfMemoryError e) {
outOfMemoryListener.accept(e);
throw e;
diff --git a/bookkeeper-common-allocator/src/test/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderTest.java b/bookkeeper-common-allocator/src/test/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderTest.java
index 8ff66c3..662dd83 100644
--- a/bookkeeper-common-allocator/src/test/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderTest.java
+++ b/bookkeeper-common-allocator/src/test/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderTest.java
@@ -145,10 +145,10 @@ public class ByteBufAllocatorBuilderTest {
}
@Test
- public void testOomUnpooled() {
+ public void testOomUnpooledDirect() {
ByteBufAllocator heapAlloc = mock(ByteBufAllocator.class);
- OutOfMemoryError noHeapError = new OutOfMemoryError("no more heap");
- when(heapAlloc.heapBuffer(anyInt(), anyInt())).thenThrow(noHeapError);
+ OutOfMemoryError noMemError = new OutOfMemoryError("no more direct mem");
+ when(heapAlloc.directBuffer(anyInt(), anyInt())).thenThrow(noMemError);
AtomicReference<OutOfMemoryError> receivedException = new AtomicReference<>();
@@ -166,11 +166,11 @@ public class ByteBufAllocatorBuilderTest {
fail("Should have thrown exception");
} catch (OutOfMemoryError e) {
// Expected
- assertEquals(noHeapError, e);
+ assertEquals(noMemError, e);
}
// Ensure the notification was triggered even when exception is thrown
- assertEquals(noHeapError, receivedException.get());
+ assertEquals(noMemError, receivedException.get());
}
@Test
@@ -214,7 +214,7 @@ public class ByteBufAllocatorBuilderTest {
ByteBuf buf2 = alloc.directBuffer();
assertEquals(UnpooledByteBufAllocator.DEFAULT, buf2.alloc());
- assertTrue(buf2.hasArray());
+ assertFalse(buf2.hasArray());
}
@Test
diff --git a/bookkeeper-server/pom.xml b/bookkeeper-server/pom.xml
index 10b5ea4..c1b684e 100644
--- a/bookkeeper-server/pom.xml
+++ b/bookkeeper-server/pom.xml
@@ -32,6 +32,11 @@
</dependency>
<dependency>
<groupId>org.apache.bookkeeper</groupId>
+ <artifactId>bookkeeper-common-allocator</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.bookkeeper</groupId>
<artifactId>bookkeeper-proto</artifactId>
<version>${project.parent.version}</version>
</dependency>
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index fd352d8..a0acd31 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -28,9 +28,13 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_LEDGER_SCOPE
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.SettableFuture;
+
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FilenameFilter;
@@ -53,6 +57,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
+
import org.apache.bookkeeper.bookie.BookieException.BookieIllegalOpException;
import org.apache.bookkeeper.bookie.BookieException.CookieNotFoundException;
import org.apache.bookkeeper.bookie.BookieException.DiskPartitionDuplicationException;
@@ -136,6 +141,8 @@ public class Bookie extends BookieCriticalThread {
final StatsLogger statsLogger;
private final BookieStats bookieStats;
+ private final ByteBufAllocator allocator;
+
/**
* Exception is thrown when no such a ledger is found in this bookie.
*/
@@ -600,7 +607,7 @@ public class Bookie extends BookieCriticalThread {
public Bookie(ServerConfiguration conf)
throws IOException, InterruptedException, BookieException {
- this(conf, NullStatsLogger.INSTANCE);
+ this(conf, NullStatsLogger.INSTANCE, PooledByteBufAllocator.DEFAULT);
}
private static LedgerStorage buildLedgerStorage(ServerConfiguration conf) throws IOException {
@@ -658,12 +665,13 @@ public class Bookie extends BookieCriticalThread {
null,
checkpointSource,
checkpointer,
- statsLogger);
+ statsLogger,
+ UnpooledByteBufAllocator.DEFAULT);
return ledgerStorage;
}
- public Bookie(ServerConfiguration conf, StatsLogger statsLogger)
+ public Bookie(ServerConfiguration conf, StatsLogger statsLogger, ByteBufAllocator allocator)
throws IOException, InterruptedException, BookieException {
super("Bookie-" + conf.getBookiePort());
this.statsLogger = statsLogger;
@@ -676,6 +684,7 @@ public class Bookie extends BookieCriticalThread {
this.ledgerDirsManager = createLedgerDirsManager(conf, diskChecker, statsLogger.scope(LD_LEDGER_SCOPE));
this.indexDirsManager = createIndexDirsManager(conf, diskChecker, statsLogger.scope(LD_INDEX_SCOPE),
this.ledgerDirsManager);
+ this.allocator = allocator;
// instantiate zookeeper client to initialize ledger manager
this.metadataDriver = instantiateMetadataDriver(conf);
@@ -732,7 +741,7 @@ public class Bookie extends BookieCriticalThread {
journals = Lists.newArrayList();
for (int i = 0; i < journalDirectories.size(); i++) {
journals.add(new Journal(i, journalDirectories.get(i),
- conf, ledgerDirsManager, statsLogger.scope(JOURNAL_SCOPE)));
+ conf, ledgerDirsManager, statsLogger.scope(JOURNAL_SCOPE), allocator));
}
this.entryLogPerLedgerEnabled = conf.isEntryLogPerLedgerEnabled();
@@ -786,7 +795,8 @@ public class Bookie extends BookieCriticalThread {
stateManager,
checkpointSource,
syncThread,
- statsLogger);
+ statsLogger,
+ allocator);
handles = new HandleFactoryImpl(ledgerStorage);
@@ -1287,8 +1297,8 @@ public class Bookie extends BookieCriticalThread {
}
}
- static ByteBuf createExplicitLACEntry(long ledgerId, ByteBuf explicitLac) {
- ByteBuf bb = PooledByteBufAllocator.DEFAULT.directBuffer(8 + 8 + 4 + explicitLac.capacity());
+ private ByteBuf createExplicitLACEntry(long ledgerId, ByteBuf explicitLac) {
+ ByteBuf bb = allocator.directBuffer(8 + 8 + 4 + explicitLac.capacity());
bb.writeLong(ledgerId);
bb.writeLong(METAENTRY_ID_LEDGER_EXPLICITLAC);
bb.writeInt(explicitLac.capacity());
@@ -1485,6 +1495,10 @@ public class Bookie extends BookieCriticalThread {
}
}
+ public ByteBufAllocator getAllocator() {
+ return allocator;
+ }
+
/**
* Format the bookie server data.
*
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index f53195a..dc9137e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -29,7 +29,9 @@ import com.google.common.collect.Lists;
import com.google.common.util.concurrent.UncheckedExecutionException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
@@ -917,8 +919,8 @@ public class BookieShell implements Tool {
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(
new DefaultThreadFactory("BookKeeperClientSchedulerPool"));
- BookieClient bookieClient = new BookieClientImpl(conf, eventLoopGroup, executor,
- scheduler, NullStatsLogger.INSTANCE);
+ BookieClient bookieClient = new BookieClientImpl(conf, eventLoopGroup,
+ UnpooledByteBufAllocator.DEFAULT, executor, scheduler, NullStatsLogger.INSTANCE);
LongStream.range(firstEntry, lastEntry).forEach(entryId -> {
CompletableFuture<Void> future = new CompletableFuture<>();
@@ -2825,9 +2827,9 @@ public class BookieShell implements Tool {
};
dbStorage.initialize(conf, null, ledgerDirsManager, ledgerIndexManager, null,
- checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
+ checkpointSource, checkpointer, NullStatsLogger.INSTANCE, PooledByteBufAllocator.DEFAULT);
interleavedStorage.initialize(conf, null, ledgerDirsManager, ledgerIndexManager,
- null, checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
+ null, checkpointSource, checkpointer, NullStatsLogger.INSTANCE, PooledByteBufAllocator.DEFAULT);
LedgerCache interleavedLedgerCache = interleavedStorage.ledgerCache;
int convertedLedgers = 0;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
index 633c540..31fb203 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
@@ -66,23 +66,24 @@ public class BufferedChannel extends BufferedReadChannel implements Closeable {
private boolean closed = false;
// make constructor to be public for unit test
- public BufferedChannel(FileChannel fc, int capacity) throws IOException {
+ public BufferedChannel(ByteBufAllocator allocator, FileChannel fc, int capacity) throws IOException {
// Use the same capacity for read and write buffers.
- this(fc, capacity, 0L);
+ this(allocator, fc, capacity, 0L);
}
- public BufferedChannel(FileChannel fc, int capacity, long unpersistedBytesBound) throws IOException {
+ public BufferedChannel(ByteBufAllocator allocator, FileChannel fc, int capacity, long unpersistedBytesBound)
+ throws IOException {
// Use the same capacity for read and write buffers.
- this(fc, capacity, capacity, unpersistedBytesBound);
+ this(allocator, fc, capacity, capacity, unpersistedBytesBound);
}
- public BufferedChannel(FileChannel fc, int writeCapacity, int readCapacity, long unpersistedBytesBound)
- throws IOException {
+ public BufferedChannel(ByteBufAllocator allocator, FileChannel fc, int writeCapacity, int readCapacity,
+ long unpersistedBytesBound) throws IOException {
super(fc, readCapacity);
this.writeCapacity = writeCapacity;
this.position = new AtomicLong(fc.position());
this.writeBufferStartPosition.set(position.get());
- this.writeBuffer = ByteBufAllocator.DEFAULT.directBuffer(writeCapacity);
+ this.writeBuffer = allocator.directBuffer(writeCapacity);
this.unpersistedBytes = new AtomicLong(0);
this.unpersistedBytesBound = unpersistedBytesBound;
this.doRegularFlushes = unpersistedBytesBound > 0;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index af82620..1389370 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -88,9 +88,9 @@ public class EntryLogger {
private final File logFile;
private long ledgerIdAssigned = UNASSIGNED_LEDGERID;
- public BufferedLogChannel(FileChannel fc, int writeCapacity, int readCapacity, long logId, File logFile,
- long unpersistedBytesBound) throws IOException {
- super(fc, writeCapacity, readCapacity, unpersistedBytesBound);
+ public BufferedLogChannel(ByteBufAllocator allocator, FileChannel fc, int writeCapacity, int readCapacity,
+ long logId, File logFile, long unpersistedBytesBound) throws IOException {
+ super(allocator, fc, writeCapacity, readCapacity, unpersistedBytesBound);
this.logId = logId;
this.entryLogMetadata = new EntryLogMetadata(logId);
this.logFile = logFile;
@@ -283,6 +283,8 @@ public class EntryLogger {
private final int maxSaneEntrySize;
+ private final ByteBufAllocator allocator;
+
final ServerConfiguration conf;
/**
* Scan entries in a entry log file.
@@ -332,15 +334,16 @@ public class EntryLogger {
*/
public EntryLogger(ServerConfiguration conf,
LedgerDirsManager ledgerDirsManager) throws IOException {
- this(conf, ledgerDirsManager, null, NullStatsLogger.INSTANCE);
+ this(conf, ledgerDirsManager, null, NullStatsLogger.INSTANCE, PooledByteBufAllocator.DEFAULT);
}
public EntryLogger(ServerConfiguration conf,
- LedgerDirsManager ledgerDirsManager, EntryLogListener listener, StatsLogger statsLogger)
- throws IOException {
+ LedgerDirsManager ledgerDirsManager, EntryLogListener listener, StatsLogger statsLogger,
+ ByteBufAllocator allocator) throws IOException {
//We reserve 500 bytes as overhead for the protocol. This is not 100% accurate
// but the protocol varies so an exact value is difficult to determine
this.maxSaneEntrySize = conf.getNettyMaxFrameSizeBytes() - 500;
+ this.allocator = allocator;
this.ledgerDirsManager = ledgerDirsManager;
this.conf = conf;
entryLogPerLedgerEnabled = conf.isEntryLogPerLedgerEnabled();
@@ -371,7 +374,7 @@ public class EntryLogger {
}
this.recentlyCreatedEntryLogsStatus = new RecentEntryLogsStatus(logId + 1);
this.entryLoggerAllocator = new EntryLoggerAllocator(conf, ledgerDirsManager, recentlyCreatedEntryLogsStatus,
- logId);
+ logId, allocator);
if (entryLogPerLedgerEnabled) {
this.entryLogManager = new EntryLogManagerForEntryLogPerLedger(conf, ledgerDirsManager,
entryLoggerAllocator, listeners, recentlyCreatedEntryLogsStatus, statsLogger);
@@ -840,7 +843,7 @@ public class EntryLogger {
throw new IOException(e.toString());
}
- ByteBuf data = PooledByteBufAllocator.DEFAULT.directBuffer(entry.entrySize, entry.entrySize);
+ ByteBuf data = allocator.buffer(entry.entrySize, entry.entrySize);
int rc = readFromLogChannel(entryLogId, entry.fc, data, pos);
if (rc != entry.entrySize) {
// Note that throwing NoEntryException here instead of IOException is not
@@ -872,7 +875,7 @@ public class EntryLogger {
BufferedReadChannel bc = getChannelForLogId(entryLogId);
// Allocate buffer to read (version, ledgersMapOffset, ledgerCount)
- ByteBuf headers = PooledByteBufAllocator.DEFAULT.directBuffer(LOGFILE_HEADER_SIZE);
+ ByteBuf headers = allocator.directBuffer(LOGFILE_HEADER_SIZE);
try {
bc.read(headers, 0);
@@ -988,7 +991,7 @@ public class EntryLogger {
long pos = LOGFILE_HEADER_SIZE;
// Start with a reasonably sized buffer size
- ByteBuf data = PooledByteBufAllocator.DEFAULT.directBuffer(1024 * 1024);
+ ByteBuf data = allocator.directBuffer(1024 * 1024);
try {
@@ -1070,7 +1073,7 @@ public class EntryLogger {
EntryLogMetadata meta = new EntryLogMetadata(entryLogId);
final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE;
- ByteBuf ledgersMap = ByteBufAllocator.DEFAULT.directBuffer(maxMapSize);
+ ByteBuf ledgersMap = allocator.directBuffer(maxMapSize);
try {
while (offset < bc.size()) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java
index 3ddd8e2..1c32b55 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java
@@ -25,6 +25,7 @@ import static com.google.common.base.Charsets.UTF_8;
import static org.apache.bookkeeper.bookie.TransactionalEntryLogCompactor.COMPACTING_SUFFIX;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import java.io.BufferedWriter;
@@ -61,11 +62,14 @@ class EntryLoggerAllocator {
private final Object createCompactionLogLock = new Object();
private final EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus;
private final boolean entryLogPreAllocationEnabled;
+ private final ByteBufAllocator byteBufAllocator;
final ByteBuf logfileHeader = Unpooled.buffer(EntryLogger.LOGFILE_HEADER_SIZE);
EntryLoggerAllocator(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager,
- EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus, long logId) {
+ EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus, long logId,
+ ByteBufAllocator byteBufAllocator) {
this.conf = conf;
+ this.byteBufAllocator = byteBufAllocator;
this.ledgerDirsManager = ledgerDirsManager;
this.preallocatedLogId = logId;
this.recentlyCreatedEntryLogsStatus = recentlyCreatedEntryLogsStatus;
@@ -161,7 +165,7 @@ class EntryLoggerAllocator {
File newLogFile = new File(dirForNextEntryLog, logFileName);
FileChannel channel = new RandomAccessFile(newLogFile, "rw").getChannel();
- BufferedLogChannel logChannel = new BufferedLogChannel(channel, conf.getWriteBufferBytes(),
+ BufferedLogChannel logChannel = new BufferedLogChannel(byteBufAllocator, channel, conf.getWriteBufferBytes(),
conf.getReadBufferBytes(), preallocatedLogId, newLogFile, conf.getFlushIntervalInBytes());
logfileHeader.readerIndex(0);
logChannel.write(logfileHeader);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index d1287c3..3b5bf01 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -35,6 +35,8 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.RateLimiter;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -129,7 +131,8 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
StateManager stateManager,
CheckpointSource checkpointSource,
Checkpointer checkpointer,
- StatsLogger statsLogger)
+ StatsLogger statsLogger,
+ ByteBufAllocator allocator)
throws IOException {
initializeWithEntryLogListener(
conf,
@@ -140,7 +143,8 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
checkpointSource,
checkpointer,
this,
- statsLogger);
+ statsLogger,
+ allocator);
}
void initializeWithEntryLogListener(ServerConfiguration conf,
@@ -151,7 +155,8 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
CheckpointSource checkpointSource,
Checkpointer checkpointer,
EntryLogListener entryLogListener,
- StatsLogger statsLogger) throws IOException {
+ StatsLogger statsLogger,
+ ByteBufAllocator allocator) throws IOException {
initializeWithEntryLogger(
conf,
ledgerManager,
@@ -160,7 +165,8 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
stateManager,
checkpointSource,
checkpointer,
- new EntryLogger(conf, ledgerDirsManager, entryLogListener, statsLogger.scope(ENTRYLOGGER_SCOPE)),
+ new EntryLogger(conf, ledgerDirsManager, entryLogListener, statsLogger.scope(ENTRYLOGGER_SCOPE),
+ allocator),
statsLogger);
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index dd47da8..0a78fca 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -26,7 +26,9 @@ import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.concurrent.DefaultThreadFactory;
@@ -85,8 +87,8 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
*/
@FunctionalInterface
public interface BufferedChannelBuilder {
- BufferedChannelBuilder DEFAULT_BCBUILDER =
- (FileChannel fc, int capacity) -> new BufferedChannel(fc, capacity);
+ BufferedChannelBuilder DEFAULT_BCBUILDER = (FileChannel fc,
+ int capacity) -> new BufferedChannel(UnpooledByteBufAllocator.DEFAULT, fc, capacity);
BufferedChannel create(FileChannel fc, int capacity) throws IOException;
}
@@ -627,18 +629,21 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
volatile boolean running = true;
private final LedgerDirsManager ledgerDirsManager;
+ private final ByteBufAllocator allocator;
// Expose Stats
private final JournalStats journalStats;
public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf,
LedgerDirsManager ledgerDirsManager) {
- this(journalIndex, journalDirectory, conf, ledgerDirsManager, NullStatsLogger.INSTANCE);
+ this(journalIndex, journalDirectory, conf, ledgerDirsManager, NullStatsLogger.INSTANCE,
+ UnpooledByteBufAllocator.DEFAULT);
}
public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf,
- LedgerDirsManager ledgerDirsManager, StatsLogger statsLogger) {
+ LedgerDirsManager ledgerDirsManager, StatsLogger statsLogger, ByteBufAllocator allocator) {
super("BookieJournal-" + conf.getBookiePort());
+ this.allocator = allocator;
if (conf.isBusyWaitEnabled()) {
// To achieve lower latency, use busy-wait blocking queue implementation
@@ -1161,7 +1166,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
}
public BufferedChannelBuilder getBufferedChannelBuilder() {
- return BufferedChannelBuilder.DEFAULT_BCBUILDER;
+ return (FileChannel fc, int capacity) -> new BufferedChannel(allocator, fc, capacity);
}
/**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
index 111b8c2..1353e8b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
@@ -23,6 +23,8 @@ package org.apache.bookkeeper.bookie;
import com.google.common.util.concurrent.RateLimiter;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -53,7 +55,8 @@ public interface LedgerStorage {
StateManager stateManager,
CheckpointSource checkpointSource,
Checkpointer checkpointer,
- StatsLogger statsLogger)
+ StatsLogger statsLogger,
+ ByteBufAllocator allocator)
throws IOException;
/**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java
index 5125c07..cb6e9f0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java
@@ -21,6 +21,8 @@
package org.apache.bookkeeper.bookie;
+import io.netty.buffer.ByteBufAllocator;
+
import java.io.IOException;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.stats.StatsLogger;
@@ -38,9 +40,9 @@ public class ReadOnlyBookie extends Bookie {
private static final Logger LOG = LoggerFactory.getLogger(ReadOnlyBookie.class);
- public ReadOnlyBookie(ServerConfiguration conf, StatsLogger statsLogger)
+ public ReadOnlyBookie(ServerConfiguration conf, StatsLogger statsLogger, ByteBufAllocator allocator)
throws IOException, KeeperException, InterruptedException, BookieException {
- super(conf, statsLogger);
+ super(conf, statsLogger, allocator);
if (conf.isReadOnlyModeEnabled()) {
stateManager.forceToReadOnly();
} else {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SlowBufferedChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SlowBufferedChannel.java
index 9fdc34c..e9731a2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SlowBufferedChannel.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SlowBufferedChannel.java
@@ -22,6 +22,7 @@ package org.apache.bookkeeper.bookie;
*/
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
import java.io.IOException;
import java.nio.channels.FileChannel;
@@ -36,12 +37,13 @@ public class SlowBufferedChannel extends BufferedChannel {
public volatile long addDelay = 0;
public volatile long flushDelay = 0;
- public SlowBufferedChannel(FileChannel fc, int capacity) throws IOException {
- super(fc, capacity);
+ public SlowBufferedChannel(ByteBufAllocator allocator, FileChannel fc, int capacity) throws IOException {
+ super(allocator, fc, capacity);
}
- public SlowBufferedChannel(FileChannel fc, int writeCapacity, int readCapacity) throws IOException {
- super(fc, writeCapacity, readCapacity);
+ public SlowBufferedChannel(ByteBufAllocator allocator, FileChannel fc, int writeCapacity, int readCapacity)
+ throws IOException {
+ super(allocator, fc, writeCapacity, readCapacity);
}
public void setAddDelay(long delay) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
index 5e4dbad..77653db 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
@@ -24,6 +24,8 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+
import java.io.IOException;
import java.util.List;
import java.util.Optional;
@@ -72,7 +74,8 @@ public class SortedLedgerStorage
StateManager stateManager,
CheckpointSource checkpointSource,
Checkpointer checkpointer,
- StatsLogger statsLogger)
+ StatsLogger statsLogger,
+ ByteBufAllocator allocator)
throws IOException {
interleavedLedgerStorage.initializeWithEntryLogListener(
@@ -86,7 +89,8 @@ public class SortedLedgerStorage
// uses sorted ledger storage's own entry log listener
// since it manages entry log rotations and checkpoints.
this,
- statsLogger);
+ statsLogger,
+ allocator);
if (conf.isEntryLogPerLedgerEnabled()) {
this.memTable = new EntryMemTableWithParallelFlusher(conf, checkpointSource, statsLogger);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
index ccabaed..287f452 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
import io.netty.util.concurrent.DefaultThreadFactory;
//CHECKSTYLE.OFF: IllegalImport
import io.netty.util.internal.PlatformDependent;
@@ -63,7 +64,6 @@ import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.DiskChecker;
-
/**
* Implementation of LedgerStorage that uses RocksDB to keep the indexes for entries stored in EntryLogs.
*/
@@ -88,13 +88,16 @@ public class DbLedgerStorage implements LedgerStorage {
private ScheduledExecutorService gcExecutor;
private DbLedgerStorageStats stats;
+ protected ByteBufAllocator allocator;
+
@Override
public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager,
LedgerDirsManager indexDirsManager, StateManager stateManager, CheckpointSource checkpointSource,
- Checkpointer checkpointer, StatsLogger statsLogger) throws IOException {
+ Checkpointer checkpointer, StatsLogger statsLogger, ByteBufAllocator allocator) throws IOException {
long writeCacheMaxSize = conf.getLong(WRITE_CACHE_MAX_SIZE_MB, DEFAULT_WRITE_CACHE_MAX_SIZE_MB) * MB;
long readCacheMaxSize = conf.getLong(READ_AHEAD_CACHE_MAX_SIZE_MB, DEFAULT_READ_CACHE_MAX_SIZE_MB) * MB;
+ this.allocator = allocator;
this.numberOfDirs = ledgerDirsManager.getAllLedgerDirs().size();
log.info("Started Db Ledger Storage");
@@ -139,7 +142,8 @@ public class DbLedgerStorage implements LedgerStorage {
StatsLogger statsLogger, ScheduledExecutorService gcExecutor, long writeCacheSize, long readCacheSize)
throws IOException {
return new SingleDirectoryDbLedgerStorage(conf, ledgerManager, ledgerDirsManager, indexDirsManager,
- stateManager, checkpointSource, checkpointer, statsLogger, gcExecutor, writeCacheSize, readCacheSize);
+ stateManager, checkpointSource, checkpointer, statsLogger, allocator, gcExecutor, writeCacheSize,
+ readCacheSize);
}
@Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCache.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCache.java
index b14478f..986c741 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCache.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCache.java
@@ -57,13 +57,15 @@ public class ReadCache implements Closeable {
private final int segmentSize;
+ private ByteBufAllocator allocator;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- public ReadCache(long maxCacheSize) {
- this(maxCacheSize, DEFAULT_MAX_SEGMENT_SIZE);
+ public ReadCache(ByteBufAllocator allocator, long maxCacheSize) {
+ this(allocator, maxCacheSize, DEFAULT_MAX_SEGMENT_SIZE);
}
- public ReadCache(long maxCacheSize, int maxSegmentSize) {
+ public ReadCache(ByteBufAllocator allocator, long maxCacheSize, int maxSegmentSize) {
+ this.allocator = allocator;
int segmentsCount = Math.max(2, (int) (maxCacheSize / maxSegmentSize));
segmentSize = (int) (maxCacheSize / segmentsCount);
@@ -140,7 +142,7 @@ public class ReadCache implements Closeable {
int entryOffset = (int) res.first;
int entryLen = (int) res.second;
- ByteBuf entry = ByteBufAllocator.DEFAULT.directBuffer(entryLen, entryLen);
+ ByteBuf entry = allocator.directBuffer(entryLen, entryLen);
entry.writeBytes(cacheSegments.get(segmentIdx), entryOffset, entryLen);
return entry;
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
index 197014a..3289b3d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
@@ -132,7 +133,8 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager,
LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, StateManager stateManager,
CheckpointSource checkpointSource, Checkpointer checkpointer, StatsLogger statsLogger,
- ScheduledExecutorService gcExecutor, long writeCacheSize, long readCacheSize) throws IOException {
+ ByteBufAllocator allocator, ScheduledExecutorService gcExecutor, long writeCacheSize, long readCacheSize)
+ throws IOException {
checkArgument(ledgerDirsManager.getAllLedgerDirs().size() == 1,
"Db implementation only allows for one storage dir");
@@ -141,8 +143,8 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
log.info("Creating single directory db ledger storage on {}", baseDir);
this.writeCacheMaxSize = writeCacheSize;
- this.writeCache = new WriteCache(writeCacheMaxSize / 2);
- this.writeCacheBeingFlushed = new WriteCache(writeCacheMaxSize / 2);
+ this.writeCache = new WriteCache(allocator, writeCacheMaxSize / 2);
+ this.writeCacheBeingFlushed = new WriteCache(allocator, writeCacheMaxSize / 2);
this.checkpointSource = checkpointSource;
@@ -153,7 +155,7 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
DEFAULT_MAX_THROTTLE_TIME_MILLIS);
maxThrottleTimeNanos = TimeUnit.MILLISECONDS.toNanos(maxThrottleTimeMillis);
- readCache = new ReadCache(readCacheMaxSize);
+ readCache = new ReadCache(allocator, readCacheMaxSize);
ledgerIndex = new LedgerMetadataIndex(conf, KeyValueStorageRocksDB.factory, baseDir, statsLogger);
entryLocationIndex = new EntryLocationIndex(conf, KeyValueStorageRocksDB.factory, baseDir, statsLogger);
@@ -164,7 +166,7 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
TransientLedgerInfo.LEDGER_INFO_CACHING_TIME_MINUTES,
TransientLedgerInfo.LEDGER_INFO_CACHING_TIME_MINUTES, TimeUnit.MINUTES);
- entryLogger = new EntryLogger(conf, ledgerDirsManager, null, statsLogger);
+ entryLogger = new EntryLogger(conf, ledgerDirsManager, null, statsLogger, allocator);
gcThread = new GarbageCollectorThread(conf, ledgerManager, this, statsLogger);
dbLedgerStorageStats = new DbLedgerStorageStats(
@@ -179,7 +181,8 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
@Override
public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager,
LedgerDirsManager indexDirsManager, StateManager stateManager, CheckpointSource checkpointSource,
- Checkpointer checkpointer, StatsLogger statsLogger) throws IOException {
+ Checkpointer checkpointer, StatsLogger statsLogger,
+ ByteBufAllocator allocator) throws IOException {
/// Initialized in constructor
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java
index 08ffe67..ac58e8e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java
@@ -80,17 +80,20 @@ public class WriteCache implements Closeable {
private final ConcurrentLongHashSet deletedLedgers = new ConcurrentLongHashSet();
- public WriteCache(long maxCacheSize) {
+ private final ByteBufAllocator allocator;
+
+ public WriteCache(ByteBufAllocator allocator, long maxCacheSize) {
// Default maxSegmentSize set to 1Gb
- this(maxCacheSize, 1 * 1024 * 1024 * 1024);
+ this(allocator, maxCacheSize, 1 * 1024 * 1024 * 1024);
}
- public WriteCache(long maxCacheSize, int maxSegmentSize) {
+ public WriteCache(ByteBufAllocator allocator, long maxCacheSize, int maxSegmentSize) {
checkArgument(maxSegmentSize > 0);
long alignedMaxSegmentSize = alignToPowerOfTwo(maxSegmentSize);
checkArgument(maxSegmentSize == alignedMaxSegmentSize, "Max segment size needs to be in form of 2^n");
+ this.allocator = allocator;
this.maxCacheSize = maxCacheSize;
this.maxSegmentSize = (int) maxSegmentSize;
this.segmentOffsetMask = maxSegmentSize - 1;
@@ -185,7 +188,7 @@ public class WriteCache implements Closeable {
long offset = result.first;
int size = (int) result.second;
- ByteBuf entry = ByteBufAllocator.DEFAULT.buffer(size, size);
+ ByteBuf entry = allocator.buffer(size, size);
int localOffset = (int) (offset & segmentOffsetMask);
int segmentIdx = (int) (offset >>> segmentOffsetBits);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 0acf16f..94591af 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -26,6 +26,8 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WATCHER_SCOPE;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.EventLoopGroup;
import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.DefaultThreadFactory;
@@ -56,6 +58,7 @@ import org.apache.bookkeeper.client.api.CreateBuilder;
import org.apache.bookkeeper.client.api.DeleteBuilder;
import org.apache.bookkeeper.client.api.OpenBuilder;
import org.apache.bookkeeper.client.api.WriteFlag;
+import org.apache.bookkeeper.common.allocator.ByteBufAllocatorBuilder;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.common.util.ReflectionUtils;
@@ -99,10 +102,11 @@ import org.slf4j.LoggerFactory;
*/
public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
- static final Logger LOG = LoggerFactory.getLogger(BookKeeper.class);
+ private static final Logger LOG = LoggerFactory.getLogger(BookKeeper.class);
final EventLoopGroup eventLoopGroup;
+ private final ByteBufAllocator allocator;
// The stats logger for this client.
private final StatsLogger statsLogger;
@@ -149,6 +153,7 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
ZooKeeper zk = null;
EventLoopGroup eventLoopGroup = null;
+ ByteBufAllocator allocator = null;
StatsLogger statsLogger = NullStatsLogger.INSTANCE;
DNSToSwitchMapping dnsResolver = null;
HashedWheelTimer requestTimer = null;
@@ -214,6 +219,18 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
}
/**
+ * Configure the bookkeeper client with a provided {@link ByteBufAllocator}.
+ *
+ * @param allocator an external {@link ByteBufAllocator} to use by the bookkeeper client.
+ * @return client builder.
+ * @since 4.9
+ */
+ public Builder allocator(ByteBufAllocator allocator) {
+ this.allocator = allocator;
+ return this;
+ }
+
+ /**
* Configure the bookkeeper client with a provided {@link ZooKeeper} client.
*
* @param zk an external {@link ZooKeeper} client to use by the bookkeeper client.
@@ -276,7 +293,8 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
public BookKeeper build() throws IOException, InterruptedException, BKException {
checkNotNull(statsLogger, "No stats logger provided");
- return new BookKeeper(conf, zk, eventLoopGroup, statsLogger, dnsResolver, requestTimer, featureProvider);
+ return new BookKeeper(conf, zk, eventLoopGroup, allocator, statsLogger, dnsResolver, requestTimer,
+ featureProvider);
}
}
@@ -313,7 +331,7 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
*/
public BookKeeper(final ClientConfiguration conf)
throws IOException, InterruptedException, BKException {
- this(conf, null, null, NullStatsLogger.INSTANCE,
+ this(conf, null, null, null, NullStatsLogger.INSTANCE,
null, null, null);
}
@@ -347,7 +365,7 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
*/
public BookKeeper(ClientConfiguration conf, ZooKeeper zk)
throws IOException, InterruptedException, BKException {
- this(conf, validateZooKeeper(zk), null, NullStatsLogger.INSTANCE, null, null, null);
+ this(conf, validateZooKeeper(zk), null, null, NullStatsLogger.INSTANCE, null, null, null);
}
/**
@@ -369,17 +387,19 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
*/
public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLoopGroup)
throws IOException, InterruptedException, BKException {
- this(conf, validateZooKeeper(zk), validateEventLoopGroup(eventLoopGroup), NullStatsLogger.INSTANCE,
+ this(conf, validateZooKeeper(zk), validateEventLoopGroup(eventLoopGroup), null, NullStatsLogger.INSTANCE,
null, null, null);
}
/**
* Constructor for use with the builder. Other constructors also use it.
*/
+ @SuppressWarnings("deprecation")
@VisibleForTesting
BookKeeper(ClientConfiguration conf,
ZooKeeper zkc,
EventLoopGroup eventLoopGroup,
+ ByteBufAllocator byteBufAllocator,
StatsLogger rootStatsLogger,
DNSToSwitchMapping dnsResolver,
HashedWheelTimer requestTimer,
@@ -443,8 +463,19 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
this.ownEventLoopGroup = false;
}
+ if (byteBufAllocator != null) {
+ this.allocator = byteBufAllocator;
+ } else {
+ this.allocator = ByteBufAllocatorBuilder.create()
+ .poolingPolicy(conf.getAllocatorPoolingPolicy())
+ .poolingConcurrency(conf.getAllocatorPoolingConcurrency())
+ .outOfMemoryPolicy(conf.getAllocatorOutOfMemoryPolicy())
+ .leakDetectionPolicy(conf.getAllocatorLeakDetectionPolicy())
+ .build();
+ }
+
// initialize bookie client
- this.bookieClient = new BookieClientImpl(conf, this.eventLoopGroup, this.mainWorkerPool,
+ this.bookieClient = new BookieClientImpl(conf, this.eventLoopGroup, this.allocator, this.mainWorkerPool,
scheduler, rootStatsLogger);
if (null == requestTimer) {
@@ -517,6 +548,7 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
bookieWatcher = null;
bookieInfoScheduler = null;
bookieClient = null;
+ allocator = UnpooledByteBufAllocator.DEFAULT;
}
private EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfiguration conf,
@@ -1466,6 +1498,11 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
public boolean isClientClosed() {
return BookKeeper.this.isClosed();
}
+
+ @Override
+ public ByteBufAllocator getByteBufAllocator() {
+ return allocator;
+ }
};
ClientContext getClientCtx() {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientContext.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientContext.java
index d8803d0..da3abde 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientContext.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientContext.java
@@ -20,6 +20,8 @@
*/
package org.apache.bookkeeper.client;
+import io.netty.buffer.ByteBufAllocator;
+
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.meta.LedgerManager;
@@ -37,6 +39,7 @@ interface ClientContext {
BookieWatcher getBookieWatcher();
EnsemblePlacementPolicy getPlacementPolicy();
BookieClient getBookieClient();
+ ByteBufAllocator getByteBufAllocator();
OrderedExecutor getMainWorkerPool();
OrderedScheduler getScheduler();
BookKeeperClientStats getClientStats();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 157a1b8..bd8ec68 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -188,7 +188,7 @@ public class LedgerHandle implements WriteHandle {
}
macManager = DigestManager.instantiate(ledgerId, password, BookKeeper.DigestType.toProtoDigestType(digestType),
- clientCtx.getConf().useV2WireProtocol);
+ clientCtx.getByteBufAllocator(), clientCtx.getConf().useV2WireProtocol);
// If the password is empty, pass the same random ledger key which is generated by the hash of the empty
// password, so that the bookie can avoid processing the keys for each entry
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BookKeeperBuilder.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BookKeeperBuilder.java
index 0e147dd..ea30dc5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BookKeeperBuilder.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BookKeeperBuilder.java
@@ -20,9 +20,11 @@
*/
package org.apache.bookkeeper.client.api;
+import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.EventLoopGroup;
import io.netty.util.HashedWheelTimer;
import java.io.IOException;
+
import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public;
import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
import org.apache.bookkeeper.feature.FeatureProvider;
@@ -48,6 +50,15 @@ public interface BookKeeperBuilder {
BookKeeperBuilder eventLoopGroup(EventLoopGroup eventLoopGroup);
/**
+ * Configure the bookkeeper client with a provided {@link ByteBufAllocator}.
+ *
+ * @param allocator an external {@link ByteBufAllocator} to use by the bookkeeper client.
+ * @return client builder.
+ * @since 4.9
+ */
+ BookKeeperBuilder allocator(ByteBufAllocator allocator);
+
+ /**
* Configure the bookkeeper client with a provided {@link StatsLogger}.
*
* @param statsLogger an {@link StatsLogger} to use by the bookkeeper client to collect stats generated by the
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/BookKeeperBuilderImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/BookKeeperBuilderImpl.java
index 3a07d1b..6373ace 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/BookKeeperBuilderImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/impl/BookKeeperBuilderImpl.java
@@ -20,6 +20,7 @@
*/
package org.apache.bookkeeper.client.impl;
+import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.EventLoopGroup;
import io.netty.util.HashedWheelTimer;
import java.io.IOException;
@@ -51,6 +52,12 @@ public class BookKeeperBuilderImpl implements BookKeeperBuilder {
}
@Override
+ public BookKeeperBuilder allocator(ByteBufAllocator allocator) {
+ builder.allocator(allocator);
+ return this;
+ }
+
+ @Override
public BookKeeperBuilder statsLogger(StatsLogger statsLogger) {
builder.statsLogger(statsLogger);
return this;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
index 65d702b..36cfa63 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
@@ -28,6 +28,10 @@ import java.util.Map;
import javax.net.ssl.SSLEngine;
import lombok.extern.slf4j.Slf4j;
+
+import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy;
+import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy;
+import org.apache.bookkeeper.common.allocator.PoolingPolicy;
import org.apache.bookkeeper.common.util.JsonUtil;
import org.apache.bookkeeper.common.util.JsonUtil.ParseJsonException;
import org.apache.bookkeeper.common.util.ReflectionUtils;
@@ -155,6 +159,12 @@ public abstract class AbstractConfiguration<T extends AbstractConfiguration>
// enforce minimum number of racks per write quorum
public static final String ENFORCE_MIN_NUM_RACKS_PER_WRITE_QUORUM = "enforceMinNumRacksPerWriteQuorum";
+ // Allocator configuration
+ protected static final String ALLOCATOR_POOLING_POLICY = "allocatorPoolingPolicy";
+ protected static final String ALLOCATOR_POOLING_CONCURRENCY = "allocatorPoolingConcurrency";
+ protected static final String ALLOCATOR_OOM_POLICY = "allocatorOutOfMemoryPolicy";
+ protected static final String ALLOCATOR_LEAK_DETECTION_POLICY = "allocatorLeakDetectionPolicy";
+
// option to limit stats logging
public static final String LIMIT_STATS_LOGGING = "limitStatsLogging";
@@ -882,6 +892,95 @@ public abstract class AbstractConfiguration<T extends AbstractConfiguration>
}
/**
+ * @return the configured pooling policy for the allocator.
+ */
+ public PoolingPolicy getAllocatorPoolingPolicy() {
+ return PoolingPolicy.valueOf(this.getString(ALLOCATOR_POOLING_POLICY, PoolingPolicy.PooledDirect.toString()));
+ }
+
+ /**
+ * Define the memory pooling policy.
+ *
+ * <p>Default is {@link PoolingPolicy#PooledDirect}
+ *
+ * @param poolingPolicy
+ * the memory pooling policy
+ * @return configuration object.
+ */
+ public T setAllocatorPoolingPolicy(PoolingPolicy poolingPolicy) {
+ this.setProperty(ALLOCATOR_POOLING_POLICY, poolingPolicy.toString());
+ return getThis();
+ }
+
+ /**
+ * @return the configured pooling concurrency for the allocator.
+ */
+ public int getAllocatorPoolingConcurrency() {
+ return this.getInteger(ALLOCATOR_POOLING_CONCURRENCY, 2 * Runtime.getRuntime().availableProcessors());
+ }
+
+ /**
+ * Controls the amount of concurrency for the memory pool.
+ *
+ * <p>Default is to have a number of allocator arenas equal to 2 * CPUs.
+ *
+ * <p>Decreasing this number will reduce the amount of memory overhead, at the
+ * expense of increased allocation contention.
+ *
+ * @param concurrency
+ * the concurrency level to use for the allocator pool
+ * @return configuration object.
+ */
+ public T setAllocatorPoolingConcurrenncy(int concurrency) {
+ this.setProperty(ALLOCATOR_POOLING_CONCURRENCY, concurrency);
+ return getThis();
+ }
+
+ /**
+ * @return the configured out-of-memory policy for the allocator.
+ */
+ public OutOfMemoryPolicy getAllocatorOutOfMemoryPolicy() {
+ return OutOfMemoryPolicy
+ .valueOf(this.getString(ALLOCATOR_OOM_POLICY, OutOfMemoryPolicy.FallbackToHeap.toString()));
+ }
+
+ /**
+ * Define the memory allocator out of memory policy.
+ *
+ * <p>Default is {@link OutOfMemoryPolicy#FallbackToHeap}
+ *
+ * @param oomPolicy
+ * the "out-of-memory" policy for the memory allocator
+ * @return configuration object.
+ */
+ public T setAllocatorOutOfMemoryPolicy(OutOfMemoryPolicy oomPolicy) {
+ this.setProperty(ALLOCATOR_OOM_POLICY, oomPolicy.toString());
+ return getThis();
+ }
+
+ /**
+ * Return the configured leak detection policy for the allocator.
+ */
+ public LeakDetectionPolicy getAllocatorLeakDetectionPolicy() {
+ return LeakDetectionPolicy
+ .valueOf(this.getString(ALLOCATOR_LEAK_DETECTION_POLICY, LeakDetectionPolicy.Disabled.toString()));
+ }
+
+ /**
+ * Enable the leak detection for the allocator.
+ *
+ * <p>Default is {@link LeakDetectionPolicy#Disabled}
+ *
+ * @param leakDetectionPolicy
+ * the leak detection policy for the memory allocator
+ * @return configuration object.
+ */
+ public T setAllocatorLeakDetectionPolicy(LeakDetectionPolicy leakDetectionPolicy) {
+ this.setProperty(ALLOCATOR_LEAK_DETECTION_POLICY, leakDetectionPolicy.toString());
+ return getThis();
+ }
+
+ /**
* Return whether the busy-wait is enabled for BookKeeper and Netty IO threads.
*
* <p>Default is false
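Editorial sketch (not part of the commit): the allocator getters and setters above each read and write one configuration key, with the getter supplying the default. The stand-alone example below illustrates that round trip with only the JDK. The class `AllocatorConfigSketch`, its `Properties` backing store, and the local `PoolingPolicy` enum are hypothetical stand-ins for `AbstractConfiguration` and the BookKeeper enum; only the key names and defaults are taken from the diff.

```java
import java.util.Properties;

public class AllocatorConfigSketch {
    // Hypothetical local stand-in for org.apache.bookkeeper.common.allocator.PoolingPolicy.
    enum PoolingPolicy { UnpooledHeap, PooledDirect }

    // Key names as they appear in the diff.
    static final String ALLOCATOR_POOLING_POLICY = "allocatorPoolingPolicy";
    static final String ALLOCATOR_POOLING_CONCURRENCY = "allocatorPoolingConcurrency";

    // Assumption: a plain Properties object replaces the real configuration store.
    private final Properties props = new Properties();

    PoolingPolicy getAllocatorPoolingPolicy() {
        // Falls back to PooledDirect when the key is unset.
        return PoolingPolicy.valueOf(
                props.getProperty(ALLOCATOR_POOLING_POLICY, PoolingPolicy.PooledDirect.toString()));
    }

    AllocatorConfigSketch setAllocatorPoolingPolicy(PoolingPolicy policy) {
        props.setProperty(ALLOCATOR_POOLING_POLICY, policy.toString());
        return this;
    }

    int getAllocatorPoolingConcurrency() {
        // Falls back to 2 * CPUs when the key is unset.
        String value = props.getProperty(ALLOCATOR_POOLING_CONCURRENCY);
        return value != null
                ? Integer.parseInt(value)
                : 2 * Runtime.getRuntime().availableProcessors();
    }

    AllocatorConfigSketch setAllocatorPoolingConcurrency(int concurrency) {
        // The concurrency is stored under its own key, separate from the policy key.
        props.setProperty(ALLOCATOR_POOLING_CONCURRENCY, Integer.toString(concurrency));
        return this;
    }

    public static void main(String[] args) {
        AllocatorConfigSketch conf = new AllocatorConfigSketch();
        System.out.println(conf.getAllocatorPoolingPolicy());       // PooledDirect (default)
        conf.setAllocatorPoolingConcurrency(4);
        System.out.println(conf.getAllocatorPoolingConcurrency());  // 4
        System.out.println(conf.getAllocatorPoolingPolicy());       // still PooledDirect
    }
}
```

Because each setting has its own key, changing the concurrency leaves the pooling policy readable and unchanged, which is the property a round-trip test on the real configuration class would check.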
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 7d0d319..3b390d4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -21,11 +21,14 @@ import static com.google.common.base.Charsets.UTF_8;
import static org.apache.bookkeeper.util.BookKeeperConstants.FEATURE_DISABLE_ENSEMBLE_CHANGE;
import io.netty.buffer.ByteBuf;
+
import java.util.concurrent.TimeUnit;
+
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
+import org.apache.bookkeeper.client.api.BookKeeperBuilder;
import org.apache.bookkeeper.common.util.ReflectionUtils;
import org.apache.bookkeeper.discover.RegistrationClient;
import org.apache.bookkeeper.discover.ZKRegistrationClient;
@@ -1772,8 +1775,11 @@ public class ClientConfiguration extends AbstractConfiguration<ClientConfigurati
/**
* Option to use Netty Pooled ByteBufs.
*
+ * @deprecated see {@link BookKeeperBuilder#allocator(io.netty.buffer.ByteBufAllocator)}
+ *
* @return the value of the option
*/
+ @Deprecated
public boolean isNettyUsePooledBuffers() {
return getBoolean(NETTY_USE_POOLED_BUFFERS, true);
}
@@ -1785,6 +1791,8 @@ public class ClientConfiguration extends AbstractConfiguration<ClientConfigurati
* @param enabled
* if enabled BookKeeper will use default Pooled Netty Buffer allocator
*
+ * @deprecated see {@link BookKeeperBuilder#allocator(io.netty.buffer.ByteBufAllocator)}
+ *
* @see #setUseV2WireProtocol(boolean)
* @see ByteBuf#release()
* @see LedgerHandle#readEntries(long, long)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
index 18f48a2..8342821 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Lists;
import com.google.protobuf.ExtensionRegistry;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
@@ -76,11 +77,12 @@ import org.slf4j.LoggerFactory;
public class BookieClientImpl implements BookieClient, PerChannelBookieClientFactory {
static final Logger LOG = LoggerFactory.getLogger(BookieClient.class);
- OrderedExecutor executor;
- ScheduledExecutorService scheduler;
- ScheduledFuture<?> timeoutFuture;
+ private final OrderedExecutor executor;
+ private final ScheduledExecutorService scheduler;
+ private final ScheduledFuture<?> timeoutFuture;
- EventLoopGroup eventLoopGroup;
+ private final EventLoopGroup eventLoopGroup;
+ private final ByteBufAllocator allocator;
final ConcurrentHashMap<BookieSocketAddress, PerChannelBookieClientPool> channels =
new ConcurrentHashMap<BookieSocketAddress, PerChannelBookieClientPool>();
@@ -96,10 +98,12 @@ public class BookieClientImpl implements BookieClient, PerChannelBookieClientFac
private final long bookieErrorThresholdPerInterval;
public BookieClientImpl(ClientConfiguration conf, EventLoopGroup eventLoopGroup,
+ ByteBufAllocator allocator,
OrderedExecutor executor, ScheduledExecutorService scheduler,
StatsLogger statsLogger) throws IOException {
this.conf = conf;
this.eventLoopGroup = eventLoopGroup;
+ this.allocator = allocator;
this.executor = executor;
this.closed = false;
this.closeLock = new ReentrantReadWriteLock();
@@ -120,6 +124,8 @@ public class BookieClientImpl implements BookieClient, PerChannelBookieClientFac
conf.getTimeoutMonitorIntervalSec(),
conf.getTimeoutMonitorIntervalSec(),
TimeUnit.SECONDS);
+ } else {
+ this.timeoutFuture = null;
}
}
@@ -175,7 +181,7 @@ public class BookieClientImpl implements BookieClient, PerChannelBookieClientFac
if (conf.getLimitStatsLogging()) {
statsLoggerForPCBC = NullStatsLogger.INSTANCE;
}
- return new PerChannelBookieClient(conf, executor, eventLoopGroup, address, statsLoggerForPCBC,
+ return new PerChannelBookieClient(conf, executor, eventLoopGroup, allocator, address, statsLoggerForPCBC,
authProviderFactory, registry, pcbcPool, shFactory);
}
@@ -596,8 +602,8 @@ public class BookieClientImpl implements BookieClient, PerChannelBookieClientFac
.build();
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(
new DefaultThreadFactory("BookKeeperClientScheduler"));
- BookieClientImpl bc = new BookieClientImpl(new ClientConfiguration(), eventLoopGroup, executor,
- scheduler, NullStatsLogger.INSTANCE);
+ BookieClientImpl bc = new BookieClientImpl(new ClientConfiguration(), eventLoopGroup,
+ null, executor, scheduler, NullStatsLogger.INSTANCE);
BookieSocketAddress addr = new BookieSocketAddress(args[0], Integer.parseInt(args[1]));
for (int i = 0; i < 100000; i++) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
index add4537..5b5c288 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
@@ -24,6 +24,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ExtensionRegistry;
import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.Channel;
@@ -107,8 +108,11 @@ class BookieNettyServer {
final BookieAuthProvider.Factory authProviderFactory;
final ExtensionRegistry registry = ExtensionRegistry.newInstance();
- BookieNettyServer(ServerConfiguration conf, RequestProcessor processor)
+ private final ByteBufAllocator allocator;
+
+ BookieNettyServer(ServerConfiguration conf, RequestProcessor processor, ByteBufAllocator allocator)
throws IOException, KeeperException, InterruptedException, BookieException {
+ this.allocator = allocator;
this.maxFrameSize = conf.getNettyMaxFrameSizeBytes();
this.conf = conf;
this.requestProcessor = processor;
@@ -296,7 +300,8 @@ class BookieNettyServer {
private void listenOn(InetSocketAddress address, BookieSocketAddress bookieAddress) throws InterruptedException {
if (!conf.isDisableServerSocketBind()) {
ServerBootstrap bootstrap = new ServerBootstrap();
- bootstrap.childOption(ChannelOption.ALLOCATOR, new PooledByteBufAllocator(true));
+ bootstrap.option(ChannelOption.ALLOCATOR, allocator);
+ bootstrap.childOption(ChannelOption.ALLOCATOR, allocator);
bootstrap.group(eventLoopGroup, eventLoopGroup);
bootstrap.childOption(ChannelOption.TCP_NODELAY, conf.getServerTcpNoDelay());
bootstrap.childOption(ChannelOption.SO_LINGER, conf.getServerSockLinger());
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index b883f74..7d5e2e6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -29,6 +29,7 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.ByteString;
+import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.HashedWheelTimer;
@@ -44,6 +45,7 @@ import java.util.function.Consumer;
import lombok.AccessLevel;
import lombok.Getter;
+
import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
import org.apache.bookkeeper.auth.AuthToken;
import org.apache.bookkeeper.bookie.Bookie;
@@ -124,9 +126,12 @@ public class BookieRequestProcessor implements RequestProcessor {
final Optional<Cache<Channel, Boolean>> blacklistedChannels;
final Consumer<Channel> onResponseTimeout;
- public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie,
- StatsLogger statsLogger, SecurityHandlerFactory shFactory) throws SecurityException {
+ private final ByteBufAllocator allocator;
+
+ public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie, StatsLogger statsLogger,
+ SecurityHandlerFactory shFactory, ByteBufAllocator allocator) throws SecurityException {
this.serverCfg = serverCfg;
+ this.allocator = allocator;
this.waitTimeoutOnBackpressureMillis = serverCfg.getWaitTimeoutOnResponseBackpressureMillis();
this.preserveMdcForTaskExecution = serverCfg.getPreserveMdcForTaskExecution();
this.bookie = bookie;
@@ -158,7 +163,7 @@ public class BookieRequestProcessor implements RequestProcessor {
OrderedExecutor.NO_TASK_LIMIT, statsLogger);
this.shFactory = shFactory;
if (shFactory != null) {
- shFactory.init(NodeType.Server, serverCfg);
+ shFactory.init(NodeType.Server, serverCfg, allocator);
}
this.requestTimer = new HashedWheelTimer(
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
index 27e8ab2..b8c1adc 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
@@ -25,6 +25,8 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SERVER_SCOPE;
import static org.apache.bookkeeper.conf.AbstractConfiguration.PERMITTED_STARTUP_USERS;
import com.google.common.annotations.VisibleForTesting;
+import io.netty.buffer.ByteBufAllocator;
+
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.net.UnknownHostException;
@@ -38,6 +40,7 @@ import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.ExitCode;
import org.apache.bookkeeper.bookie.ReadOnlyBookie;
import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.common.allocator.ByteBufAllocatorBuilder;
import org.apache.bookkeeper.common.util.JsonUtil.ParseJsonException;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -62,7 +65,7 @@ public class BookieServer {
final ServerConfiguration conf;
BookieNettyServer nettyServer;
private volatile boolean running = false;
- Bookie bookie;
+ private final Bookie bookie;
DeathWatcher deathWatcher;
private static final Logger LOG = LoggerFactory.getLogger(BookieServer.class);
@@ -96,10 +99,11 @@ public class BookieServer {
LOG.error("Got ParseJsonException while converting Config to JSONString", pe);
}
+ ByteBufAllocator allocator = getAllocator(conf);
this.statsLogger = statsLogger;
- this.nettyServer = new BookieNettyServer(this.conf, null);
+ this.nettyServer = new BookieNettyServer(this.conf, null, allocator);
try {
- this.bookie = newBookie(conf);
+ this.bookie = newBookie(conf, allocator);
} catch (IOException | KeeperException | InterruptedException | BookieException e) {
// interrupted on constructing a bookie
this.nettyServer.shutdown();
@@ -110,7 +114,7 @@ public class BookieServer {
shFactory = SecurityProviderFactoryFactory
.getSecurityProviderFactory(conf.getTLSProviderFactoryClass());
this.requestProcessor = new BookieRequestProcessor(conf, bookie,
- statsLogger.scope(SERVER_SCOPE), shFactory);
+ statsLogger.scope(SERVER_SCOPE), shFactory, bookie.getAllocator());
this.nettyServer.setRequestProcessor(this.requestProcessor);
}
@@ -126,11 +130,11 @@ public class BookieServer {
this.uncaughtExceptionHandler = exceptionHandler;
}
- protected Bookie newBookie(ServerConfiguration conf)
+ protected Bookie newBookie(ServerConfiguration conf, ByteBufAllocator allocator)
throws IOException, KeeperException, InterruptedException, BookieException {
return conf.isForceReadOnlyBookie()
- ? new ReadOnlyBookie(conf, statsLogger.scope(BOOKIE_SCOPE))
- : new Bookie(conf, statsLogger.scope(BOOKIE_SCOPE));
+ ? new ReadOnlyBookie(conf, statsLogger.scope(BOOKIE_SCOPE), allocator)
+ : new Bookie(conf, statsLogger.scope(BOOKIE_SCOPE), allocator);
}
public void start() throws IOException, UnavailableException, InterruptedException, BKException {
@@ -285,6 +289,24 @@ public class BookieServer {
}
}
+ private ByteBufAllocator getAllocator(ServerConfiguration conf) {
+ return ByteBufAllocatorBuilder.create()
+ .poolingPolicy(conf.getAllocatorPoolingPolicy())
+ .poolingConcurrency(conf.getAllocatorPoolingConcurrency())
+ .outOfMemoryPolicy(conf.getAllocatorOutOfMemoryPolicy())
+ .outOfMemoryListener((ex) -> {
+ try {
+ LOG.error("Unable to allocate memory, exiting bookie", ex);
+ } finally {
+ if (uncaughtExceptionHandler != null) {
+ uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), ex);
+ }
+ }
+ })
+ .leakDetectionPolicy(conf.getAllocatorLeakDetectionPolicy())
+ .build();
+ }
+
/**
* Legacy Method to run bookie server.
*/
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 6839afc..d411e98 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -29,7 +29,6 @@ import com.google.protobuf.UnsafeByteOperations;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
@@ -166,6 +165,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
final BookieSocketAddress addr;
final EventLoopGroup eventLoopGroup;
+ final ByteBufAllocator allocator;
final OrderedExecutor executor;
final long addEntryTimeoutNanos;
final long readEntryTimeoutNanos;
@@ -345,12 +345,14 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
StatsLogger parentStatsLogger, ClientAuthProvider.Factory authProviderFactory,
ExtensionRegistry extRegistry,
PerChannelBookieClientPool pcbcPool) throws SecurityException {
- this(conf, executor, eventLoopGroup, addr, NullStatsLogger.INSTANCE,
+ this(conf, executor, eventLoopGroup, UnpooledByteBufAllocator.DEFAULT, addr, NullStatsLogger.INSTANCE,
authProviderFactory, extRegistry, pcbcPool, null);
}
public PerChannelBookieClient(ClientConfiguration conf, OrderedExecutor executor,
- EventLoopGroup eventLoopGroup, BookieSocketAddress addr,
+ EventLoopGroup eventLoopGroup,
+ ByteBufAllocator allocator,
+ BookieSocketAddress addr,
StatsLogger parentStatsLogger, ClientAuthProvider.Factory authProviderFactory,
ExtensionRegistry extRegistry,
PerChannelBookieClientPool pcbcPool,
@@ -364,6 +366,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
} else {
this.eventLoopGroup = eventLoopGroup;
}
+ this.allocator = allocator;
this.state = ConnectionState.DISCONNECTED;
this.addEntryTimeoutNanos = TimeUnit.SECONDS.toNanos(conf.getAddEntryTimeout());
this.readEntryTimeoutNanos = TimeUnit.SECONDS.toNanos(conf.getReadEntryTimeout());
@@ -376,7 +379,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
this.extRegistry = extRegistry;
this.shFactory = shFactory;
if (shFactory != null) {
- shFactory.init(NodeType.Client, conf);
+ shFactory.init(NodeType.Client, conf, allocator);
}
StringBuilder nameBuilder = new StringBuilder();
@@ -515,14 +518,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
bootstrap.channel(NioSocketChannel.class);
}
- ByteBufAllocator allocator;
- if (this.conf.isNettyUsePooledBuffers()) {
- allocator = PooledByteBufAllocator.DEFAULT;
- } else {
- allocator = UnpooledByteBufAllocator.DEFAULT;
- }
-
- bootstrap.option(ChannelOption.ALLOCATOR, allocator);
+ bootstrap.option(ChannelOption.ALLOCATOR, this.allocator);
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.getClientConnectTimeoutMillis());
bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
conf.getClientWriteBufferLowWaterMark(), conf.getClientWriteBufferHighWaterMark()));
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32CDigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32CDigestManager.java
index d6d1949..59dec3a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32CDigestManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32CDigestManager.java
@@ -22,6 +22,7 @@ import com.scurrilous.circe.checksum.Crc32cIntChecksum;
import com.scurrilous.circe.crc.Sse42Crc32C;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
import io.netty.util.concurrent.FastThreadLocal;
import lombok.extern.slf4j.Slf4j;
@@ -38,10 +39,10 @@ class CRC32CDigestManager extends DigestManager {
}
};
- public CRC32CDigestManager(long ledgerId, boolean useV2Protocol) {
- super(ledgerId, useV2Protocol);
+ public CRC32CDigestManager(long ledgerId, boolean useV2Protocol, ByteBufAllocator allocator) {
+ super(ledgerId, useV2Protocol, allocator);
if (!Sse42Crc32C.isSupported()) {
- log.error("Sse42Crc32C is not supported, will use less slower CRC32C implementation.");
+ log.error("Sse42Crc32C is not supported, will use a slower CRC32C implementation.");
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java
index b71ab59..d06bc80 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java
@@ -19,6 +19,7 @@ package org.apache.bookkeeper.proto.checksum;
*/
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
import io.netty.util.concurrent.FastThreadLocal;
/**
@@ -46,8 +47,8 @@ class CRC32DigestManager extends DigestManager {
}
};
- public CRC32DigestManager(long ledgerId, boolean useV2Protocol) {
- super(ledgerId, useV2Protocol);
+ public CRC32DigestManager(long ledgerId, boolean useV2Protocol, ByteBufAllocator allocator) {
+ super(ledgerId, useV2Protocol, allocator);
}
@Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
index 1928637..2dabf82 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
@@ -18,7 +18,7 @@
package org.apache.bookkeeper.proto.checksum;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
@@ -47,6 +47,7 @@ public abstract class DigestManager {
final long ledgerId;
final boolean useV2Protocol;
+ private final ByteBufAllocator allocator;
abstract int getMacCodeLength();
@@ -60,28 +61,24 @@ public abstract class DigestManager {
final int macCodeLength;
- public DigestManager(long ledgerId, boolean useV2Protocol) {
+ public DigestManager(long ledgerId, boolean useV2Protocol, ByteBufAllocator allocator) {
this.ledgerId = ledgerId;
this.useV2Protocol = useV2Protocol;
- macCodeLength = getMacCodeLength();
- }
-
- public static DigestManager instantiate(long ledgerId, byte[] passwd, DigestType digestType)
- throws GeneralSecurityException {
- return instantiate(ledgerId, passwd, digestType, false);
+ this.macCodeLength = getMacCodeLength();
+ this.allocator = allocator;
}
public static DigestManager instantiate(long ledgerId, byte[] passwd, DigestType digestType,
- boolean useV2Protocol) throws GeneralSecurityException {
+ ByteBufAllocator allocator, boolean useV2Protocol) throws GeneralSecurityException {
switch(digestType) {
case HMAC:
- return new MacDigestManager(ledgerId, passwd, useV2Protocol);
+ return new MacDigestManager(ledgerId, passwd, useV2Protocol, allocator);
case CRC32:
- return new CRC32DigestManager(ledgerId, useV2Protocol);
+ return new CRC32DigestManager(ledgerId, useV2Protocol, allocator);
case CRC32C:
- return new CRC32CDigestManager(ledgerId, useV2Protocol);
+ return new CRC32CDigestManager(ledgerId, useV2Protocol, allocator);
case DUMMY:
- return new DummyDigestManager(ledgerId, useV2Protocol);
+ return new DummyDigestManager(ledgerId, useV2Protocol, allocator);
default:
throw new GeneralSecurityException("Unknown checksum type: " + digestType);
}
@@ -106,7 +103,7 @@ public abstract class DigestManager {
/*
* For V2 protocol, use pooled direct ByteBuf's to avoid object allocation in DigestManager.
*/
- ByteBuf headersBuffer = PooledByteBufAllocator.DEFAULT.buffer(METADATA_LENGTH + macCodeLength);
+ ByteBuf headersBuffer = allocator.buffer(METADATA_LENGTH + macCodeLength);
headersBuffer.writeLong(ledgerId);
headersBuffer.writeLong(entryId);
headersBuffer.writeLong(lastAddConfirmed);
@@ -149,7 +146,7 @@ public abstract class DigestManager {
public ByteBufList computeDigestAndPackageForSendingLac(long lac) {
ByteBuf headersBuffer;
if (this.useV2Protocol) {
- headersBuffer = PooledByteBufAllocator.DEFAULT.buffer(LAC_METADATA_LENGTH + macCodeLength);
+ headersBuffer = allocator.buffer(LAC_METADATA_LENGTH + macCodeLength);
} else {
headersBuffer = Unpooled.buffer(LAC_METADATA_LENGTH + macCodeLength);
}
@@ -185,7 +182,7 @@ public abstract class DigestManager {
int offset = METADATA_LENGTH + macCodeLength;
update(dataReceived.slice(offset, dataReceived.readableBytes() - offset));
- ByteBuf digest = PooledByteBufAllocator.DEFAULT.buffer(macCodeLength);
+ ByteBuf digest = allocator.buffer(macCodeLength);
populateValueAndReset(digest);
try {
@@ -225,7 +222,7 @@ public abstract class DigestManager {
update(dataReceived.slice(0, LAC_METADATA_LENGTH));
- ByteBuf digest = PooledByteBufAllocator.DEFAULT.buffer(macCodeLength);
+ ByteBuf digest = allocator.buffer(macCodeLength);
try {
populateValueAndReset(digest);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java
index 1b771f0..aeb0d5b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java
@@ -21,14 +21,15 @@
package org.apache.bookkeeper.proto.checksum;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
/**
* This class provides a noop digest implementation.
*/
public class DummyDigestManager extends DigestManager {
- public DummyDigestManager(long ledgerId, boolean useV2Protocol) {
- super(ledgerId, useV2Protocol);
+ public DummyDigestManager(long ledgerId, boolean useV2Protocol, ByteBufAllocator allocator) {
+ super(ledgerId, useV2Protocol, allocator);
}
@Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java
index e71c077..92b93f1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java
@@ -21,6 +21,7 @@ package org.apache.bookkeeper.proto.checksum;
import static com.google.common.base.Charsets.UTF_8;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
@@ -71,9 +72,9 @@ public class MacDigestManager extends DigestManager {
}
};
- public MacDigestManager(long ledgerId, byte[] passwd, boolean useV2Protocol)
+ public MacDigestManager(long ledgerId, byte[] passwd, boolean useV2Protocol, ByteBufAllocator allocator)
throws GeneralSecurityException {
- super(ledgerId, useV2Protocol);
+ super(ledgerId, useV2Protocol, allocator);
this.passwd = Arrays.copyOf(passwd, passwd.length);
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/SecurityHandlerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/SecurityHandlerFactory.java
index 59be884..5b43744 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/SecurityHandlerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/SecurityHandlerFactory.java
@@ -17,6 +17,7 @@
*/
package org.apache.bookkeeper.tls;
+import io.netty.buffer.ByteBufAllocator;
import io.netty.handler.ssl.SslHandler;
import org.apache.bookkeeper.conf.AbstractConfiguration;
@@ -37,7 +38,7 @@ public interface SecurityHandlerFactory {
String getHandlerName();
- void init(NodeType type, AbstractConfiguration conf) throws SecurityException;
+ void init(NodeType type, AbstractConfiguration conf, ByteBufAllocator allocator) throws SecurityException;
SslHandler newTLSHandler();
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/TLSContextFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/TLSContextFactory.java
index 17aea85..32e2426 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/TLSContextFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/TLSContextFactory.java
@@ -20,7 +20,8 @@ package org.apache.bookkeeper.tls;
import com.google.common.base.Strings;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import io.netty.buffer.PooledByteBufAllocator;
+
+import io.netty.buffer.ByteBufAllocator;
import io.netty.handler.ssl.ClientAuth;
import io.netty.handler.ssl.OpenSsl;
import io.netty.handler.ssl.SslContext;
@@ -79,6 +80,7 @@ public class TLSContextFactory implements SecurityHandlerFactory {
private String[] protocols;
private String[] ciphers;
private SslContext sslContext;
+ private ByteBufAllocator allocator;
private String getPasswordFromFile(String path) throws IOException {
byte[] pwd;
@@ -350,7 +352,9 @@ public class TLSContextFactory implements SecurityHandlerFactory {
}
@Override
- public synchronized void init(NodeType type, AbstractConfiguration conf) throws SecurityException {
+ public synchronized void init(NodeType type, AbstractConfiguration conf, ByteBufAllocator allocator)
+ throws SecurityException {
+ this.allocator = allocator;
final String enabledProtocols;
final String enabledCiphers;
@@ -397,7 +401,7 @@ public class TLSContextFactory implements SecurityHandlerFactory {
@Override
public SslHandler newTLSHandler() {
- SslHandler sslHandler = sslContext.newHandler(PooledByteBufAllocator.DEFAULT);
+ SslHandler sslHandler = sslContext.newHandler(allocator);
if (protocols != null && protocols.length != 0) {
sslHandler.engine().setEnabledProtocols(protocols);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
index f9e4cff..355cf3f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
@@ -23,7 +23,6 @@ package org.apache.bookkeeper.util;
import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
@@ -307,7 +306,7 @@ public class ByteBufList extends AbstractReferenceCounted {
if (prependSize) {
// Prepend the frame size before writing the buffer list, so that we only have 1 single size
// header
- ByteBuf sizeBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(4, 4);
+ ByteBuf sizeBuffer = ctx.alloc().directBuffer(4, 4);
sizeBuffer.writeInt(b.readableBytes());
ctx.write(sizeBuffer, ctx.voidPromise());
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
index 530969b..7dde6fd 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
@@ -34,6 +34,8 @@ import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import com.fasterxml.jackson.databind.ObjectMapper;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
import java.io.BufferedReader;
import java.io.BufferedWriter;
@@ -153,7 +155,8 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
// simulating ZooKeeper exception by assigning a closed zk client to bk
BookieServer bkServer = new BookieServer(conf) {
- protected Bookie newBookie(ServerConfiguration conf)
+ @Override
+ protected Bookie newBookie(ServerConfiguration conf, ByteBufAllocator allocator)
throws IOException, KeeperException, InterruptedException,
BookieException {
Bookie bookie = new Bookie(conf);
@@ -708,7 +711,7 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
}
@Override
- protected Bookie newBookie(ServerConfiguration conf)
+ protected Bookie newBookie(ServerConfiguration conf, ByteBufAllocator allocator)
throws IOException, KeeperException, InterruptedException, BookieException {
return new MockBookieWithNoopShutdown(conf, NullStatsLogger.INSTANCE);
}
@@ -717,7 +720,7 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
class MockBookieWithNoopShutdown extends Bookie {
public MockBookieWithNoopShutdown(ServerConfiguration conf, StatsLogger statsLogger)
throws IOException, KeeperException, InterruptedException, BookieException {
- super(conf, statsLogger);
+ super(conf, statsLogger, UnpooledByteBufAllocator.DEFAULT);
}
// making Bookie Shutdown no-op. Ideally for this testcase we need to
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java
index 86f3a86..c98663d 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java
@@ -23,6 +23,8 @@ package org.apache.bookkeeper.bookie;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
import java.io.File;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
@@ -76,8 +78,8 @@ public class BufferedChannelTest {
newLogFile.deleteOnExit();
FileChannel fileChannel = new RandomAccessFile(newLogFile, "rw").getChannel();
- BufferedChannel logChannel = new BufferedChannel(fileChannel, INTERNAL_BUFFER_WRITE_CAPACITY,
- INTERNAL_BUFFER_READ_CAPACITY, unpersistedBytesBound);
+ BufferedChannel logChannel = new BufferedChannel(UnpooledByteBufAllocator.DEFAULT, fileChannel,
+ INTERNAL_BUFFER_WRITE_CAPACITY, INTERNAL_BUFFER_READ_CAPACITY, unpersistedBytesBound);
ByteBuf dataBuf = generateEntry(byteBufLength);
dataBuf.markReaderIndex();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
index bbe9c10..556556e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -37,6 +37,7 @@ import static org.junit.Assert.fail;
import com.google.common.util.concurrent.UncheckedExecutionException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
import java.io.File;
import java.io.IOException;
@@ -265,7 +266,8 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
null,
cp,
Checkpointer.NULL,
- NullStatsLogger.INSTANCE);
+ NullStatsLogger.INSTANCE,
+ UnpooledByteBufAllocator.DEFAULT);
storage.start();
long startTime = System.currentTimeMillis();
storage.gcThread.enableForceGC();
@@ -616,7 +618,8 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
Bookie newbookie = new Bookie(newBookieConf);
DigestManager digestManager = DigestManager.instantiate(ledgerId, passwdBytes,
- BookKeeper.DigestType.toProtoDigestType(digestType), baseClientConf.getUseV2WireProtocol());
+ BookKeeper.DigestType.toProtoDigestType(digestType), UnpooledByteBufAllocator.DEFAULT,
+ baseClientConf.getUseV2WireProtocol());
for (long entryId = 0; entryId <= lastAddConfirmed; entryId++) {
ByteBuf readEntryBufWithChecksum = newbookie.readEntry(ledgerId, entryId);
@@ -860,7 +863,8 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
null,
checkpointSource,
Checkpointer.NULL,
- NullStatsLogger.INSTANCE);
+ NullStatsLogger.INSTANCE,
+ UnpooledByteBufAllocator.DEFAULT);
ledgers.add(1L);
ledgers.add(2L);
ledgers.add(3L);
@@ -885,7 +889,8 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
dirs, dirs, null,
checkpointSource,
Checkpointer.NULL,
- NullStatsLogger.INSTANCE);
+ NullStatsLogger.INSTANCE,
+ UnpooledByteBufAllocator.DEFAULT);
storage.start();
for (int i = 0; i < 10; i++) {
if (!log0.exists()) {
@@ -910,7 +915,8 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
null,
checkpointSource,
Checkpointer.NULL,
- NullStatsLogger.INSTANCE);
+ NullStatsLogger.INSTANCE,
+ UnpooledByteBufAllocator.DEFAULT);
storage.getEntry(1, 1); // entry should exist
}
@@ -1021,7 +1027,8 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
null,
checkpointSource,
Checkpointer.NULL,
- NullStatsLogger.INSTANCE);
+ NullStatsLogger.INSTANCE,
+ UnpooledByteBufAllocator.DEFAULT);
double threshold = 0.1;
// shouldn't throw exception
@@ -1063,7 +1070,7 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
};
InterleavedLedgerStorage storage = new InterleavedLedgerStorage();
storage.initialize(conf, manager, dirs, dirs, null, checkpointSource,
- Checkpointer.NULL, NullStatsLogger.INSTANCE);
+ Checkpointer.NULL, NullStatsLogger.INSTANCE, UnpooledByteBufAllocator.DEFAULT);
for (long ledger = 0; ledger <= 10; ledger++) {
ledgers.add(ledger);
@@ -1081,7 +1088,7 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
storage = new InterleavedLedgerStorage();
storage.initialize(conf, manager, dirs, dirs, null, checkpointSource,
- Checkpointer.NULL, NullStatsLogger.INSTANCE);
+ Checkpointer.NULL, NullStatsLogger.INSTANCE, UnpooledByteBufAllocator.DEFAULT);
long startingEntriesCount = storage.gcThread.entryLogger.getLeastUnflushedLogId()
- storage.gcThread.scannedLogId;
@@ -1158,7 +1165,8 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
null,
cp,
Checkpointer.NULL,
- stats.getStatsLogger("storage"));
+ stats.getStatsLogger("storage"),
+ UnpooledByteBufAllocator.DEFAULT);
storage.start();
int majorCompactions = stats.getCounter("storage.gc." + MAJOR_COMPACTION_COUNT).get().intValue();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
index f5d4edc..0b24db2 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertTrue;
import com.google.common.util.concurrent.MoreExecutors;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
@@ -606,7 +608,8 @@ public class CreateNewLogTest {
conf.setEntryLogPerLedgerCounterLimitsMultFactor(entryLogPerLedgerCounterLimitsMultFactor);
LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
- EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager, null, statsLogger);
+ EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager, null, statsLogger,
+ UnpooledByteBufAllocator.DEFAULT);
EntryLogManagerForEntryLogPerLedger entrylogManager = (EntryLogManagerForEntryLogPerLedger) entryLogger
.getEntryLogManager();
// set same thread executor for entryLoggerAllocator's allocatorExecutor
@@ -731,7 +734,8 @@ public class CreateNewLogTest {
conf.setEntryLogPerLedgerCounterLimitsMultFactor(entryLogPerLedgerCounterLimitsMultFactor);
LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
- EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager, null, statsLogger);
+ EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager, null, statsLogger,
+ UnpooledByteBufAllocator.DEFAULT);
EntryLogManagerForEntryLogPerLedger entrylogManager = (EntryLogManagerForEntryLogPerLedger) entryLogger
.getEntryLogManager();
// set same thread executor for entryLoggerAllocator's allocatorExecutor
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
index 9062261..e1f3582 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
@@ -30,6 +30,8 @@ import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -963,8 +965,8 @@ public class EntryLogTest {
File tmpFile = File.createTempFile("entrylog", logid + "");
tmpFile.deleteOnExit();
FileChannel fc = new RandomAccessFile(tmpFile, "rw").getChannel();
- EntryLogger.BufferedLogChannel logChannel = new BufferedLogChannel(fc, 10, 10, logid, tmpFile,
- servConf.getFlushIntervalInBytes());
+ EntryLogger.BufferedLogChannel logChannel = new BufferedLogChannel(UnpooledByteBufAllocator.DEFAULT, fc, 10, 10,
+ logid, tmpFile, servConf.getFlushIntervalInBytes());
return logChannel;
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
index 70f2a0e..909e646 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
@@ -29,6 +29,7 @@ import static org.junit.Assert.fail;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.UnpooledByteBufAllocator;
import java.io.File;
import java.io.IOException;
@@ -342,7 +343,8 @@ public class IndexPersistenceMgrTest {
preCreateFileInfoForLedger(ledgerId, headerVersion);
DigestManager digestManager = DigestManager.instantiate(ledgerId, masterKey,
- BookKeeper.DigestType.toProtoDigestType(digestType), getUseV2WireProtocol);
+ BookKeeper.DigestType.toProtoDigestType(digestType), UnpooledByteBufAllocator.DEFAULT,
+ getUseV2WireProtocol);
CachedFileInfo fileInfo = indexPersistenceMgr.getFileInfo(ledgerId, masterKey);
fileInfo.readHeader();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java
index 4fde8e7..c12ed91 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java
@@ -26,6 +26,8 @@ import static org.junit.Assert.assertEquals;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -110,7 +112,7 @@ public class InterleavedLedgerStorageTest {
LedgerDirsManager ledgerDirsManager,
EntryLogListener listener,
StatsLogger statsLogger) throws IOException {
- super(conf, ledgerDirsManager, listener, statsLogger);
+ super(conf, ledgerDirsManager, listener, statsLogger, UnpooledByteBufAllocator.DEFAULT);
}
void setCheckEntryTestPoint(CheckEntryListener testPoint) throws InterruptedException {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
index 2321735..a606f9b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
@@ -28,6 +28,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import java.io.File;
@@ -522,7 +523,8 @@ public class LedgerCacheTest {
StateManager stateManager,
CheckpointSource checkpointSource,
Checkpointer checkpointer,
- StatsLogger statsLogger) throws IOException {
+ StatsLogger statsLogger,
+ ByteBufAllocator allocator) throws IOException {
super.initialize(
conf,
ledgerManager,
@@ -531,7 +533,8 @@ public class LedgerCacheTest {
stateManager,
checkpointSource,
checkpointer,
- statsLogger);
+ statsLogger,
+ allocator);
if (this.memTable instanceof EntryMemTableWithParallelFlusher) {
this.memTable = new EntryMemTableWithParallelFlusher(conf, checkpointSource, statsLogger) {
@Override
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageTest.java
index 697d7c0..d028f70 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageTest.java
@@ -23,6 +23,8 @@ package org.apache.bookkeeper.bookie;
import static org.junit.Assert.assertEquals;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -145,7 +147,8 @@ public class LedgerStorageTest extends BookKeeperClusterTestCase {
if ((journalFormatVersionToWrite >= 6) && (fileInfoFormatVersionToWrite >= 1)) {
DigestManager digestManager = DigestManager.instantiate(ledgerId, passwdBytes,
- BookKeeper.DigestType.toProtoDigestType(digestType), confWithExplicitLAC.getUseV2WireProtocol());
+ BookKeeper.DigestType.toProtoDigestType(digestType), UnpooledByteBufAllocator.DEFAULT,
+ confWithExplicitLAC.getUseV2WireProtocol());
long explicitLacPersistedInJournal = digestManager.verifyDigestAndReturnLac(explicitLacBuf);
assertEquals("explicitLac persisted in journal", (numOfEntries - 1), explicitLacPersistedInJournal);
} else {
@@ -226,7 +229,8 @@ public class LedgerStorageTest extends BookKeeperClusterTestCase {
if ((journalFormatVersionToWrite >= 6) && (fileInfoFormatVersionToWrite >= 1)) {
DigestManager digestManager = DigestManager.instantiate(ledgerId, passwdBytes,
- BookKeeper.DigestType.toProtoDigestType(digestType), confWithExplicitLAC.getUseV2WireProtocol());
+ BookKeeper.DigestType.toProtoDigestType(digestType), UnpooledByteBufAllocator.DEFAULT,
+ confWithExplicitLAC.getUseV2WireProtocol());
long explicitLacReadFromFileInfo = digestManager.verifyDigestAndReturnLac(explicitLacBufReadFromFileInfo);
assertEquals("explicitLac persisted in FileInfo", (numOfEntries - 1), explicitLacReadFromFileInfo);
} else {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowInterleavedLedgerStorage.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowInterleavedLedgerStorage.java
index 645af9c..0b429ad 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowInterleavedLedgerStorage.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowInterleavedLedgerStorage.java
@@ -22,6 +22,9 @@ package org.apache.bookkeeper.bookie;
*/
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
import java.io.IOException;
import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -48,7 +51,7 @@ public class SlowInterleavedLedgerStorage extends InterleavedLedgerStorage {
public SlowEntryLogger(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager, EntryLogListener listener,
StatsLogger statsLogger) throws IOException {
- super(conf, ledgerDirsManager, listener, statsLogger);
+ super(conf, ledgerDirsManager, listener, statsLogger, UnpooledByteBufAllocator.DEFAULT);
}
public SlowEntryLogger setAddDelay(long delay) {
@@ -110,10 +113,11 @@ public class SlowInterleavedLedgerStorage extends InterleavedLedgerStorage {
StateManager stateManager,
CheckpointSource checkpointSource,
Checkpointer checkpointer,
- StatsLogger statsLogger)
+ StatsLogger statsLogger,
+ ByteBufAllocator allocator)
throws IOException {
super.initialize(conf, ledgerManager, ledgerDirsManager, indexDirsManager,
- stateManager, checkpointSource, checkpointer, statsLogger);
+ stateManager, checkpointSource, checkpointer, statsLogger, allocator);
// do not want to add these to config class, reading through "raw" interface
long getDelay = conf.getLong(PROP_SLOW_STORAGE_GET_DELAY, 0);
long addDelay = conf.getLong(PROP_SLOW_STORAGE_ADD_DELAY, 0);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
index 44f20e6..48d1038 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
@@ -26,6 +26,8 @@ import static org.mockito.Mockito.mock;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
@@ -141,7 +143,8 @@ public class SortedLedgerStorageCheckpointTest extends LedgerStorageTestBase {
null,
checkpointSrc,
checkpointer,
- NullStatsLogger.INSTANCE);
+ NullStatsLogger.INSTANCE,
+ UnpooledByteBufAllocator.DEFAULT);
}
@After
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/StateManagerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/StateManagerTest.java
index 8da2ffa..71658e7 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/StateManagerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/StateManagerTest.java
@@ -22,6 +22,9 @@ package org.apache.bookkeeper.bookie;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+
+import io.netty.buffer.UnpooledByteBufAllocator;
+
import java.io.File;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
@@ -152,7 +155,8 @@ public class StateManagerTest extends BookKeeperClusterTestCase {
.setJournalDirName(tmpDir.toString())
.setMetadataServiceUri(zkUtil.getMetadataServiceUri())
.setForceReadOnlyBookie(true);
- ReadOnlyBookie readOnlyBookie = new ReadOnlyBookie(readOnlyConf, NullStatsLogger.INSTANCE);
+ ReadOnlyBookie readOnlyBookie = new ReadOnlyBookie(readOnlyConf, NullStatsLogger.INSTANCE,
+ UnpooledByteBufAllocator.DEFAULT);
readOnlyBookie.start();
assertTrue(readOnlyBookie.isRunning());
assertTrue(readOnlyBookie.isReadOnly());
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
index d9fa8cc..707eb81 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
@@ -26,6 +26,8 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -273,7 +275,8 @@ public class TestSyncThread {
StateManager stateManager,
CheckpointSource checkpointSource,
Checkpointer checkpointer,
- StatsLogger statsLogger)
+ StatsLogger statsLogger,
+ ByteBufAllocator allocator)
throws IOException {
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionRollbackTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionRollbackTest.java
index bfd7a4d..5c24633 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionRollbackTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionRollbackTest.java
@@ -25,6 +25,7 @@ import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
import java.io.File;
import java.io.IOException;
@@ -94,7 +95,7 @@ public class ConversionRollbackTest {
DbLedgerStorage dbStorage = new DbLedgerStorage();
dbStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager, null, checkpointSource, checkpointer,
- NullStatsLogger.INSTANCE);
+ NullStatsLogger.INSTANCE, UnpooledByteBufAllocator.DEFAULT);
// Insert some ledger & entries in the dbStorage
for (long ledgerId = 0; ledgerId < 5; ledgerId++) {
@@ -124,7 +125,7 @@ public class ConversionRollbackTest {
// Verify that interleaved storage index has the same entries
InterleavedLedgerStorage interleavedStorage = new InterleavedLedgerStorage();
interleavedStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager,
- null, checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
+ null, checkpointSource, checkpointer, NullStatsLogger.INSTANCE, UnpooledByteBufAllocator.DEFAULT);
Set<Long> ledgers = Sets.newTreeSet(interleavedStorage.getActiveLedgersInRange(0, Long.MAX_VALUE));
Assert.assertEquals(Sets.newTreeSet(Lists.newArrayList(0L, 1L, 2L, 3L, 4L)), ledgers);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionTest.java
index b2afe4c..780b8ec 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionTest.java
@@ -25,6 +25,7 @@ import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
import java.io.File;
import java.io.IOException;
@@ -91,7 +92,7 @@ public class ConversionTest {
InterleavedLedgerStorage interleavedStorage = new InterleavedLedgerStorage();
interleavedStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager,
- null, checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
+ null, checkpointSource, checkpointer, NullStatsLogger.INSTANCE, UnpooledByteBufAllocator.DEFAULT);
// Insert some ledger & entries in the interleaved storage
for (long ledgerId = 0; ledgerId < 5; ledgerId++) {
@@ -121,11 +122,12 @@ public class ConversionTest {
// Verify that db index has the same entries
DbLedgerStorage dbStorage = new DbLedgerStorage();
dbStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager,
- null, checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
+ null, checkpointSource, checkpointer, NullStatsLogger.INSTANCE, UnpooledByteBufAllocator.DEFAULT);
interleavedStorage = new InterleavedLedgerStorage();
interleavedStorage.initialize(conf, null, ledgerDirsManager,
- ledgerDirsManager, null, checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
+ ledgerDirsManager, null, checkpointSource, checkpointer, NullStatsLogger.INSTANCE,
+ UnpooledByteBufAllocator.DEFAULT);
Set<Long> ledgers = Sets.newTreeSet(dbStorage.getActiveLedgersInRange(0, Long.MAX_VALUE));
Assert.assertEquals(Sets.newTreeSet(Lists.newArrayList(0L, 1L, 2L, 3L, 4L)), ledgers);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
index 7a45ee7..e5feef3 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import java.io.File;
@@ -58,10 +59,11 @@ public class DbLedgerStorageWriteCacheTest {
protected SingleDirectoryDbLedgerStorage newSingleDirectoryDbLedgerStorage(ServerConfiguration conf,
LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager,
StateManager stateManager, CheckpointSource checkpointSource, Checkpointer checkpointer,
- StatsLogger statsLogger, ScheduledExecutorService gcExecutor, long writeCacheSize, long readCacheSize)
+ StatsLogger statsLogger, ScheduledExecutorService gcExecutor,
+ long writeCacheSize, long readCacheSize)
throws IOException {
return new MockedSingleDirectoryDbLedgerStorage(conf, ledgerManager, ledgerDirsManager, indexDirsManager,
- stateManager, checkpointSource, checkpointer, statsLogger, gcExecutor, writeCacheSize,
+ stateManager, checkpointSource, checkpointer, statsLogger, allocator, gcExecutor, writeCacheSize,
readCacheSize);
}
@@ -69,9 +71,10 @@ public class DbLedgerStorageWriteCacheTest {
public MockedSingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager,
LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, StateManager stateManager,
CheckpointSource checkpointSource, Checkpointer checkpointer, StatsLogger statsLogger,
- ScheduledExecutorService gcExecutor, long writeCacheSize, long readCacheSize) throws IOException {
+ ByteBufAllocator allocator, ScheduledExecutorService gcExecutor, long writeCacheSize,
+ long readCacheSize) throws IOException {
super(conf, ledgerManager, ledgerDirsManager, indexDirsManager, stateManager, checkpointSource,
- checkpointer, statsLogger, gcExecutor, writeCacheSize, readCacheSize);
+ checkpointer, statsLogger, allocator, gcExecutor, writeCacheSize, readCacheSize);
}
@Override
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildTest.java
index 629a238..7bdbcd5 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildTest.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
import java.io.File;
import java.io.IOException;
@@ -92,7 +93,7 @@ public class LocationsIndexRebuildTest {
DbLedgerStorage ledgerStorage = new DbLedgerStorage();
ledgerStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager, null, checkpointSource, checkpointer,
- NullStatsLogger.INSTANCE);
+ NullStatsLogger.INSTANCE, UnpooledByteBufAllocator.DEFAULT);
// Insert some ledger & entries in the storage
for (long ledgerId = 0; ledgerId < 5; ledgerId++) {
@@ -122,7 +123,7 @@ public class LocationsIndexRebuildTest {
// Verify that db index has the same entries
ledgerStorage = new DbLedgerStorage();
ledgerStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager, null, checkpointSource, checkpointer,
- NullStatsLogger.INSTANCE);
+ NullStatsLogger.INSTANCE, UnpooledByteBufAllocator.DEFAULT);
Set<Long> ledgers = Sets.newTreeSet(ledgerStorage.getActiveLedgersInRange(0, Long.MAX_VALUE));
Assert.assertEquals(Sets.newTreeSet(Lists.newArrayList(0L, 1L, 2L, 3L, 4L)), ledgers);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCacheTest.java
index 42e5099..337140c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCacheTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCacheTest.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertNull;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
import org.junit.Test;
@@ -35,7 +36,7 @@ public class ReadCacheTest {
@Test
public void simple() {
- ReadCache cache = new ReadCache(10 * 1024);
+ ReadCache cache = new ReadCache(UnpooledByteBufAllocator.DEFAULT, 10 * 1024);
assertEquals(0, cache.count());
assertEquals(0, cache.size());
@@ -72,7 +73,7 @@ public class ReadCacheTest {
@Test
public void emptyCache() {
- ReadCache cache = new ReadCache(10 * 1024);
+ ReadCache cache = new ReadCache(UnpooledByteBufAllocator.DEFAULT, 10 * 1024);
assertEquals(0, cache.count());
assertEquals(0, cache.size());
@@ -84,7 +85,7 @@ public class ReadCacheTest {
@Test
public void multipleSegments() {
// Test with multiple smaller segments
- ReadCache cache = new ReadCache(10 * 1024, 2 * 1024);
+ ReadCache cache = new ReadCache(UnpooledByteBufAllocator.DEFAULT, 10 * 1024, 2 * 1024);
assertEquals(0, cache.count());
assertEquals(0, cache.size());
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCacheTest.java
index f8b2bba..5726bbb 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCacheTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCacheTest.java
@@ -26,9 +26,10 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
-import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
import java.nio.charset.Charset;
import java.util.concurrent.BrokenBarrierException;
@@ -46,11 +47,13 @@ import org.junit.Test;
*/
public class WriteCacheTest {
+ private static final ByteBufAllocator allocator = UnpooledByteBufAllocator.DEFAULT;
+
@Test
public void simple() throws Exception {
- WriteCache cache = new WriteCache(10 * 1024);
+ WriteCache cache = new WriteCache(allocator, 10 * 1024);
- ByteBuf entry1 = PooledByteBufAllocator.DEFAULT.buffer(1024);
+ ByteBuf entry1 = allocator.buffer(1024);
ByteBufUtil.writeUtf8(entry1, "entry-1");
entry1.writerIndex(entry1.capacity());
@@ -87,9 +90,9 @@ public class WriteCacheTest {
int entrySize = 1024;
int entriesCount = cacheSize / entrySize;
- WriteCache cache = new WriteCache(cacheSize);
+ WriteCache cache = new WriteCache(allocator, cacheSize);
- ByteBuf entry = PooledByteBufAllocator.DEFAULT.buffer(entrySize);
+ ByteBuf entry = allocator.buffer(entrySize);
entry.writerIndex(entry.capacity());
for (int i = 0; i < entriesCount; i++) {
@@ -125,7 +128,7 @@ public class WriteCacheTest {
@Test
public void testMultipleSegments() {
// Create cache with max size 1Mb and each segment is 16Kb
- WriteCache cache = new WriteCache(1024 * 1024, 16 * 1024);
+ WriteCache cache = new WriteCache(allocator, 1024 * 1024, 16 * 1024);
ByteBuf entry = Unpooled.buffer(1024);
entry.writerIndex(entry.capacity());
@@ -142,7 +145,7 @@ public class WriteCacheTest {
@Test
public void testEmptyCache() {
- WriteCache cache = new WriteCache(1024 * 1024, 16 * 1024);
+ WriteCache cache = new WriteCache(allocator, 1024 * 1024, 16 * 1024);
assertEquals(0, cache.count());
assertEquals(0, cache.size());
@@ -160,7 +163,7 @@ public class WriteCacheTest {
@Test
public void testMultipleWriters() throws Exception {
// Create cache with max size 10Mb and each segment is 16Kb
- WriteCache cache = new WriteCache(10 * 1024 * 1024, 16 * 1024);
+ WriteCache cache = new WriteCache(allocator, 10 * 1024 * 1024, 16 * 1024);
ExecutorService executor = Executors.newCachedThreadPool();
@@ -220,7 +223,7 @@ public class WriteCacheTest {
@Test
public void testLedgerDeletion() {
- WriteCache cache = new WriteCache(1024 * 1024, 16 * 1024);
+ WriteCache cache = new WriteCache(allocator, 1024 * 1024, 16 * 1024);
ByteBuf entry = Unpooled.buffer(1024);
entry.writerIndex(entry.capacity());
@@ -265,7 +268,7 @@ public class WriteCacheTest {
@Test
public void testWriteReadsInMultipleSegments() {
// Create cache with max size 4 KB and each segment is 128 bytes
- WriteCache cache = new WriteCache(4 * 1024, 128);
+ WriteCache cache = new WriteCache(allocator, 4 * 1024, 128);
for (int i = 0; i < 48; i++) {
boolean inserted = cache.put(1, i, Unpooled.wrappedBuffer(("test-" + i).getBytes()));
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
index 7be404d..cb45ba7 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
@@ -587,6 +587,7 @@ public class BookKeeperTest extends BookKeeperClusterTestCase {
}
}
+ @SuppressWarnings("deprecation")
@Test
public void testReadEntryReleaseByteBufs() throws Exception {
ClientConfiguration confWriter = new ClientConfiguration();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
index 253f0d9..e1d32af 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
@@ -20,6 +20,8 @@
*/
package org.apache.bookkeeper.client;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
@@ -45,7 +47,7 @@ public class BookKeeperTestClient extends BookKeeper {
public BookKeeperTestClient(ClientConfiguration conf, TestStatsProvider statsProvider)
throws IOException, InterruptedException, BKException {
- super(conf, null, null,
+ super(conf, null, null, new UnpooledByteBufAllocator(false),
statsProvider == null ? NullStatsLogger.INSTANCE : statsProvider.getStatsLogger(""),
null, null, null);
this.statsProvider = statsProvider;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
index 68a6846..a6b873e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
@@ -21,6 +21,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
import java.security.GeneralSecurityException;
import java.util.function.Function;
@@ -47,7 +48,8 @@ public class ClientUtil {
public static ByteBuf generatePacket(long ledgerId, long entryId, long lastAddConfirmed, long length, byte[] data,
int offset, int len) throws GeneralSecurityException {
- DigestManager dm = DigestManager.instantiate(ledgerId, new byte[2], DigestType.CRC32);
+ DigestManager dm = DigestManager.instantiate(ledgerId, new byte[2], DigestType.CRC32,
+ UnpooledByteBufAllocator.DEFAULT, true);
return ByteBufList.coalesce(dm.computeDigestAndPackageForSending(entryId, lastAddConfirmed, length,
Unpooled.wrappedBuffer(data, offset, len)));
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
index 6d6e4d1..69b5635 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
@@ -31,7 +31,9 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
@@ -210,6 +212,11 @@ public abstract class MockBookKeeperTestCase {
public boolean isClientClosed() {
return bk.isClosed();
}
+
+ @Override
+ public ByteBufAllocator getByteBufAllocator() {
+ return UnpooledByteBufAllocator.DEFAULT;
+ }
};
when(bk.getClientCtx()).thenReturn(clientCtx);
when(bk.getLedgerManager()).thenReturn(ledgerManager);
@@ -241,7 +248,8 @@ public abstract class MockBookKeeperTestCase {
metadata.getPassword(),
org.apache.bookkeeper.client.BookKeeper.DigestType.toProtoDigestType(
org.apache.bookkeeper.client.BookKeeper.DigestType.fromApiDigestType(
- metadata.getDigestType())));
+ metadata.getDigestType())),
+ UnpooledByteBufAllocator.DEFAULT, false);
}
@After
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockClientContext.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockClientContext.java
index f36c008..f0be8d0 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockClientContext.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockClientContext.java
@@ -22,6 +22,9 @@ package org.apache.bookkeeper.client;
import static com.google.common.base.Preconditions.checkState;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
import java.util.function.BooleanSupplier;
import org.apache.bookkeeper.common.util.OrderedExecutor;
@@ -50,6 +53,7 @@ public class MockClientContext implements ClientContext {
private BookKeeperClientStats clientStats;
private BooleanSupplier isClientClosed;
private MockRegistrationClient regClient;
+ private ByteBufAllocator allocator;
static MockClientContext create() throws Exception {
ClientConfiguration conf = new ClientConfiguration();
@@ -67,6 +71,7 @@ public class MockClientContext implements ClientContext {
.setPlacementPolicy(placementPolicy)
.setRegistrationClient(regClient)
.setBookieClient(new MockBookieClient(scheduler))
+ .setByteBufAllocator(UnpooledByteBufAllocator.DEFAULT)
.setMainWorkerPool(scheduler)
.setScheduler(scheduler)
.setClientStats(BookKeeperClientStats.newInstance(NullStatsLogger.INSTANCE))
@@ -83,6 +88,7 @@ public class MockClientContext implements ClientContext {
.setMainWorkerPool(other.getMainWorkerPool())
.setScheduler(other.getScheduler())
.setClientStats(other.getClientStats())
+ .setByteBufAllocator(other.getByteBufAllocator())
.setIsClientClosed(other::isClientClosed);
}
@@ -151,6 +157,11 @@ public class MockClientContext implements ClientContext {
return this;
}
+ public MockClientContext setByteBufAllocator(ByteBufAllocator allocator) {
+ this.allocator = allocator;
+ return this;
+ }
+
private static <T> T maybeSpy(T orig) {
if (Mockito.mockingDetails(orig).isSpy()) {
return orig;
@@ -204,4 +215,8 @@ public class MockClientContext implements ClientContext {
return isClientClosed.getAsBoolean();
}
+ @Override
+ public ByteBufAllocator getByteBufAllocator() {
+ return allocator;
+ }
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java
index a1032a1..e1c203d 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java
@@ -32,6 +32,8 @@ import static org.mockito.Mockito.when;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -132,7 +134,7 @@ public class ReadLastConfirmedAndEntryOpTest {
when(mockLh.getCurrentEnsemble()).thenReturn(ensemble);
when(mockLh.getLedgerMetadata()).thenReturn(ledgerMetadata);
when(mockLh.getDistributionSchedule()).thenReturn(distributionSchedule);
- digestManager = new DummyDigestManager(LEDGERID, false);
+ digestManager = new DummyDigestManager(LEDGERID, false, UnpooledByteBufAllocator.DEFAULT);
when(mockLh.getDigestManager()).thenReturn(digestManager);
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
index 4ec5992..39c615a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java
@@ -23,6 +23,7 @@ package org.apache.bookkeeper.client;
import static org.junit.Assert.assertTrue;
+import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
@@ -108,7 +109,8 @@ public class TestGetBookieInfoTimeout extends BookKeeperClusterTestCase {
// try to get bookie info from the sleeping bookie. It should fail with timeout error
BookieSocketAddress addr = new BookieSocketAddress(bookieToSleep.getSocketAddress().getHostString(),
bookieToSleep.getPort());
- BookieClient bc = new BookieClientImpl(cConf, eventLoopGroup, executor, scheduler, NullStatsLogger.INSTANCE);
+ BookieClient bc = new BookieClientImpl(cConf, eventLoopGroup, UnpooledByteBufAllocator.DEFAULT, executor,
+ scheduler, NullStatsLogger.INSTANCE);
long flags = BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE
| BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/TestBKConfiguration.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/TestBKConfiguration.java
index 9073b01..f993e20 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/TestBKConfiguration.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/TestBKConfiguration.java
@@ -27,6 +27,7 @@ import java.util.Collections;
import java.util.Enumeration;
import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
+import org.apache.bookkeeper.common.allocator.PoolingPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,6 +55,7 @@ public class TestBKConfiguration {
confReturn.setGcWaitTime(1000);
confReturn.setDiskUsageThreshold(0.999f);
confReturn.setDiskUsageWarnThreshold(0.99f);
+ confReturn.setAllocatorPoolingPolicy(PoolingPolicy.UnpooledHeap);
confReturn.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 4);
confReturn.setProperty(DbLedgerStorage.READ_AHEAD_CACHE_MAX_SIZE_MB, 4);
setLoopbackInterfaceAndAllowLoopback(confReturn);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
index 828956d..31bd406 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
@@ -30,6 +30,8 @@ import static org.junit.Assert.fail;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -48,6 +50,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.CheckpointSource;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
@@ -566,7 +569,8 @@ public class GcLedgersTest extends LedgerManagerTestCase {
StateManager stateManager,
CheckpointSource checkpointSource,
Checkpointer checkpointer,
- StatsLogger statsLogger) throws IOException {
+ StatsLogger statsLogger,
+ ByteBufAllocator allocator) throws IOException {
}
@Override
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
index 3aa2b8a..cf9b3dd 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
@@ -22,6 +22,8 @@
package org.apache.bookkeeper.meta;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+
import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
@@ -29,6 +31,7 @@ import java.util.Collection;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
+
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.CheckpointSource;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
@@ -173,7 +176,8 @@ public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase {
StateManager stateManager,
CheckpointSource checkpointSource,
Checkpointer checkpointer,
- StatsLogger statsLogger) throws IOException {
+ StatsLogger statsLogger,
+ ByteBufAllocator allocator) throws IOException {
}
@Override
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureTest.java
index 5454a70..4b13ebe 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureTest.java
@@ -25,6 +25,8 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
import java.lang.reflect.Field;
import java.nio.channels.FileChannel;
import java.util.Enumeration;
@@ -130,7 +132,7 @@ public class BookieBackpressureTest extends BookKeeperClusterTestCase
for (int i = 0; i < journals.size(); i++) {
Journal mock = spy(journals.get(i));
when(mock.getBufferedChannelBuilder()).thenReturn((FileChannel fc, int capacity) -> {
- SlowBufferedChannel sbc = new SlowBufferedChannel(fc, capacity);
+ SlowBufferedChannel sbc = new SlowBufferedChannel(UnpooledByteBufAllocator.DEFAULT, fc, capacity);
sbc.setAddDelay(addDelay);
sbc.setGetDelay(getDelay);
sbc.setFlushDelay(flushDelay);
@@ -306,7 +308,7 @@ public class BookieBackpressureTest extends BookKeeperClusterTestCase
BookieServer bks = bs.get(bkId);
bks.shutdown();
bks = new BookieServer(bsConfs.get(bkId));
- mockJournal(bks.bookie, getDelay, addDelay, flushDelay);
+ mockJournal(bks.getBookie(), getDelay, addDelay, flushDelay);
bks.start();
bs.set(bkId, bks);
@@ -347,7 +349,7 @@ public class BookieBackpressureTest extends BookKeeperClusterTestCase
BookieServer bks = bs.get(bkId);
bks.shutdown();
bks = new BookieServer(bsConfs.get(bkId));
- mockJournal(bks.bookie, getDelay, addDelay, flushDelay);
+ mockJournal(bks.getBookie(), getDelay, addDelay, flushDelay);
bks.start();
bs.set(bkId, bks);
@@ -392,7 +394,7 @@ public class BookieBackpressureTest extends BookKeeperClusterTestCase
BookieServer bks = bs.get(bkId);
bks.shutdown();
bks = new BookieServer(bsConfs.get(bkId));
- mockJournal(bks.bookie, getDelay, addDelay, flushDelay);
+ mockJournal(bks.getBookie(), getDelay, addDelay, flushDelay);
bks.start();
bs.set(bkId, bks);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
index 04119d5..2c349a0 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
@@ -24,6 +24,7 @@ import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
import java.util.Collections;
import java.util.EnumSet;
@@ -109,7 +110,8 @@ public class MockBookieClient implements BookieClient {
}
public void seedEntries(BookieSocketAddress bookie, long ledgerId, long entryId, long lac) throws Exception {
- DigestManager digestManager = DigestManager.instantiate(ledgerId, new byte[0], DigestType.CRC32C);
+ DigestManager digestManager = DigestManager.instantiate(ledgerId, new byte[0], DigestType.CRC32C,
+ UnpooledByteBufAllocator.DEFAULT, false);
ByteBuf entry = ByteBufList.coalesce(digestManager.computeDigestAndPackageForSending(
entryId, lac, 0, Unpooled.buffer(10)));
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java
index 4ebe01c..4cf8a7c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBookieRequestProcessor.java
@@ -29,6 +29,9 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import com.google.protobuf.ByteString;
+
+import io.netty.buffer.UnpooledByteBufAllocator;
+
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
@@ -54,7 +57,7 @@ public class TestBookieRequestProcessor {
// long poll threads == read threads
ServerConfiguration conf = new ServerConfiguration();
try (BookieRequestProcessor processor = new BookieRequestProcessor(
- conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null)) {
+ conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT)) {
assertSame(processor.getReadThreadPool(), processor.getLongPollThreadPool());
}
@@ -62,7 +65,7 @@ public class TestBookieRequestProcessor {
conf = new ServerConfiguration();
conf.setNumReadWorkerThreads(0);
try (BookieRequestProcessor processor = new BookieRequestProcessor(
- conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null)) {
+ conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT)) {
assertNull(processor.getReadThreadPool());
assertNotNull(processor.getLongPollThreadPool());
}
@@ -72,7 +75,7 @@ public class TestBookieRequestProcessor {
conf.setNumReadWorkerThreads(2);
conf.setNumLongPollWorkerThreads(2);
try (BookieRequestProcessor processor = new BookieRequestProcessor(
- conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null)) {
+ conf, mock(Bookie.class), NullStatsLogger.INSTANCE, null, UnpooledByteBufAllocator.DEFAULT)) {
assertNotNull(processor.getReadThreadPool());
assertNotNull(processor.getLongPollThreadPool());
assertNotSame(processor.getReadThreadPool(), processor.getLongPollThreadPool());
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index b1feef0..97b7488 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -21,10 +21,14 @@
package org.apache.bookkeeper.test;
+
+
import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
import static org.junit.Assert.assertTrue;
import com.google.common.base.Stopwatch;
+import io.netty.buffer.ByteBufAllocator;
+
import java.io.File;
import java.io.IOException;
import java.net.UnknownHostException;
@@ -41,6 +45,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.client.BookKeeperTestClient;
+import org.apache.bookkeeper.common.allocator.PoolingPolicy;
import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -237,6 +242,7 @@ public abstract class BookKeeperClusterTestCase {
protected void startBKCluster(String metadataServiceUri) throws Exception {
baseConf.setMetadataServiceUri(metadataServiceUri);
baseClientConf.setMetadataServiceUri(metadataServiceUri);
+ baseClientConf.setAllocatorPoolingPolicy(PoolingPolicy.UnpooledHeap);
if (numBookies > 0) {
bkc = new BookKeeperTestClient(baseClientConf, new TestStatsProvider());
}
@@ -303,6 +309,7 @@ public abstract class BookKeeperClusterTestCase {
}
conf.setLedgerDirNames(ledgerDirNames);
conf.setEnableTaskExecutionStats(true);
+ conf.setAllocatorPoolingPolicy(PoolingPolicy.UnpooledHeap);
return conf;
}
@@ -677,7 +684,7 @@ public abstract class BookKeeperClusterTestCase {
TestStatsProvider provider = new TestStatsProvider();
BookieServer server = new BookieServer(conf, provider.getStatsLogger("")) {
@Override
- protected Bookie newBookie(ServerConfiguration conf) {
+ protected Bookie newBookie(ServerConfiguration conf, ByteBufAllocator allocator) {
return b;
}
};
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
index c6cc72b..d38a178 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
@@ -157,8 +158,8 @@ public class BookieClientTest {
BookieSocketAddress addr = bs.getLocalAddress();
ResultStruct arc = new ResultStruct();
- BookieClient bc = new BookieClientImpl(new ClientConfiguration(), eventLoopGroup, executor,
- scheduler, NullStatsLogger.INSTANCE);
+ BookieClient bc = new BookieClientImpl(new ClientConfiguration(), eventLoopGroup,
+ UnpooledByteBufAllocator.DEFAULT, executor, scheduler, NullStatsLogger.INSTANCE);
ByteBufList bb = createByteBuffer(1, 1, 1);
bc.addEntry(addr, 1, passwd, 1, bb, wrcb, arc, BookieProtocol.FLAG_NONE, false, WriteFlag.NONE);
synchronized (arc) {
@@ -258,8 +259,8 @@ public class BookieClientTest {
public void testNoLedger() throws Exception {
ResultStruct arc = new ResultStruct();
BookieSocketAddress addr = bs.getLocalAddress();
- BookieClient bc = new BookieClientImpl(new ClientConfiguration(), eventLoopGroup, executor,
- scheduler, NullStatsLogger.INSTANCE);
+ BookieClient bc = new BookieClientImpl(new ClientConfiguration(), eventLoopGroup,
+ UnpooledByteBufAllocator.DEFAULT, executor, scheduler, NullStatsLogger.INSTANCE);
synchronized (arc) {
bc.readEntry(addr, 2, 13, recb, arc, BookieProtocol.FLAG_NONE);
arc.wait(1000);
@@ -270,8 +271,8 @@ public class BookieClientTest {
@Test
public void testGetBookieInfo() throws IOException, InterruptedException {
BookieSocketAddress addr = bs.getLocalAddress();
- BookieClient bc = new BookieClientImpl(new ClientConfiguration(), new NioEventLoopGroup(), executor,
- scheduler, NullStatsLogger.INSTANCE);
+ BookieClient bc = new BookieClientImpl(new ClientConfiguration(), new NioEventLoopGroup(),
+ UnpooledByteBufAllocator.DEFAULT, executor, scheduler, NullStatsLogger.INSTANCE);
long flags = BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE
| BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE;
diff --git a/conf/bk_server.conf b/conf/bk_server.conf
index 23012dc..41798a6 100755
--- a/conf/bk_server.conf
+++ b/conf/bk_server.conf
@@ -965,3 +965,53 @@ storage.serve.readonly.tables=false
# the cluster controller schedule interval, in milliseconds. default is 30 seconds.
storage.cluster.controller.schedule.interval.ms=30000
+
+
+#############################################################################
+## Netty Allocator Settings
+#############################################################################
+
+# Define the memory pooling policy.
+# Available options are:
+# - PooledDirect: Use Direct memory for all buffers and pool the memory.
+# Direct memory will avoid the overhead of JVM GC and most
+# memory copies when reading and writing to socket channel.
+# Pooling will add memory space overhead due to the fact that
+# there will be fragmentation in the allocator and that threads
+# will keep a portion of memory as thread-local to avoid
+# contention when possible.
+# - UnpooledHeap: Allocate memory from JVM heap without any pooling.
+# This option has the least overhead in terms of memory usage
+# since the memory will be automatically reclaimed by the
+# JVM GC but might impose a performance penalty at high
+# throughput.
+# Default is: PooledDirect
+# allocatorPoolingPolicy=PooledDirect
+
+# Controls the amount of concurrency for the memory pool.
+# Default is to have a number of allocator arenas equal to 2 * CPUS.
+# Decreasing this number will reduce the amount of memory overhead, at the
+# expense of increased allocation contention.
+# allocatorPoolingConcurrency=8
+
+# Define the memory allocator out of memory policy.
+# Available options are:
+# - FallbackToHeap: If it's not possible to allocate a buffer from direct memory,
+# fallback to allocate an unpooled buffer from JVM heap.
+# This will help absorb memory allocation spikes because the heap
+# allocations will naturally slow down the process and will result
+# in a full GC cleanup if the heap itself is full.
+# - ThrowException: Throw a regular OOM exception without taking additional actions.
+# Default is: FallbackToHeap
+# allocatorOutOfMemoryPolicy=FallbackToHeap
+
+# Available options are:
+# - Disabled: No leak detection and no overhead.
+# - Simple: Instruments 1% of the allocated buffers to track for leaks.
+# - Advanced: Instruments 1% of the allocated buffers to track for leaks, reporting
+# stack traces of places where the buffer was used.
+# - Paranoid: Instruments 100% of the allocated buffers to track for leaks, reporting
+# stack traces of places where the buffer was used. Introduces very
+# significant overhead.
+# Default is: Disabled
+# allocatorLeakDetectionPolicy=Disabled
diff --git a/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestTypeBenchmark.java b/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestTypeBenchmark.java
index 3995ea8..2319b2e 100644
--- a/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestTypeBenchmark.java
+++ b/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestTypeBenchmark.java
@@ -24,6 +24,7 @@ package org.apache.bookkeeper.proto.checksum;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@@ -106,13 +107,13 @@ public class DigestTypeBenchmark {
public void doSetup() throws Exception {
final byte[] password = "password".getBytes("UTF-8");
crc32 = DigestManager.instantiate(ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE),
- password, DigestType.CRC32);
+ password, DigestType.CRC32, PooledByteBufAllocator.DEFAULT, true);
crc32c = DigestManager.instantiate(ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE),
- password, DigestType.CRC32C);
+ password, DigestType.CRC32C, PooledByteBufAllocator.DEFAULT, true);
mac = DigestManager.instantiate(ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE),
- password, DigestType.HMAC);
+ password, DigestType.HMAC, PooledByteBufAllocator.DEFAULT, true);
digestBuf = Unpooled.buffer(getDigestManager(digest).getMacCodeLength());
diff --git a/shaded/bookkeeper-server-shaded/pom.xml b/shaded/bookkeeper-server-shaded/pom.xml
index a39227b..85c76be 100644
--- a/shaded/bookkeeper-server-shaded/pom.xml
+++ b/shaded/bookkeeper-server-shaded/pom.xml
@@ -66,6 +66,7 @@
<include>com.google.guava:guava</include>
<include>com.google.protobuf:protobuf-java</include>
<include>org.apache.bookkeeper:bookkeeper-common</include>
+ <include>org.apache.bookkeeper:bookkeeper-common-allocator</include>
<include>org.apache.bookkeeper:cpu-affinity</include>
<include>org.apache.bookkeeper:bookkeeper-tools-framework</include>
<include>org.apache.bookkeeper:bookkeeper-proto</include>
@@ -83,7 +84,7 @@
</configuration>
</execution>
</executions>
- </plugin>
+ </plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>license-maven-plugin</artifactId>
diff --git a/shaded/distributedlog-core-shaded/pom.xml b/shaded/distributedlog-core-shaded/pom.xml
index e5ef59f..85ad8f0 100644
--- a/shaded/distributedlog-core-shaded/pom.xml
+++ b/shaded/distributedlog-core-shaded/pom.xml
@@ -87,6 +87,7 @@
<include>net.java.dev.jna:jna</include>
<include>net.jpountz.lz4:lz4</include>
<include>org.apache.bookkeeper:bookkeeper-common</include>
+ <include>org.apache.bookkeeper:bookkeeper-common-allocator</include>
<include>org.apache.bookkeeper:cpu-affinity</include>
<include>org.apache.bookkeeper:bookkeeper-tools-framework</include>
<include>org.apache.bookkeeper:bookkeeper-proto</include>
@@ -207,7 +208,7 @@
</configuration>
</execution>
</executions>
- </plugin>
+ </plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>license-maven-plugin</artifactId>
diff --git a/site/_data/config/bk_server.yaml b/site/_data/config/bk_server.yaml
index a42e12e..d83a8f1 100644
--- a/site/_data/config/bk_server.yaml
+++ b/site/_data/config/bk_server.yaml
@@ -137,7 +137,7 @@ groups:
default: 8080
- name: Security settings
- params:
+ params:
- param: bookieAuthProviderFactoryClass
description: The bookie authentication provider factory class name. If this is null, no authentication will take place.
default: null
@@ -207,7 +207,7 @@ groups:
5: expanding header to 512 and padding writes to align sector size configured by `journalAlignmentSize`
6: persisting explicitLac is introduced
- By default, it is `6`.
+ By default, it is `6`.
If you'd like to disable persisting ExplicitLac, you can set this config to < `6` and also fileInfoFormatVersionToWrite should be set to 0. If there is mismatch then the serverconfig is considered invalid.
You can disable `padding-writes` by setting journal version back to `4`. This feature is available in 4.5.0 and onward versions.
default: 6
@@ -618,7 +618,7 @@ groups:
- param: ensemblePlacementPolicy
description: |
The ensemble placement policy used for finding bookie for re-replicating entries.
-
+
Options:
- org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy
- org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy
@@ -678,4 +678,55 @@ groups:
description: The time to backoff when replication worker encounters exceptions on replicating a ledger, in milliseconds.
default: 5000
-
+- name: Memory allocator settings
+ params:
+ - param: allocatorPoolingPolicy
+ description: |
+ Define the memory pooling policy.
+
+ Available options are:
+ - PooledDirect: Use Direct memory for all buffers and pool the memory.
+ Direct memory will avoid the overhead of JVM GC and most
+ memory copies when reading and writing to socket channel.
+ Pooling will add memory space overhead due to the fact that
+ there will be fragmentation in the allocator and that threads
+ will keep a portion of memory as thread-local to avoid
+ contention when possible.
+ - UnpooledHeap: Allocate memory from JVM heap without any pooling.
+ This option has the least overhead in terms of memory usage
+ since the memory will be automatically reclaimed by the
+ JVM GC but might impose a performance penalty at high
+ throughput.
+ default: PooledDirect
+ - param: allocatorPoolingConcurrency
+ description: |
+ Controls the amount of concurrency for the memory pool.
+ Default is to have a number of allocator arenas equal to 2 * CPUS.
+ Decreasing this number will reduce the amount of memory overhead, at the
+ expense of increased allocation contention.
+ default: 2 * CPUS
+ - param: allocatorOutOfMemoryPolicy
+ description: |
+ Define the memory allocator out of memory policy.
+
+ Available options are:
+ - FallbackToHeap: If it's not possible to allocate a buffer from direct memory,
+ fallback to allocate an unpooled buffer from JVM heap.
+ This will help absorb memory allocation spikes because the heap
+ allocations will naturally slow down the process and will result
+ in a full GC cleanup if the heap itself is full.
+ - ThrowException: Throw a regular OOM exception without taking additional actions.
+ default: FallbackToHeap
+ - param: allocatorLeakDetectionPolicy
+ description: |
+ Define the memory allocator leak detection policy.
+
+ Available options are:
+ - Disabled: No leak detection and no overhead.
+ - Simple: Instruments 1% of the allocated buffers to track for leaks.
+ - Advanced: Instruments 1% of the allocated buffers to track for leaks, reporting
+ stack traces of places where the buffer was used.
+ - Paranoid: Instruments 100% of the allocated buffers to track for leaks, reporting
+ stack traces of places where the buffer was used. Introduces very
+ significant overhead.
+ default: Disabled
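
Note on the allocator settings documented above: the four keys (allocatorPoolingPolicy, allocatorPoolingConcurrency, allocatorOutOfMemoryPolicy, allocatorLeakDetectionPolicy) and their defaults can be illustrated with a small standalone sketch. This is not the code from this commit. The class AllocatorSettingsSketch, its nested Settings type, and the three local enums are hypothetical stand-ins that only mirror the option names listed in bk_server.conf, and the real ServerConfiguration parsing may differ. The sketch assumes that "2 * CPUS" means twice the number of processors reported by the JVM.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical illustration only: reads the allocator keys from conf-style text
// and applies the defaults documented in bk_server.conf.
final class AllocatorSettingsSketch {

    // Local stand-ins that mirror the documented option names (assumption:
    // these are not the BookKeeper enums themselves).
    enum PoolingPolicy { PooledDirect, UnpooledHeap }
    enum OutOfMemoryPolicy { FallbackToHeap, ThrowException }
    enum LeakDetectionPolicy { Disabled, Simple, Advanced, Paranoid }

    static final class Settings {
        final PoolingPolicy poolingPolicy;
        final int poolingConcurrency;
        final OutOfMemoryPolicy outOfMemoryPolicy;
        final LeakDetectionPolicy leakDetectionPolicy;

        Settings(PoolingPolicy poolingPolicy, int poolingConcurrency,
                 OutOfMemoryPolicy outOfMemoryPolicy, LeakDetectionPolicy leakDetectionPolicy) {
            this.poolingPolicy = poolingPolicy;
            this.poolingConcurrency = poolingConcurrency;
            this.outOfMemoryPolicy = outOfMemoryPolicy;
            this.leakDetectionPolicy = leakDetectionPolicy;
        }
    }

    // Parses key=value text; lines starting with '#' are skipped by Properties.load.
    static Settings parse(String confText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(confText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Assumed reading of the documented default "2 * CPUS".
        int defaultConcurrency = 2 * Runtime.getRuntime().availableProcessors();
        return new Settings(
                PoolingPolicy.valueOf(
                        props.getProperty("allocatorPoolingPolicy", "PooledDirect").trim()),
                Integer.parseInt(
                        props.getProperty("allocatorPoolingConcurrency",
                                String.valueOf(defaultConcurrency)).trim()),
                OutOfMemoryPolicy.valueOf(
                        props.getProperty("allocatorOutOfMemoryPolicy", "FallbackToHeap").trim()),
                LeakDetectionPolicy.valueOf(
                        props.getProperty("allocatorLeakDetectionPolicy", "Disabled").trim()));
    }

    public static void main(String[] args) {
        // Only two keys are set; the other two fall back to the documented defaults.
        Settings s = parse("# test overrides\n"
                + "allocatorPoolingPolicy=UnpooledHeap\n"
                + "allocatorLeakDetectionPolicy=Paranoid\n");
        System.out.println(s.poolingPolicy + " " + s.poolingConcurrency + " "
                + s.outOfMemoryPolicy + " " + s.leakDetectionPolicy);
    }
}
```

An unknown option value makes Enum.valueOf throw IllegalArgumentException, so a misspelled policy fails at startup instead of silently falling back to the default. That is a design choice of this sketch and not necessarily how BookKeeper behaves.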