You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/10/25 12:07:04 UTC
[bookkeeper] branch master updated: Netty allocator wrapper
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 379d2ba Netty allocator wrapper
379d2ba is described below
commit 379d2ba8570eed1f92f8a468158f1a2d0861aaa6
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Oct 25 05:06:58 2018 -0700
Netty allocator wrapper
### Motivation
Configuring the correct JVM memory settings and cache sizes for a BookKeeper cluster should be simplified.
There are currently many knobs in Netty or JVM flags for different components and, while with a good setup the system is very stable, it's easy to set up non-optimal configurations which might result in OutOfMemory errors under load.
Ideally, there should be very minimal configuration required to bring up a BookKeeper cluster that can work under a wide set of traffic loads. In any case, we should prefer to automatically fall back to slower alternatives, when possible, instead of throwing OOM exceptions.
* Provide a wrapper to have a single unified configuration point for Netty allocator configuration.
* Provide fallback policy when dealing with direct memory OOM errors
### Changes
* This is the first PR. It only contains the allocator wrapper implementation. Subsequent PRs will add the changes to use it.
* Added `bookkeeper-common-allocator` module to have a no-dependencies module that can be used directly from Pulsar client library too (which doesn't depend on BK).
Author: Sijie Guo <gu...@gmail.com>
Author: Charan Reddy Guttapalem <re...@gmail.com>
Author: Matteo Merli <mm...@apache.org>
Author: Andrey Yegorov <dl...@users.noreply.github.com>
Author: Samuel Just <sj...@salesforce.com>
Reviewers: Qi Wang <None>, Sijie Guo <si...@apache.org>, Ivan Kelly <iv...@apache.org>, Enrico Olivelli <eo...@gmail.com>
This closes #1754 from merlimat/allocator
---
bookkeeper-common-allocator/pom.xml | 60 +++++
.../common/allocator/ByteBufAllocatorBuilder.java | 97 ++++++++
.../common/allocator/LeakDetectionPolicy.java | 47 ++++
.../common/allocator/OutOfMemoryPolicy.java | 39 +++
.../bookkeeper/common/allocator/PoolingPolicy.java | 45 ++++
.../impl/ByteBufAllocatorBuilderImpl.java | 90 +++++++
.../allocator/impl/ByteBufAllocatorImpl.java | 163 +++++++++++++
.../common/allocator/impl/package-info.java | 21 ++
.../bookkeeper/common/allocator/package-info.java | 21 ++
.../impl/ByteBufAllocatorBuilderTest.java | 270 +++++++++++++++++++++
pom.xml | 1 +
11 files changed, 854 insertions(+)
diff --git a/bookkeeper-common-allocator/pom.xml b/bookkeeper-common-allocator/pom.xml
new file mode 100644
index 0000000..a988899
--- /dev/null
+++ b/bookkeeper-common-allocator/pom.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.bookkeeper</groupId>
+ <artifactId>bookkeeper</artifactId>
+ <version>4.9.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>bookkeeper-common-allocator</artifactId>
+ <name>Apache BookKeeper :: Common :: Allocator</name>
+ <dependencies>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>com.github.spotbugs</groupId>
+ <artifactId>spotbugs-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/ByteBufAllocatorBuilder.java b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/ByteBufAllocatorBuilder.java
new file mode 100644
index 0000000..d749efd
--- /dev/null
+++ b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/ByteBufAllocatorBuilder.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.common.allocator;
+
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
+import java.util.function.Consumer;
+
+import org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorBuilderImpl;
+
+/**
+ * Builder object to customize a ByteBuf allocator.
+ */
+public interface ByteBufAllocatorBuilder {
+ /**
+ * Creates a new {@link ByteBufAllocatorBuilder}.
+ *
+ * @return a new builder, backed by {@link ByteBufAllocatorBuilderImpl}
+ */
+ static ByteBufAllocatorBuilder create() {
+ return new ByteBufAllocatorBuilderImpl();
+ }
+
+ /**
+ * Finalize the configured {@link ByteBufAllocator}.
+ *
+ * @return an allocator created from the settings currently held by this
+ * builder
+ */
+ ByteBufAllocator build();
+
+ /**
+ * Specify a custom pooled allocator where the allocation requests should be
+ * forwarded to when the pooling policy is {@link PoolingPolicy#PooledDirect}.
+ *
+ * <p>Default is to use a new instance of {@link PooledByteBufAllocator}.
+ *
+ * @param pooledAllocator the allocator to use for pooled allocations
+ * @return this builder
+ */
+ ByteBufAllocatorBuilder pooledAllocator(ByteBufAllocator pooledAllocator);
+
+ /**
+ * Specify a custom unpooled allocator where the allocation requests should
+ * be forwarded to when the pooling policy is
+ * {@link PoolingPolicy#UnpooledHeap}, or when falling back to heap after a
+ * direct memory allocation failure.
+ *
+ * <p>Default is to use {@link UnpooledByteBufAllocator#DEFAULT}.
+ *
+ * @param unpooledAllocator the allocator to use for unpooled allocations
+ * @return this builder
+ */
+ ByteBufAllocatorBuilder unpooledAllocator(ByteBufAllocator unpooledAllocator);
+
+ /**
+ * Define the memory pooling policy.
+ *
+ * <p>Default is {@link PoolingPolicy#PooledDirect}
+ *
+ * @param policy the pooling policy to apply
+ * @return this builder
+ */
+ ByteBufAllocatorBuilder poolingPolicy(PoolingPolicy policy);
+
+ /**
+ * Controls the amount of concurrency for the memory pool.
+ *
+ * <p>Default is to have a number of allocator arenas equals to 2 * CPUS.
+ *
+ * <p>Decreasing this number will reduce the amount of memory overhead, at the
+ * expense of increased allocation contention.
+ *
+ * <p>In the provided implementation the value is only used when the default
+ * pooled allocator is created, i.e. when no custom pooled allocator has
+ * been specified.
+ *
+ * @param poolingConcurrency the number of arenas for the default pooled
+ * allocator
+ * @return this builder
+ */
+ ByteBufAllocatorBuilder poolingConcurrency(int poolingConcurrency);
+
+ /**
+ * Define the OutOfMemory handling policy.
+ *
+ * <p>Default is {@link OutOfMemoryPolicy#FallbackToHeap}
+ *
+ * @param policy the action to take when a direct memory allocation fails
+ * @return this builder
+ */
+ ByteBufAllocatorBuilder outOfMemoryPolicy(OutOfMemoryPolicy policy);
+
+ /**
+ * Set the listener that is triggered whenever there is an allocation
+ * failure. A later call replaces the previously configured listener.
+ *
+ * <p>Application can use this to trigger alerting or process restarting.
+ *
+ * <p>In the provided implementation, when no listener is configured the
+ * failure is logged at error level. With
+ * {@link OutOfMemoryPolicy#FallbackToHeap} the listener is only triggered
+ * if the fallback heap allocation fails as well.
+ *
+ * @param listener the callback receiving the {@link OutOfMemoryError}
+ * @return this builder
+ */
+ ByteBufAllocatorBuilder outOfMemoryListener(Consumer<OutOfMemoryError> listener);
+
+ /**
+ * Enable the leak detection for the allocator.
+ *
+ * <p>Default is {@link LeakDetectionPolicy#Disabled}
+ *
+ * <p>The leak detection level is a static setting in Netty, so in the
+ * provided implementation it affects all allocators, not only the one
+ * being built.
+ *
+ * @param leakDetectionPolicy the leak detection level to apply
+ * @return this builder
+ */
+ ByteBufAllocatorBuilder leakDetectionPolicy(LeakDetectionPolicy leakDetectionPolicy);
+}
diff --git a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/LeakDetectionPolicy.java b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/LeakDetectionPolicy.java
new file mode 100644
index 0000000..4766847
--- /dev/null
+++ b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/LeakDetectionPolicy.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.common.allocator;
+
+/**
+ * Define the policy for the Netty leak detector.
+ */
+public enum LeakDetectionPolicy {
+
+ /**
+ * No leak detection and no overhead.
+ *
+ * <p>Applied as Netty's {@code ResourceLeakDetector.Level.DISABLED}.
+ */
+ Disabled,
+
+ /**
+ * Instruments 1% of the allocated buffers to track for leaks.
+ *
+ * <p>Applied as Netty's {@code ResourceLeakDetector.Level.SIMPLE}.
+ */
+ Simple,
+
+ /**
+ * Instruments 1% of the allocated buffers to track for leaks, reporting
+ * stack traces of places where the buffer was used.
+ *
+ * <p>Applied as Netty's {@code ResourceLeakDetector.Level.ADVANCED}.
+ */
+ Advanced,
+
+ /**
+ * Instruments 100% of the allocated buffers to track for leaks, reporting
+ * stack traces of places where the buffer was used. Introduces very
+ * significant overhead.
+ *
+ * <p>Applied as Netty's {@code ResourceLeakDetector.Level.PARANOID}.
+ */
+ Paranoid,
+}
diff --git a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/OutOfMemoryPolicy.java b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/OutOfMemoryPolicy.java
new file mode 100644
index 0000000..ff72050
--- /dev/null
+++ b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/OutOfMemoryPolicy.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.common.allocator;
+
+/**
+ * Represents the action to take when it's not possible to allocate memory.
+ */
+public enum OutOfMemoryPolicy {
+
+ /**
+ * Throw regular OOM exception without taking additional actions.
+ *
+ * <p>The configured out-of-memory listener is notified before the error
+ * is rethrown.
+ */
+ ThrowException,
+
+ /**
+ * If it's not possible to allocate a buffer from direct memory, fall back to
+ * allocating an unpooled buffer from JVM heap.
+ *
+ * <p>This will help absorb memory allocation spikes because the heap
+ * allocations will naturally slow down the process and will result in full
+ * GC cleanup if the Heap itself is full.
+ *
+ * <p>The out-of-memory listener is only notified if the heap allocation
+ * fails as well.
+ */
+ FallbackToHeap,
+}
diff --git a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/PoolingPolicy.java b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/PoolingPolicy.java
new file mode 100644
index 0000000..352a55e
--- /dev/null
+++ b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/PoolingPolicy.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.common.allocator;
+
+/**
+ * Define a policy for allocating buffers.
+ */
+public enum PoolingPolicy {
+
+ /**
+ * Allocate memory from JVM heap without any pooling.
+ *
+ * <p>This option has the least overhead in terms of memory usage since the
+ * memory will be automatically reclaimed by the JVM GC but might impose a
+ * performance penalty at high throughput.
+ *
+ * <p>With this policy, requests for direct buffers are also served with
+ * unpooled heap buffers.
+ */
+ UnpooledHeap,
+
+ /**
+ * Use Direct memory for all buffers and pool the memory.
+ *
+ * <p>Direct memory will avoid the overhead of JVM GC and most memory copies
+ * when reading and writing to socket channel.
+ *
+ * <p>Pooling will add memory space overhead due to the fact that there will be
+ * fragmentation in the allocator and that threads will keep a portion of
+ * memory as thread-local to avoid contention when possible.
+ *
+ * <p>Explicit requests for heap buffers are forwarded to the pooled
+ * allocator as well.
+ */
+ PooledDirect
+}
diff --git a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderImpl.java b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderImpl.java
new file mode 100644
index 0000000..fc6bd9d
--- /dev/null
+++ b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderImpl.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.common.allocator.impl;
+
+import io.netty.buffer.ByteBufAllocator;
+
+import java.util.function.Consumer;
+
+import org.apache.bookkeeper.common.allocator.ByteBufAllocatorBuilder;
+import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy;
+import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy;
+import org.apache.bookkeeper.common.allocator.PoolingPolicy;
+
+/**
+ * Implementation of {@link ByteBufAllocatorBuilder}.
+ *
+ * <p>Holds the settings collected through the fluent setters and passes them
+ * to a new {@link ByteBufAllocatorImpl} on every {@link #build()} call.
+ *
+ * <p>Defaults: no custom pooled or unpooled allocator ({@code null}, left
+ * for {@link ByteBufAllocatorImpl} to resolve),
+ * {@link PoolingPolicy#PooledDirect}, a pooling concurrency of twice the
+ * number of available processors, {@link OutOfMemoryPolicy#FallbackToHeap},
+ * no out-of-memory listener and {@link LeakDetectionPolicy#Disabled}.
+ *
+ * <p>Each setter stores the given value as-is, without validation, and
+ * returns this builder.
+ */
+public class ByteBufAllocatorBuilderImpl implements ByteBufAllocatorBuilder {
+
+ ByteBufAllocator pooledAllocator = null;
+ ByteBufAllocator unpooledAllocator = null;
+ PoolingPolicy poolingPolicy = PoolingPolicy.PooledDirect;
+ int poolingConcurrency = 2 * Runtime.getRuntime().availableProcessors();
+ OutOfMemoryPolicy outOfMemoryPolicy = OutOfMemoryPolicy.FallbackToHeap;
+ Consumer<OutOfMemoryError> outOfMemoryListener = null;
+ LeakDetectionPolicy leakDetectionPolicy = LeakDetectionPolicy.Disabled;
+
+ @Override
+ public ByteBufAllocator build() {
+ return new ByteBufAllocatorImpl(pooledAllocator, unpooledAllocator, poolingPolicy, poolingConcurrency,
+ outOfMemoryPolicy, outOfMemoryListener, leakDetectionPolicy);
+ }
+
+ @Override
+ public ByteBufAllocatorBuilder pooledAllocator(ByteBufAllocator pooledAllocator) {
+ this.pooledAllocator = pooledAllocator;
+ return this;
+ }
+
+ @Override
+ public ByteBufAllocatorBuilder unpooledAllocator(ByteBufAllocator unpooledAllocator) {
+ this.unpooledAllocator = unpooledAllocator;
+ return this;
+ }
+
+ @Override
+ public ByteBufAllocatorBuilder poolingPolicy(PoolingPolicy policy) {
+ this.poolingPolicy = policy;
+ return this;
+ }
+
+ @Override
+ public ByteBufAllocatorBuilder poolingConcurrency(int poolingConcurrency) {
+ this.poolingConcurrency = poolingConcurrency;
+ return this;
+ }
+
+ @Override
+ public ByteBufAllocatorBuilder outOfMemoryPolicy(OutOfMemoryPolicy policy) {
+ this.outOfMemoryPolicy = policy;
+ return this;
+ }
+
+ @Override
+ public ByteBufAllocatorBuilder outOfMemoryListener(Consumer<OutOfMemoryError> listener) {
+ this.outOfMemoryListener = listener;
+ return this;
+ }
+
+ @Override
+ public ByteBufAllocatorBuilder leakDetectionPolicy(LeakDetectionPolicy leakDetectionPolicy) {
+ this.leakDetectionPolicy = leakDetectionPolicy;
+ return this;
+ }
+
+}
diff --git a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorImpl.java b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorImpl.java
new file mode 100644
index 0000000..3544165
--- /dev/null
+++ b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorImpl.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.common.allocator.impl;
+
+import io.netty.buffer.AbstractByteBufAllocator;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.util.ResourceLeakDetector;
+import io.netty.util.ResourceLeakDetector.Level;
+import java.util.function.Consumer;
+
+import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy;
+import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy;
+import org.apache.bookkeeper.common.allocator.PoolingPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of {@link ByteBufAllocator}.
+ *
+ * <p>Wraps a pooled and an unpooled allocator and chooses between them
+ * according to the configured {@link PoolingPolicy}:
+ * <ul>
+ * <li>{@link PoolingPolicy#PooledDirect}: heap and direct requests are
+ * forwarded to the pooled allocator. When none is supplied, a
+ * {@link PooledByteBufAllocator} is created with {@code poolingConcurrency}
+ * heap arenas and the same number of direct arenas.</li>
+ * <li>Any other policy: no pooled allocator is kept, and both heap and
+ * direct requests are served as heap buffers by the unpooled allocator
+ * ({@link UnpooledByteBufAllocator#DEFAULT} when none is supplied).</li>
+ * </ul>
+ *
+ * <p>When a pooled direct allocation throws {@link OutOfMemoryError},
+ * {@link OutOfMemoryPolicy#ThrowException} notifies the listener and
+ * rethrows, while {@link OutOfMemoryPolicy#FallbackToHeap} retries on the
+ * unpooled heap allocator and only notifies the listener if that retry fails
+ * as well. If no listener is supplied, failures are logged at error level.
+ *
+ * <p>The constructor also applies the {@link LeakDetectionPolicy} through
+ * {@link ResourceLeakDetector#setLevel}, which is a static Netty setting and
+ * therefore affects all allocators.
+ */
+public class ByteBufAllocatorImpl extends AbstractByteBufAllocator implements ByteBufAllocator {
+
+ private static final Logger log = LoggerFactory.getLogger(ByteBufAllocatorImpl.class);
+
+ private final ByteBufAllocator pooledAllocator;
+ private final ByteBufAllocator unpooledAllocator;
+ private final PoolingPolicy poolingPolicy;
+ private final OutOfMemoryPolicy outOfMemoryPolicy;
+ private final Consumer<OutOfMemoryError> outOfMemoryListener;
+
+ ByteBufAllocatorImpl(ByteBufAllocator pooledAllocator, ByteBufAllocator unpooledAllocator,
+ PoolingPolicy poolingPolicy, int poolingConcurrency, OutOfMemoryPolicy outOfMemoryPolicy,
+ Consumer<OutOfMemoryError> outOfMemoryListener,
+ LeakDetectionPolicy leakDetectionPolicy) {
+ super(poolingPolicy == PoolingPolicy.PooledDirect /* preferDirect */);
+
+ this.poolingPolicy = poolingPolicy;
+ this.outOfMemoryPolicy = outOfMemoryPolicy;
+ if (outOfMemoryListener == null) {
+ this.outOfMemoryListener = (v) -> {
+ log.error("Unable to allocate memory", v);
+ };
+ } else {
+ this.outOfMemoryListener = outOfMemoryListener;
+ }
+
+ if (poolingPolicy == PoolingPolicy.PooledDirect) {
+ if (pooledAllocator == null) {
+ this.pooledAllocator = new PooledByteBufAllocator(
+ true /* preferDirect */,
+ poolingConcurrency /* nHeapArena */,
+ poolingConcurrency /* nDirectArena */,
+ PooledByteBufAllocator.defaultPageSize(),
+ PooledByteBufAllocator.defaultMaxOrder(),
+ PooledByteBufAllocator.defaultTinyCacheSize(),
+ PooledByteBufAllocator.defaultSmallCacheSize(),
+ PooledByteBufAllocator.defaultNormalCacheSize(),
+ PooledByteBufAllocator.defaultUseCacheForAllThreads());
+ } else {
+ this.pooledAllocator = pooledAllocator;
+ }
+ } else {
+ this.pooledAllocator = null;
+ }
+
+ this.unpooledAllocator = (unpooledAllocator != null) ? unpooledAllocator : UnpooledByteBufAllocator.DEFAULT;
+
+ // The setting is static in Netty, so it will actually affect all
+ // allocators
+ switch (leakDetectionPolicy) {
+ case Disabled:
+ if (log.isDebugEnabled()) {
+ log.debug("Disable Netty allocator leak detector");
+ }
+ ResourceLeakDetector.setLevel(Level.DISABLED);
+ break;
+
+ case Simple:
+ log.info("Setting Netty allocator leak detector to Simple");
+ ResourceLeakDetector.setLevel(Level.SIMPLE);
+ break;
+
+ case Advanced:
+ log.info("Setting Netty allocator leak detector to Advanced");
+ ResourceLeakDetector.setLevel(Level.ADVANCED);
+ break;
+
+ case Paranoid:
+ log.info("Setting Netty allocator leak detector to Paranoid");
+ ResourceLeakDetector.setLevel(Level.PARANOID);
+ break;
+ }
+ }
+
+ @Override
+ protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
+ try {
+ // There are few cases in which we ask explicitly for a pooled
+ // heap buffer.
+ ByteBufAllocator alloc = (poolingPolicy == PoolingPolicy.PooledDirect) ? pooledAllocator
+ : unpooledAllocator;
+ return alloc.heapBuffer(initialCapacity, maxCapacity);
+ } catch (OutOfMemoryError e) {
+ outOfMemoryListener.accept(e);
+ throw e;
+ }
+ }
+
+ @Override
+ protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
+ if (poolingPolicy == PoolingPolicy.PooledDirect) {
+ try {
+ return pooledAllocator.directBuffer(initialCapacity, maxCapacity);
+ } catch (OutOfMemoryError e) {
+ switch (outOfMemoryPolicy) {
+ case ThrowException:
+ outOfMemoryListener.accept(e);
+ throw e;
+
+ case FallbackToHeap:
+ try {
+ return unpooledAllocator.heapBuffer(initialCapacity, maxCapacity);
+ } catch (OutOfMemoryError e2) {
+ outOfMemoryListener.accept(e2);
+ throw e2;
+ }
+ }
+ return null;
+ }
+ } else {
+ // Unpooled heap buffer. Force heap buffers because unpooled direct
+ // buffers have very high overhead of allocation/reclaiming
+ try {
+ return unpooledAllocator.heapBuffer(initialCapacity, maxCapacity);
+ } catch (OutOfMemoryError e) {
+ outOfMemoryListener.accept(e);
+ throw e;
+ }
+ }
+ }
+
+ @Override
+ public boolean isDirectBufferPooled() {
+ return pooledAllocator != null && pooledAllocator.isDirectBufferPooled();
+ }
+}
diff --git a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/package-info.java b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/package-info.java
new file mode 100644
index 0000000..1013309
--- /dev/null
+++ b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Implements the utilities for allocator used across the project.
+ */
+package org.apache.bookkeeper.common.allocator.impl;
\ No newline at end of file
diff --git a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/package-info.java b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/package-info.java
new file mode 100644
index 0000000..5129114
--- /dev/null
+++ b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines the allocator utilities used across the project.
+ */
+package org.apache.bookkeeper.common.allocator;
\ No newline at end of file
diff --git a/bookkeeper-common-allocator/src/test/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderTest.java b/bookkeeper-common-allocator/src/test/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderTest.java
new file mode 100644
index 0000000..8ff66c3
--- /dev/null
+++ b/bookkeeper-common-allocator/src/test/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderTest.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.common.allocator.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
+import java.lang.reflect.Constructor;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.bookkeeper.common.allocator.ByteBufAllocatorBuilder;
+import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy;
+import org.apache.bookkeeper.common.allocator.PoolingPolicy;
+import org.junit.Test;
+
+/**
+ * Tests for {@link ByteBufAllocatorBuilderImpl}.
+ */
+public class ByteBufAllocatorBuilderTest {
+
+ private static final OutOfMemoryError outOfDirectMemException;
+
+ static {
+ try {
+ Class<?> clazz = (Class<?>) ByteBufAllocatorBuilderTest.class.getClassLoader()
+ .loadClass("io.netty.util.internal.OutOfDirectMemoryError");
+ @SuppressWarnings("unchecked")
+ Constructor<OutOfMemoryError> constructor = (Constructor<OutOfMemoryError>) clazz
+ .getDeclaredConstructor(String.class);
+
+ constructor.setAccessible(true);
+ outOfDirectMemException = constructor.newInstance("no mem");
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ @Test
+ public void testOomWithException() {
+ ByteBufAllocator baseAlloc = mock(ByteBufAllocator.class);
+ when(baseAlloc.directBuffer(anyInt(), anyInt())).thenThrow(outOfDirectMemException);
+
+ AtomicReference<OutOfMemoryError> receivedException = new AtomicReference<>();
+
+ ByteBufAllocator alloc = ByteBufAllocatorBuilder.create()
+ .pooledAllocator(baseAlloc)
+ .outOfMemoryPolicy(OutOfMemoryPolicy.ThrowException)
+ .outOfMemoryListener((e) -> {
+ receivedException.set(e);
+ })
+ .build();
+
+ try {
+ alloc.buffer();
+ fail("Should have thrown exception");
+ } catch (OutOfMemoryError e) {
+ // Expected
+ assertEquals(outOfDirectMemException, e);
+ }
+
+ // Ensure the notification was triggered even when exception is thrown
+ assertEquals(outOfDirectMemException, receivedException.get());
+ }
+
+ @Test
+ public void testOomWithFallback() {
+ ByteBufAllocator baseAlloc = mock(ByteBufAllocator.class);
+ when(baseAlloc.directBuffer(anyInt(), anyInt())).thenThrow(outOfDirectMemException);
+
+ AtomicReference<OutOfMemoryError> receivedException = new AtomicReference<>();
+
+ ByteBufAllocator alloc = ByteBufAllocatorBuilder.create()
+ .pooledAllocator(baseAlloc)
+ .unpooledAllocator(UnpooledByteBufAllocator.DEFAULT)
+ .outOfMemoryPolicy(OutOfMemoryPolicy.FallbackToHeap)
+ .outOfMemoryListener((e) -> {
+ receivedException.set(e);
+ })
+ .build();
+
+ // Should not throw exception
+ ByteBuf buf = alloc.buffer();
+ assertEquals(UnpooledByteBufAllocator.DEFAULT, buf.alloc());
+
+ // No notification should have been triggered
+ assertEquals(null, receivedException.get());
+ }
+
+ @Test
+ public void testOomWithFallbackAndNoMoreHeap() {
+ ByteBufAllocator baseAlloc = mock(ByteBufAllocator.class);
+ when(baseAlloc.directBuffer(anyInt(), anyInt())).thenThrow(outOfDirectMemException);
+
+ ByteBufAllocator heapAlloc = mock(ByteBufAllocator.class);
+ OutOfMemoryError noHeapError = new OutOfMemoryError("no more heap");
+ when(heapAlloc.heapBuffer(anyInt(), anyInt())).thenThrow(noHeapError);
+
+ AtomicReference<OutOfMemoryError> receivedException = new AtomicReference<>();
+
+ ByteBufAllocator alloc = ByteBufAllocatorBuilder.create()
+ .pooledAllocator(baseAlloc)
+ .unpooledAllocator(heapAlloc)
+ .outOfMemoryPolicy(OutOfMemoryPolicy.FallbackToHeap)
+ .outOfMemoryListener((e) -> {
+ receivedException.set(e);
+ })
+ .build();
+
+ try {
+ alloc.buffer();
+ fail("Should have thrown exception");
+ } catch (OutOfMemoryError e) {
+ // Expected
+ assertEquals(noHeapError, e);
+ }
+
+ // Ensure the notification was triggered even when exception is thrown
+ assertEquals(noHeapError, receivedException.get());
+ }
+
+ @Test
+ public void testOomUnpooled() {
+ ByteBufAllocator heapAlloc = mock(ByteBufAllocator.class);
+ OutOfMemoryError noHeapError = new OutOfMemoryError("no more heap");
+ when(heapAlloc.heapBuffer(anyInt(), anyInt())).thenThrow(noHeapError);
+
+ AtomicReference<OutOfMemoryError> receivedException = new AtomicReference<>();
+
+ ByteBufAllocator alloc = ByteBufAllocatorBuilder.create()
+ .poolingPolicy(PoolingPolicy.UnpooledHeap)
+ .unpooledAllocator(heapAlloc)
+ .outOfMemoryPolicy(OutOfMemoryPolicy.FallbackToHeap)
+ .outOfMemoryListener((e) -> {
+ receivedException.set(e);
+ })
+ .build();
+
+ try {
+ alloc.directBuffer();
+ fail("Should have thrown exception");
+ } catch (OutOfMemoryError e) {
+ // Expected
+ assertEquals(noHeapError, e);
+ }
+
+ // Ensure the notification was triggered even when exception is thrown
+ assertEquals(noHeapError, receivedException.get());
+ }
+
+ @Test
+ public void testOomUnpooledWithHeap() {
+ ByteBufAllocator heapAlloc = mock(ByteBufAllocator.class);
+ OutOfMemoryError noHeapError = new OutOfMemoryError("no more heap");
+ when(heapAlloc.heapBuffer(anyInt(), anyInt())).thenThrow(noHeapError);
+
+ AtomicReference<OutOfMemoryError> receivedException = new AtomicReference<>();
+
+ ByteBufAllocator alloc = ByteBufAllocatorBuilder.create()
+ .poolingPolicy(PoolingPolicy.UnpooledHeap)
+ .unpooledAllocator(heapAlloc)
+ .outOfMemoryPolicy(OutOfMemoryPolicy.FallbackToHeap)
+ .outOfMemoryListener((e) -> {
+ receivedException.set(e);
+ })
+ .build();
+
+ try {
+ alloc.heapBuffer();
+ fail("Should have thrown exception");
+ } catch (OutOfMemoryError e) {
+ // Expected
+ assertEquals(noHeapError, e);
+ }
+
+ // Ensure the notification was triggered even when exception is thrown
+ assertEquals(noHeapError, receivedException.get());
+ }
+
+ @Test
+ public void testUnpooled() {
+ ByteBufAllocator alloc = ByteBufAllocatorBuilder.create()
+ .poolingPolicy(PoolingPolicy.UnpooledHeap)
+ .build();
+
+ ByteBuf buf = alloc.buffer();
+ assertEquals(UnpooledByteBufAllocator.DEFAULT, buf.alloc());
+ assertTrue(buf.hasArray());
+
+ ByteBuf buf2 = alloc.directBuffer();
+ assertEquals(UnpooledByteBufAllocator.DEFAULT, buf2.alloc());
+ assertTrue(buf2.hasArray());
+ }
+
+ @Test
+ public void testPooled() {
+ PooledByteBufAllocator pooledAlloc = new PooledByteBufAllocator(true);
+
+ ByteBufAllocator alloc = ByteBufAllocatorBuilder.create()
+ .poolingPolicy(PoolingPolicy.PooledDirect)
+ .pooledAllocator(pooledAlloc)
+ .build();
+
+ assertTrue(alloc.isDirectBufferPooled());
+
+ ByteBuf buf1 = alloc.buffer();
+ assertEquals(pooledAlloc, buf1.alloc());
+ assertFalse(buf1.hasArray());
+ buf1.release();
+
+ ByteBuf buf2 = alloc.directBuffer();
+ assertEquals(pooledAlloc, buf2.alloc());
+ assertFalse(buf2.hasArray());
+ buf2.release();
+
+ ByteBuf buf3 = alloc.heapBuffer();
+ assertEquals(pooledAlloc, buf3.alloc());
+ assertTrue(buf3.hasArray());
+ buf3.release();
+ }
+
+ @Test
+ public void testPooledWithDefaultAllocator() {
+ ByteBufAllocator alloc = ByteBufAllocatorBuilder.create()
+ .poolingPolicy(PoolingPolicy.PooledDirect)
+ .poolingConcurrency(3)
+ .build();
+
+ assertTrue(alloc.isDirectBufferPooled());
+
+ ByteBuf buf1 = alloc.buffer();
+ assertEquals(PooledByteBufAllocator.class, buf1.alloc().getClass());
+ assertEquals(3, ((PooledByteBufAllocator) buf1.alloc()).metric().numDirectArenas());
+ assertFalse(buf1.hasArray());
+ buf1.release();
+
+ ByteBuf buf2 = alloc.directBuffer();
+ assertFalse(buf2.hasArray());
+ buf2.release();
+
+ ByteBuf buf3 = alloc.heapBuffer();
+ assertTrue(buf3.hasArray());
+ buf3.release();
+ }
+}
diff --git a/pom.xml b/pom.xml
index 4d210a3..c13d487 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,6 +54,7 @@
<module>buildtools</module>
<module>circe-checksum</module>
<module>bookkeeper-common</module>
+ <module>bookkeeper-common-allocator</module>
<module>bookkeeper-stats</module>
<module>bookkeeper-proto</module>
<module>bookkeeper-server</module>