You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/10/25 12:07:04 UTC

[bookkeeper] branch master updated: Netty allocator wrapper

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 379d2ba  Netty allocator wrapper
379d2ba is described below

commit 379d2ba8570eed1f92f8a468158f1a2d0861aaa6
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Oct 25 05:06:58 2018 -0700

    Netty allocator wrapper
    
    ### Motivation
    
    Configuring the correct JVM memory settings and cache sizes for a BookKeeper cluster should be simplified.
    
    There are currently many knobs in Netty and JVM flags for different components, and while the system is very stable with a good setup, it's easy to set up non-optimal configurations which might result in OutOfMemory errors under load.
    
    Ideally, there should be very minimal configuration required to bring up a BookKeeper cluster that can work under a wide set of traffic loads. In any case, we should prefer to automatically fall back to slower alternatives, when possible, instead of throwing OOM exceptions.
    
     * Provide a wrapper to have a single unified configuration point for Netty allocator configuration.
     * Provide fallback policy when dealing with direct memory OOM errors
    
    ### Changes
    
     * This is the first PR. It only contains the allocator wrapper implementation. Subsequent PRs will add the changes to use it.
     * Added `bookkeeper-common-allocator` module to have a no-dependencies module that can be used directly from the Pulsar client library too (which doesn't depend on BK).
    
    
    Author: Sijie Guo <gu...@gmail.com>
    Author: Charan Reddy Guttapalem <re...@gmail.com>
    Author: Matteo Merli <mm...@apache.org>
    Author: Andrey Yegorov <dl...@users.noreply.github.com>
    Author: Samuel Just <sj...@salesforce.com>
    
    Reviewers: Qi Wang <None>, Sijie Guo <si...@apache.org>, Ivan Kelly <iv...@apache.org>, Enrico Olivelli <eo...@gmail.com>
    
    This closes #1754 from merlimat/allocator
---
 bookkeeper-common-allocator/pom.xml                |  60 +++++
 .../common/allocator/ByteBufAllocatorBuilder.java  |  97 ++++++++
 .../common/allocator/LeakDetectionPolicy.java      |  47 ++++
 .../common/allocator/OutOfMemoryPolicy.java        |  39 +++
 .../bookkeeper/common/allocator/PoolingPolicy.java |  45 ++++
 .../impl/ByteBufAllocatorBuilderImpl.java          |  90 +++++++
 .../allocator/impl/ByteBufAllocatorImpl.java       | 163 +++++++++++++
 .../common/allocator/impl/package-info.java        |  21 ++
 .../bookkeeper/common/allocator/package-info.java  |  21 ++
 .../impl/ByteBufAllocatorBuilderTest.java          | 270 +++++++++++++++++++++
 pom.xml                                            |   1 +
 11 files changed, 854 insertions(+)

diff --git a/bookkeeper-common-allocator/pom.xml b/bookkeeper-common-allocator/pom.xml
new file mode 100644
index 0000000..a988899
--- /dev/null
+++ b/bookkeeper-common-allocator/pom.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.bookkeeper</groupId>
+    <artifactId>bookkeeper</artifactId>
+    <version>4.9.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>bookkeeper-common-allocator</artifactId>
+  <name>Apache BookKeeper :: Common :: Allocator</name>
+  <dependencies>
+     <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-buffer</artifactId>
+    </dependency>
+  </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>com.github.spotbugs</groupId>
+        <artifactId>spotbugs-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/ByteBufAllocatorBuilder.java b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/ByteBufAllocatorBuilder.java
new file mode 100644
index 0000000..d749efd
--- /dev/null
+++ b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/ByteBufAllocatorBuilder.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.common.allocator;
+
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
+import java.util.function.Consumer;
+
+import org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorBuilderImpl;
+
+/**
+ * Builder object to customize a ByteBuf allocator.
+ */
+public interface ByteBufAllocatorBuilder {
+    /**
+     * Creates a new {@link ByteBufAllocatorBuilder}.
+     */
+    static ByteBufAllocatorBuilder create() {
+        return new ByteBufAllocatorBuilderImpl();
+    }
+
+    /**
+     * Build the configured {@link ByteBufAllocator}.
+     */
+    ByteBufAllocator build();
+
+    /**
+     * Specify a custom pooled allocator to which pooled allocation requests
+     * are forwarded.
+     *
+     * <p>Default is to use a new instance of {@link PooledByteBufAllocator}.
+     */
+    ByteBufAllocatorBuilder pooledAllocator(ByteBufAllocator pooledAllocator);
+
+    /**
+     * Specify a custom unpooled allocator to which unpooled (and heap
+     * fallback) allocation requests are forwarded.
+     *
+     * <p>Default is to use {@link UnpooledByteBufAllocator#DEFAULT}.
+     */
+    ByteBufAllocatorBuilder unpooledAllocator(ByteBufAllocator unpooledAllocator);
+
+    /**
+     * Define the memory pooling policy.
+     *
+     * <p>Default is {@link PoolingPolicy#PooledDirect}
+     */
+    ByteBufAllocatorBuilder poolingPolicy(PoolingPolicy policy);
+
+    /**
+     * Controls the amount of concurrency for the memory pool.
+     *
+     * <p>Default is to have a number of allocator arenas equal to 2 * CPUS.
+     *
+     * <p>Decreasing this number will reduce the amount of memory overhead, at the
+     * expense of increased allocation contention.
+     */
+    ByteBufAllocatorBuilder poolingConcurrency(int poolingConcurrency);
+
+    /**
+     * Define the OutOfMemory handling policy.
+     *
+     * <p>Default is {@link OutOfMemoryPolicy#FallbackToHeap}
+     */
+    ByteBufAllocatorBuilder outOfMemoryPolicy(OutOfMemoryPolicy policy);
+
+    /**
+     * Add a listener that is triggered whenever there is an allocation failure.
+     *
+     * <p>Application can use this to trigger alerting or process restarting.
+     */
+    ByteBufAllocatorBuilder outOfMemoryListener(Consumer<OutOfMemoryError> listener);
+
+    /**
+     * Set the Netty leak detection level (static in Netty, so it affects all allocators).
+     *
+     * <p>Default is {@link LeakDetectionPolicy#Disabled}
+     */
+    ByteBufAllocatorBuilder leakDetectionPolicy(LeakDetectionPolicy leakDetectionPolicy);
+}
diff --git a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/LeakDetectionPolicy.java b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/LeakDetectionPolicy.java
new file mode 100644
index 0000000..4766847
--- /dev/null
+++ b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/LeakDetectionPolicy.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.common.allocator;
+
+/**
+ * Define the policy for the Netty leak detector.
+ */
+public enum LeakDetectionPolicy {
+
+    /**
+     * No leak detection and no overhead.
+     */
+    Disabled,
+
+    /**
+     * Instruments 1% of the allocated buffers to track for leaks.
+     */
+    Simple,
+
+    /**
+     * Instruments 1% of the allocated buffers to track for leaks, reporting
+     * stack traces of places where the buffer was used.
+     */
+    Advanced,
+
+    /**
+     * Instruments 100% of the allocated buffers to track for leaks, reporting
+     * stack traces of places where the buffer was used. Introduces very
+     * significant overhead.
+     */
+    Paranoid,
+}
diff --git a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/OutOfMemoryPolicy.java b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/OutOfMemoryPolicy.java
new file mode 100644
index 0000000..ff72050
--- /dev/null
+++ b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/OutOfMemoryPolicy.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.common.allocator;
+
+/**
+ * Represents the action to take when it's not possible to allocate memory.
+ */
+public enum OutOfMemoryPolicy {
+
+    /**
+     * Throw regular OOM exception without taking additional actions.
+     */
+    ThrowException,
+
+    /**
+     * If it's not possible to allocate a buffer from direct memory, fall back to
+     * allocating an unpooled buffer from JVM heap.
+     *
+     * <p>This will help absorb memory allocation spikes because the heap
+     * allocations will naturally slow down the process and will result in a full
+     * GC cleanup if the heap itself is full.
+     */
+    FallbackToHeap,
+}
diff --git a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/PoolingPolicy.java b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/PoolingPolicy.java
new file mode 100644
index 0000000..352a55e
--- /dev/null
+++ b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/PoolingPolicy.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.common.allocator;
+
+/**
+ * Define a policy for allocating buffers.
+ */
+public enum PoolingPolicy {
+
+    /**
+     * Allocate memory from JVM heap without any pooling.
+     *
+     * <p>This option has the least overhead in terms of memory usage since the
+     * memory will be automatically reclaimed by the JVM GC but might impose a
+     * performance penalty at high throughput.
+     */
+    UnpooledHeap,
+
+    /**
+     * Use direct memory for all buffers and pool the memory.
+     *
+     * <p>Direct memory will avoid the overhead of JVM GC and most memory copies
+     * when reading from and writing to socket channels.
+     *
+     * <p>Pooling will add memory space overhead due to the fact that there will be
+     * fragmentation in the allocator and that threads will keep a portion of
+     * memory as thread-local to avoid contention when possible.
+     */
+    PooledDirect
+}
diff --git a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderImpl.java b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderImpl.java
new file mode 100644
index 0000000..fc6bd9d
--- /dev/null
+++ b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderImpl.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.common.allocator.impl;
+
+import io.netty.buffer.ByteBufAllocator;
+
+import java.util.function.Consumer;
+
+import org.apache.bookkeeper.common.allocator.ByteBufAllocatorBuilder;
+import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy;
+import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy;
+import org.apache.bookkeeper.common.allocator.PoolingPolicy;
+
+/**
+ * Implementation of {@link ByteBufAllocatorBuilder} that hands its settings to {@link ByteBufAllocatorImpl}.
+ */
+public class ByteBufAllocatorBuilderImpl implements ByteBufAllocatorBuilder {
+
+    ByteBufAllocator pooledAllocator = null;
+    ByteBufAllocator unpooledAllocator = null;
+    PoolingPolicy poolingPolicy = PoolingPolicy.PooledDirect;
+    int poolingConcurrency = 2 * Runtime.getRuntime().availableProcessors();
+    OutOfMemoryPolicy outOfMemoryPolicy = OutOfMemoryPolicy.FallbackToHeap;
+    Consumer<OutOfMemoryError> outOfMemoryListener = null;
+    LeakDetectionPolicy leakDetectionPolicy = LeakDetectionPolicy.Disabled;
+
+    @Override
+    public ByteBufAllocator build() {
+        return new ByteBufAllocatorImpl(pooledAllocator, unpooledAllocator, poolingPolicy, poolingConcurrency,
+                outOfMemoryPolicy, outOfMemoryListener, leakDetectionPolicy);
+    }
+
+    @Override
+    public ByteBufAllocatorBuilder pooledAllocator(ByteBufAllocator pooledAllocator) {
+        this.pooledAllocator = pooledAllocator;
+        return this;
+    }
+
+    @Override
+    public ByteBufAllocatorBuilder unpooledAllocator(ByteBufAllocator unpooledAllocator) {
+        this.unpooledAllocator = unpooledAllocator;
+        return this;
+    }
+
+    @Override
+    public ByteBufAllocatorBuilder poolingPolicy(PoolingPolicy policy) {
+        this.poolingPolicy = policy;
+        return this;
+    }
+
+    @Override
+    public ByteBufAllocatorBuilder poolingConcurrency(int poolingConcurrency) {
+        this.poolingConcurrency = poolingConcurrency;
+        return this;
+    }
+
+    @Override
+    public ByteBufAllocatorBuilder outOfMemoryPolicy(OutOfMemoryPolicy policy) {
+        this.outOfMemoryPolicy = policy;
+        return this;
+    }
+
+    @Override
+    public ByteBufAllocatorBuilder outOfMemoryListener(Consumer<OutOfMemoryError> listener) {
+        this.outOfMemoryListener = listener;
+        return this;
+    }
+
+    @Override
+    public ByteBufAllocatorBuilder leakDetectionPolicy(LeakDetectionPolicy leakDetectionPolicy) {
+        this.leakDetectionPolicy = leakDetectionPolicy;
+        return this;
+    }
+
+}
diff --git a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorImpl.java b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorImpl.java
new file mode 100644
index 0000000..3544165
--- /dev/null
+++ b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorImpl.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.common.allocator.impl;
+
+import io.netty.buffer.AbstractByteBufAllocator;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.util.ResourceLeakDetector;
+import io.netty.util.ResourceLeakDetector.Level;
+import java.util.function.Consumer;
+
+import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy;
+import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy;
+import org.apache.bookkeeper.common.allocator.PoolingPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link ByteBufAllocator} that delegates to a pooled or unpooled allocator per the configured policies.
+ */
+public class ByteBufAllocatorImpl extends AbstractByteBufAllocator implements ByteBufAllocator {
+
+    private static final Logger log = LoggerFactory.getLogger(ByteBufAllocatorImpl.class);
+
+    private final ByteBufAllocator pooledAllocator;
+    private final ByteBufAllocator unpooledAllocator;
+    private final PoolingPolicy poolingPolicy;
+    private final OutOfMemoryPolicy outOfMemoryPolicy;
+    private final Consumer<OutOfMemoryError> outOfMemoryListener;
+
+    ByteBufAllocatorImpl(ByteBufAllocator pooledAllocator, ByteBufAllocator unpooledAllocator,
+            PoolingPolicy poolingPolicy, int poolingConcurrency, OutOfMemoryPolicy outOfMemoryPolicy,
+            Consumer<OutOfMemoryError> outOfMemoryListener,
+            LeakDetectionPolicy leakDetectionPolicy) {
+        super(poolingPolicy == PoolingPolicy.PooledDirect /* preferDirect */);
+
+        this.poolingPolicy = poolingPolicy;
+        this.outOfMemoryPolicy = outOfMemoryPolicy;
+        if (outOfMemoryListener == null) {
+            this.outOfMemoryListener = (v) -> {
+                log.error("Unable to allocate memory", v);
+            };
+        } else {
+            this.outOfMemoryListener = outOfMemoryListener;
+        }
+
+        if (poolingPolicy == PoolingPolicy.PooledDirect) {
+            if (pooledAllocator == null) {
+                this.pooledAllocator = new PooledByteBufAllocator(
+                        true /* preferDirect */,
+                        poolingConcurrency /* nHeapArena */,
+                        poolingConcurrency /* nDirectArena */,
+                        PooledByteBufAllocator.defaultPageSize(),
+                        PooledByteBufAllocator.defaultMaxOrder(),
+                        PooledByteBufAllocator.defaultTinyCacheSize(),
+                        PooledByteBufAllocator.defaultSmallCacheSize(),
+                        PooledByteBufAllocator.defaultNormalCacheSize(),
+                        PooledByteBufAllocator.defaultUseCacheForAllThreads());
+            } else {
+                this.pooledAllocator = pooledAllocator;
+            }
+        } else {
+            this.pooledAllocator = null;
+        }
+
+        this.unpooledAllocator = (unpooledAllocator != null) ? unpooledAllocator : UnpooledByteBufAllocator.DEFAULT;
+
+        // The setting is static in Netty, so it will actually affect all
+        // allocators
+        switch (leakDetectionPolicy) {
+        case Disabled:
+            if (log.isDebugEnabled()) {
+                log.debug("Disable Netty allocator leak detector");
+            }
+            ResourceLeakDetector.setLevel(Level.DISABLED);
+            break;
+
+        case Simple:
+            log.info("Setting Netty allocator leak detector to Simple");
+            ResourceLeakDetector.setLevel(Level.SIMPLE);
+            break;
+
+        case Advanced:
+            log.info("Setting Netty allocator leak detector to Advanced");
+            ResourceLeakDetector.setLevel(Level.ADVANCED);
+            break;
+
+        case Paranoid:
+            log.info("Setting Netty allocator leak detector to Paranoid");
+            ResourceLeakDetector.setLevel(Level.PARANOID);
+            break;
+        }
+    }
+
+    @Override
+    protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
+        try {
+            // There are a few cases in which we explicitly ask for a pooled
+            // heap buffer.
+            ByteBufAllocator alloc = (poolingPolicy == PoolingPolicy.PooledDirect) ? pooledAllocator
+                    : unpooledAllocator;
+            return alloc.heapBuffer(initialCapacity, maxCapacity);
+        } catch (OutOfMemoryError e) {
+            outOfMemoryListener.accept(e);
+            throw e;
+        }
+    }
+
+    @Override
+    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
+        if (poolingPolicy == PoolingPolicy.PooledDirect) {
+            try {
+                return pooledAllocator.directBuffer(initialCapacity, maxCapacity);
+            } catch (OutOfMemoryError e) {
+                switch (outOfMemoryPolicy) {
+                case ThrowException:
+                    outOfMemoryListener.accept(e);
+                    throw e;
+
+                case FallbackToHeap:
+                    try {
+                        return unpooledAllocator.heapBuffer(initialCapacity, maxCapacity);
+                    } catch (OutOfMemoryError e2) {
+                        outOfMemoryListener.accept(e2);
+                        throw e2;
+                    }
+                }
+                return null;
+            }
+        } else {
+            // Unpooled heap buffer. Force heap buffers because unpooled direct
+            // buffers have very high overhead of allocation/reclaiming
+            try {
+                return unpooledAllocator.heapBuffer(initialCapacity, maxCapacity);
+            } catch (OutOfMemoryError e) {
+                outOfMemoryListener.accept(e);
+                throw e;
+            }
+        }
+    }
+
+    @Override
+    public boolean isDirectBufferPooled() {
+        return pooledAllocator != null && pooledAllocator.isDirectBufferPooled();
+    }
+}
diff --git a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/package-info.java b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/package-info.java
new file mode 100644
index 0000000..1013309
--- /dev/null
+++ b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/impl/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Implements the utilities for allocator used across the project.
+ */
+package org.apache.bookkeeper.common.allocator.impl;
\ No newline at end of file
diff --git a/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/package-info.java b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/package-info.java
new file mode 100644
index 0000000..5129114
--- /dev/null
+++ b/bookkeeper-common-allocator/src/main/java/org/apache/bookkeeper/common/allocator/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines the utilities for allocator used across the project.
+ */
+package org.apache.bookkeeper.common.allocator;
\ No newline at end of file
diff --git a/bookkeeper-common-allocator/src/test/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderTest.java b/bookkeeper-common-allocator/src/test/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderTest.java
new file mode 100644
index 0000000..8ff66c3
--- /dev/null
+++ b/bookkeeper-common-allocator/src/test/java/org/apache/bookkeeper/common/allocator/impl/ByteBufAllocatorBuilderTest.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.common.allocator.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
+import java.lang.reflect.Constructor;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.bookkeeper.common.allocator.ByteBufAllocatorBuilder;
+import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy;
+import org.apache.bookkeeper.common.allocator.PoolingPolicy;
+import org.junit.Test;
+
+/**
+ * Tests for {@link ByteBufAllocatorBuilderImpl}: pooling policies, OOM policies and OOM listener callbacks.
+ */
+public class ByteBufAllocatorBuilderTest {
+    // Shared io.netty.util.internal.OutOfDirectMemoryError instance, built reflectively in the static block.
+    private static final OutOfMemoryError outOfDirectMemException;
+
+    static {
+        try {
+            Class<?> clazz = (Class<?>) ByteBufAllocatorBuilderTest.class.getClassLoader()
+                    .loadClass("io.netty.util.internal.OutOfDirectMemoryError");
+            @SuppressWarnings("unchecked")
+            Constructor<OutOfMemoryError> constructor = (Constructor<OutOfMemoryError>) clazz
+                    .getDeclaredConstructor(String.class);
+            // NOTE(review): presumably the String constructor is not public, hence setAccessible - confirm
+            constructor.setAccessible(true);
+            outOfDirectMemException = constructor.newInstance("no mem");
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+    }
+
+    @Test
+    public void testOomWithException() {
+        ByteBufAllocator baseAlloc = mock(ByteBufAllocator.class);
+        when(baseAlloc.directBuffer(anyInt(), anyInt())).thenThrow(outOfDirectMemException);
+        // Captures the error handed to the out-of-memory listener, if any
+        AtomicReference<OutOfMemoryError> receivedException = new AtomicReference<>();
+
+        ByteBufAllocator alloc = ByteBufAllocatorBuilder.create()
+                .pooledAllocator(baseAlloc)
+                .outOfMemoryPolicy(OutOfMemoryPolicy.ThrowException)
+                .outOfMemoryListener((e) -> {
+                    receivedException.set(e);
+                })
+                .build();
+
+        try {
+            alloc.buffer();
+            fail("Should have thrown exception");
+        } catch (OutOfMemoryError e) {
+            // Expected: with ThrowException the very same direct-memory error instance propagates
+            assertEquals(outOfDirectMemException, e);
+        }
+
+        // Ensure the listener was notified even though the exception was thrown
+        assertEquals(outOfDirectMemException, receivedException.get());
+    }
+
+    @Test
+    public void testOomWithFallback() {
+        ByteBufAllocator baseAlloc = mock(ByteBufAllocator.class);
+        when(baseAlloc.directBuffer(anyInt(), anyInt())).thenThrow(outOfDirectMemException);
+
+        AtomicReference<OutOfMemoryError> receivedException = new AtomicReference<>();
+
+        ByteBufAllocator alloc = ByteBufAllocatorBuilder.create()
+                .pooledAllocator(baseAlloc)
+                .unpooledAllocator(UnpooledByteBufAllocator.DEFAULT)
+                .outOfMemoryPolicy(OutOfMemoryPolicy.FallbackToHeap)
+                .outOfMemoryListener((e) -> {
+                    receivedException.set(e);
+                })
+                .build();
+
+        // Should not throw exception: the buffer is expected to come from the unpooled fallback allocator
+        ByteBuf buf = alloc.buffer();
+        assertEquals(UnpooledByteBufAllocator.DEFAULT, buf.alloc());
+
+        // No notification should have been triggered, since the fallback allocation succeeded
+        assertEquals(null, receivedException.get());
+    }
+
+    @Test
+    public void testOomWithFallbackAndNoMoreHeap() {
+        ByteBufAllocator baseAlloc = mock(ByteBufAllocator.class);
+        when(baseAlloc.directBuffer(anyInt(), anyInt())).thenThrow(outOfDirectMemException);
+
+        ByteBufAllocator heapAlloc = mock(ByteBufAllocator.class);
+        OutOfMemoryError noHeapError = new OutOfMemoryError("no more heap");
+        when(heapAlloc.heapBuffer(anyInt(), anyInt())).thenThrow(noHeapError);
+
+        AtomicReference<OutOfMemoryError> receivedException = new AtomicReference<>();
+
+        ByteBufAllocator alloc = ByteBufAllocatorBuilder.create()
+                .pooledAllocator(baseAlloc)
+                .unpooledAllocator(heapAlloc)
+                .outOfMemoryPolicy(OutOfMemoryPolicy.FallbackToHeap)
+                .outOfMemoryListener((e) -> {
+                    receivedException.set(e);
+                })
+                .build();
+
+        try {
+            alloc.buffer();
+            fail("Should have thrown exception");
+        } catch (OutOfMemoryError e) {
+            // Expected: the heap fallback failed as well, so the heap error is the one that surfaces
+            assertEquals(noHeapError, e);
+        }
+
+        // Ensure the listener was notified (with the heap error) even though the exception was thrown
+        assertEquals(noHeapError, receivedException.get());
+    }
+
+    @Test
+    public void testOomUnpooled() {
+        ByteBufAllocator heapAlloc = mock(ByteBufAllocator.class);
+        OutOfMemoryError noHeapError = new OutOfMemoryError("no more heap");
+        when(heapAlloc.heapBuffer(anyInt(), anyInt())).thenThrow(noHeapError);
+
+        AtomicReference<OutOfMemoryError> receivedException = new AtomicReference<>();
+
+        ByteBufAllocator alloc = ByteBufAllocatorBuilder.create()
+                .poolingPolicy(PoolingPolicy.UnpooledHeap)
+                .unpooledAllocator(heapAlloc)
+                .outOfMemoryPolicy(OutOfMemoryPolicy.FallbackToHeap)
+                .outOfMemoryListener((e) -> {
+                    receivedException.set(e);
+                })
+                .build();
+
+        try {
+            alloc.directBuffer();
+            fail("Should have thrown exception");
+        } catch (OutOfMemoryError e) {
+            // Expected: under UnpooledHeap, directBuffer() ends up in the stubbed heapBuffer() call
+            assertEquals(noHeapError, e);
+        }
+
+        // Ensure the listener was notified even though the exception was thrown
+        assertEquals(noHeapError, receivedException.get());
+    }
+
+    @Test
+    public void testOomUnpooledWithHeap() {
+        ByteBufAllocator heapAlloc = mock(ByteBufAllocator.class);
+        OutOfMemoryError noHeapError = new OutOfMemoryError("no more heap");
+        when(heapAlloc.heapBuffer(anyInt(), anyInt())).thenThrow(noHeapError);
+
+        AtomicReference<OutOfMemoryError> receivedException = new AtomicReference<>();
+
+        ByteBufAllocator alloc = ByteBufAllocatorBuilder.create()
+                .poolingPolicy(PoolingPolicy.UnpooledHeap)
+                .unpooledAllocator(heapAlloc)
+                .outOfMemoryPolicy(OutOfMemoryPolicy.FallbackToHeap)
+                .outOfMemoryListener((e) -> {
+                    receivedException.set(e);
+                })
+                .build();
+
+        try {
+            alloc.heapBuffer();
+            fail("Should have thrown exception");
+        } catch (OutOfMemoryError e) {
+            // Expected: the error thrown by the stubbed heapBuffer() call propagates
+            assertEquals(noHeapError, e);
+        }
+
+        // Ensure the listener was notified even though the exception was thrown
+        assertEquals(noHeapError, receivedException.get());
+    }
+
+    @Test
+    public void testUnpooled() {
+        ByteBufAllocator alloc = ByteBufAllocatorBuilder.create()
+                .poolingPolicy(PoolingPolicy.UnpooledHeap)
+                .build();
+
+        ByteBuf buf = alloc.buffer();
+        assertEquals(UnpooledByteBufAllocator.DEFAULT, buf.alloc());
+        assertTrue(buf.hasArray());
+        // With UnpooledHeap even directBuffer() is expected to hand out an array-backed buffer
+        ByteBuf buf2 = alloc.directBuffer();
+        assertEquals(UnpooledByteBufAllocator.DEFAULT, buf2.alloc());
+        assertTrue(buf2.hasArray());
+    }
+
+    @Test
+    public void testPooled() {
+        PooledByteBufAllocator pooledAlloc = new PooledByteBufAllocator(true);
+
+        ByteBufAllocator alloc = ByteBufAllocatorBuilder.create()
+                .poolingPolicy(PoolingPolicy.PooledDirect)
+                .pooledAllocator(pooledAlloc)
+                .build();
+
+        assertTrue(alloc.isDirectBufferPooled());
+
+        ByteBuf buf1 = alloc.buffer();
+        assertEquals(pooledAlloc, buf1.alloc());
+        assertFalse(buf1.hasArray());
+        buf1.release();
+
+        ByteBuf buf2 = alloc.directBuffer();
+        assertEquals(pooledAlloc, buf2.alloc());
+        assertFalse(buf2.hasArray());
+        buf2.release();
+        // heapBuffer() is still served by the supplied pooled allocator, but is array-backed
+        ByteBuf buf3 = alloc.heapBuffer();
+        assertEquals(pooledAlloc, buf3.alloc());
+        assertTrue(buf3.hasArray());
+        buf3.release();
+    }
+
+    @Test
+    public void testPooledWithDefaultAllocator() {
+        ByteBufAllocator alloc = ByteBufAllocatorBuilder.create()
+                .poolingPolicy(PoolingPolicy.PooledDirect)
+                .poolingConcurrency(3)
+                .build();
+
+        assertTrue(alloc.isDirectBufferPooled());
+        // poolingConcurrency(3) should be reflected as 3 direct arenas in the created pooled allocator
+        ByteBuf buf1 = alloc.buffer();
+        assertEquals(PooledByteBufAllocator.class, buf1.alloc().getClass());
+        assertEquals(3, ((PooledByteBufAllocator) buf1.alloc()).metric().numDirectArenas());
+        assertFalse(buf1.hasArray());
+        buf1.release();
+
+        ByteBuf buf2 = alloc.directBuffer();
+        assertFalse(buf2.hasArray());
+        buf2.release();
+
+        ByteBuf buf3 = alloc.heapBuffer();
+        assertTrue(buf3.hasArray());
+        buf3.release();
+    }
+}
diff --git a/pom.xml b/pom.xml
index 4d210a3..c13d487 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,6 +54,7 @@
     <module>buildtools</module>
     <module>circe-checksum</module>
     <module>bookkeeper-common</module>
+    <module>bookkeeper-common-allocator</module>
     <module>bookkeeper-stats</module>
     <module>bookkeeper-proto</module>
     <module>bookkeeper-server</module>