You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/29 15:31:54 UTC

[pulsar] branch master updated: Configure static PulsarByteBufAllocator to handle OOM errors (#4196)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b33c66  Configure static PulsarByteBufAllocator to handle OOM errors (#4196)
3b33c66 is described below

commit 3b33c668e194421aed23873c26e9b7e8bcbd9d9f
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed May 29 08:31:47 2019 -0700

    Configure static PulsarByteBufAllocator to handle OOM errors (#4196)
    
    * Configure static PulsarByteBufAllocator to handle OOM errors
    
    * Always specify `pulsar.allocator.exit_on_oom` when starting pulsar services
    
    * Reverted metrics back
    
    * Fixed compression tests
    
    * Explicitly set the underlying allocator to netty default
    
    * Fixed shading
---
 conf/pulsar_env.sh                                 |  2 +-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java |  5 +-
 pom.xml                                            | 11 ++-
 .../org/apache/pulsar/PulsarBrokerStarter.java     |  8 +-
 .../pulsar/broker/BookKeeperClientFactoryImpl.java | 10 ++-
 .../broker/admin/impl/PersistentTopicsBase.java    |  6 +-
 .../pulsar/broker/service/BrokerService.java       |  7 +-
 .../pulsar/client/impl/RawBatchConverter.java      |  5 +-
 .../apache/pulsar/client/impl/RawMessageImpl.java  | 10 +--
 .../service/PersistentMessageFinderTest.java       | 10 +--
 pulsar-client-admin-shaded/pom.xml                 |  5 ++
 pulsar-client-all/pom.xml                          |  5 ++
 .../pulsar-client-kafka-shaded/pom.xml             |  5 ++
 pulsar-client-shaded/pom.xml                       |  5 ++
 .../pulsar/client/impl/BatchMessageContainer.java  | 13 ++--
 .../apache/pulsar/client/impl/ConnectionPool.java  | 28 +++----
 .../apache/pulsar/client/impl/MessageCrypto.java   |  6 +-
 pulsar-common/pom.xml                              |  5 ++
 .../common/allocator/PulsarByteBufAllocator.java   | 91 ++++++++++++++++++++++
 .../org/apache/pulsar/common/api/Commands.java     | 10 +--
 .../common/compression/CompressionCodecLZ4.java    | 11 ++-
 .../common/compression/CompressionCodecZLib.java   | 11 +--
 .../common/compression/CompressionCodecZstd.java   |  7 +-
 .../org/apache/pulsar/common/stats/JvmMetrics.java | 23 +++---
 .../apache/pulsar/common/api/ByteBufPairTest.java  | 10 +--
 .../pulsar/common/compression/CommandsTest.java    | 14 ++--
 .../common/compression/CompressorCodecTest.java    | 12 +--
 .../pulsar/discovery/service/DiscoveryService.java | 25 +++---
 .../pulsar/proxy/server/DirectProxyHandler.java    | 39 +++++-----
 .../apache/pulsar/proxy/server/ProxyService.java   |  6 +-
 .../pulsar/testclient/ManagedLedgerWriter.java     |  5 +-
 .../apache/pulsar/websocket/stats/JvmMetrics.java  | 18 -----
 .../impl/BlobStoreBackedInputStreamImpl.java       |  4 +-
 .../jcloud/impl/BlobStoreBackedReadHandleImpl.java |  6 +-
 .../impl/BlockAwareSegmentInputStreamImpl.java     |  9 ++-
 .../offload/jcloud/impl/DataBlockHeaderImpl.java   |  6 +-
 .../offload/jcloud/impl/OffloadIndexBlockImpl.java |  8 +-
 37 files changed, 290 insertions(+), 171 deletions(-)

diff --git a/conf/pulsar_env.sh b/conf/pulsar_env.sh
old mode 100644
new mode 100755
index 967b97c..36034b5
--- a/conf/pulsar_env.sh
+++ b/conf/pulsar_env.sh
@@ -48,7 +48,7 @@ PULSAR_MEM=${PULSAR_MEM:-"-Xms2g -Xmx2g -XX:MaxDirectMemorySize=4g"}
 PULSAR_GC=${PULSAR_GC:-"-XX:+UseG1GC -XX:MaxGCPauseMillis=10 -XX:+ParallelRefProcEnabled -XX:+UnlockExperimentalVMOptions -XX:+AggressiveOpts -XX:+DoEscapeAnalysis -XX:ParallelGCThreads=32 -XX:ConcGCThreads=32 -XX:G1NewSizePercent=50 -XX:+DisableExplicitGC -XX:-ResizePLAB"}
 
 # Extra options to be passed to the jvm
-PULSAR_EXTRA_OPTS=${PULSAR_EXTRA_OPTS:-"-Dio.netty.leakDetectionLevel=disabled -Dio.netty.recycler.maxCapacity.default=1000 -Dio.netty.recycler.linkCapacity=1024"}
+PULSAR_EXTRA_OPTS=${PULSAR_EXTRA_OPTS:-" -Dpulsar.allocator.exit_on_oom=true -Dio.netty.recycler.maxCapacity.default=1000 -Dio.netty.recycler.linkCapacity=1024"}
 
 # Add extra paths to the bookkeeper classpath
 # PULSAR_EXTRA_CLASSPATH=
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 35725e6..0add151 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -34,12 +34,10 @@ import com.google.common.base.Charsets;
 import com.google.common.collect.Sets;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
 
 import java.io.IOException;
 import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
 import java.nio.charset.Charset;
 import java.security.GeneralSecurityException;
 import java.util.ArrayList;
@@ -96,6 +94,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.ByteBufPair;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
@@ -2173,7 +2172,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
 
         int msgMetadataSize = messageData.getSerializedSize();
         int headersSize = 4 + msgMetadataSize;
-        ByteBuf headers = PooledByteBufAllocator.DEFAULT.buffer(headersSize, headersSize);
+        ByteBuf headers = PulsarByteBufAllocator.DEFAULT.buffer(headersSize, headersSize);
         ByteBufCodedOutputStream outStream = ByteBufCodedOutputStream.get(headers);
         headers.writeInt(msgMetadataSize);
         messageData.writeTo(outStream);
diff --git a/pom.xml b/pom.xml
index 7ae6b75..7c97e43 100644
--- a/pom.xml
+++ b/pom.xml
@@ -315,6 +315,12 @@ flexible messaging model and an intuitive client API.</description>
         </exclusions>
       </dependency>
 
+      <dependency>
+        <groupId>org.apache.bookkeeper</groupId>
+        <artifactId>bookkeeper-common-allocator</artifactId>
+        <version>${bookkeeper.version}</version>
+      </dependency>
+
       <!-- reflection libs -->
       <dependency>
         <groupId>org.reflections</groupId>
@@ -1120,8 +1126,9 @@ flexible messaging model and an intuitive client API.</description>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-surefire-plugin</artifactId>
         <configuration>
-          <argLine> -Xmx2G -XX:MaxDirectMemorySize=8G
-            -Dio.netty.leakDetectionLevel=advanced
+          <argLine> -Xmx2G
+            -Dpulsar.allocator.pooled=false
+            -Dpulsar.allocator.leak_detection=Advanced
             -Dlog4j.configurationFile=log4j2.xml
           </argLine>
           <reuseForks>false</reuseForks>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
index b582bb5..a274795 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
@@ -39,7 +39,6 @@ import java.util.Optional;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 
-
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.replication.AutoRecoveryMain;
@@ -50,8 +49,8 @@ import org.apache.commons.configuration.ConfigurationException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.Commands;
-import org.apache.pulsar.common.conf.InternalConfigurationData;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.slf4j.Logger;
@@ -315,6 +314,11 @@ public class PulsarBrokerStarter {
             })
         );
 
+        PulsarByteBufAllocator.registerOOMListener(oomException -> {
+            log.error("-- Shutting down - Received OOM exception: {}", oomException.getMessage(), oomException);
+            starter.shutdown();
+        });
+
         try {
             starter.start();
         } catch (Exception e) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
index 2cf5fda..cc1a8e5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
@@ -22,14 +22,13 @@ import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.bookkeeper.common.allocator.PoolingPolicy;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
 import org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy;
 import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.Commands;
-import org.apache.pulsar.common.conf.InternalConfigurationData;
 import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping;
 import org.apache.pulsar.zookeeper.ZkIsolatedBookieEnsemblePlacementPolicy;
 import org.apache.pulsar.zookeeper.ZooKeeperCache;
@@ -40,6 +39,7 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
     private final AtomicReference<ZooKeeperCache> rackawarePolicyZkCache = new AtomicReference<>();
     private final AtomicReference<ZooKeeperCache> clientIsolationZkCache = new AtomicReference<>();
 
+    @SuppressWarnings("deprecation")
     @Override
     public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient) throws IOException {
         ClientConfiguration bkConf = new ClientConfiguration();
@@ -61,7 +61,6 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
         bkConf.setStickyReadsEnabled(conf.isBookkeeperEnableStickyReads());
         bkConf.setNettyMaxFrameSizeBytes(conf.getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING);
 
-        bkConf.setAllocatorPoolingPolicy(PoolingPolicy.UnpooledHeap);
         if (conf.isBookkeeperClientHealthCheckEnabled()) {
             bkConf.enableBookieHealthCheck();
             bkConf.setBookieHealthCheckInterval(conf.getBookkeeperHealthCheckIntervalSec(), TimeUnit.SECONDS);
@@ -109,7 +108,10 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
         }
 
         try {
-            return new BookKeeper(bkConf, zkClient);
+            return BookKeeper.forConfig(bkConf)
+                    .allocator(PulsarByteBufAllocator.DEFAULT)
+                    .zk(zkClient)
+                    .build();
         } catch (InterruptedException | BKException e) {
             throw new IOException(e);
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index cc3d138..767db39 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -28,7 +28,6 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -86,6 +85,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedExc
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
@@ -101,8 +101,8 @@ import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
 import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
-import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.zookeeper.KeeperException;
@@ -1087,7 +1087,7 @@ public class PersistentTopicsBase extends AdminResource {
             ByteBuf uncompressedPayload = codec.decode(metadataAndPayload, metadata.getUncompressedSize());
 
             // Copy into a heap buffer for output stream compatibility
-            ByteBuf data = PooledByteBufAllocator.DEFAULT.heapBuffer(uncompressedPayload.readableBytes(),
+            ByteBuf data = PulsarByteBufAllocator.DEFAULT.heapBuffer(uncompressedPayload.readableBytes(),
                     uncompressedPayload.readableBytes());
             data.writeBytes(uncompressedPayload);
             uncompressedPayload.release();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 1890efa..775ef70 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -31,7 +31,6 @@ import com.google.common.collect.Queues;
 
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.AdaptiveRecvByteBufAllocator;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
@@ -42,7 +41,6 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
-import java.net.URI;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -88,20 +86,19 @@ import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataExc
 import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
 import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
 import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
-import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect;
 import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventListner;
 import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventType;
-import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.configuration.FieldContext;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
@@ -278,7 +275,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
                 pulsar.getConfiguration().getClusterName());
 
         ServerBootstrap bootstrap = new ServerBootstrap();
-        bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+        bootstrap.childOption(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
         bootstrap.group(acceptorGroup, workerGroup);
         bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
         bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
index 9009ff9..5c11059 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.client.impl;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
 
 import java.io.IOException;
@@ -33,7 +32,7 @@ import java.util.function.BiPredicate;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.RawMessage;
-import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.proto.PulsarApi.CompressionType;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
@@ -102,7 +101,7 @@ public class RawBatchConverter {
 
         ByteBuf payload = msg.getHeadersAndPayload();
         MessageMetadata metadata = Commands.parseMessageMetadata(payload);
-        ByteBuf batchBuffer = PooledByteBufAllocator.DEFAULT.buffer(payload.capacity());
+        ByteBuf batchBuffer = PulsarByteBufAllocator.DEFAULT.buffer(payload.capacity());
 
         CompressionType compressionType = metadata.getCompression();
         CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawMessageImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawMessageImpl.java
index 72278c9..628e2be 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawMessageImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawMessageImpl.java
@@ -18,20 +18,20 @@
  */
 package org.apache.pulsar.client.impl;
 
-import java.io.IOException;
+import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 
+import java.io.IOException;
+
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
 import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
 import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-
 public class RawMessageImpl implements RawMessage {
     private static final Logger log = LoggerFactory.getLogger(RawMessageImpl.class);
 
@@ -74,7 +74,7 @@ public class RawMessageImpl implements RawMessage {
         int headerSize = 4 /* IdSize */ + idSize + 4 /* PayloadAndMetadataSize */;
         int totalSize = headerSize + headersAndPayload.readableBytes();
 
-        ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(totalSize);
+        ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(totalSize);
         buf.writeInt(idSize);
         try {
             ByteBufCodedOutputStream outStream = ByteBufCodedOutputStream.get(buf);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index a5a561d..08230c8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -22,6 +22,9 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertTrue;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
 import java.lang.reflect.Field;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -39,15 +42,12 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
 import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.ByteBufPair;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
 import org.testng.annotations.Test;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.buffer.UnpooledByteBufAllocator;
-
 /**
  */
 public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
@@ -64,7 +64,7 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
         int payloadSize = data.readableBytes();
         int totalSize = 4 + msgMetadataSize + payloadSize;
 
-        ByteBuf headers = PooledByteBufAllocator.DEFAULT.heapBuffer(totalSize, totalSize);
+        ByteBuf headers = PulsarByteBufAllocator.DEFAULT.heapBuffer(totalSize, totalSize);
         ByteBufCodedOutputStream outStream = ByteBufCodedOutputStream.get(headers);
         headers.writeInt(msgMetadataSize);
         messageMetadata.writeTo(outStream);
diff --git a/pulsar-client-admin-shaded/pom.xml b/pulsar-client-admin-shaded/pom.xml
index 3e1f95c..d72feac 100644
--- a/pulsar-client-admin-shaded/pom.xml
+++ b/pulsar-client-admin-shaded/pom.xml
@@ -95,6 +95,7 @@
                   <include>io.opencensus:*</include>
                   <include>org.objenesis:*</include>
                   <include>org.yaml:snakeyaml</include>
+                  <include>org.apache.bookkeeper:bookkeeper-common-allocator</include>
                 </includes>
               </artifactSet>
               <filters>
@@ -219,6 +220,10 @@
                       <pattern>org.yaml</pattern>
                       <shadedPattern>org.apache.pulsar.shade.org.yaml</shadedPattern>
                   </relocation>
+                  <relocation>
+                    <pattern>org.apache.bookkeeper</pattern>
+                    <shadedPattern>org.apache.pulsar.shade.org.apache.bookkeeper</shadedPattern>
+                  </relocation>
               </relocations>
               <transformers>
                 <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
diff --git a/pulsar-client-all/pom.xml b/pulsar-client-all/pom.xml
index 408b61d..44de80f 100644
--- a/pulsar-client-all/pom.xml
+++ b/pulsar-client-all/pom.xml
@@ -151,6 +151,7 @@
                   <include>org.xerial.snappy:snappy-java</include>
                   <include>org.apache.commons:commons-compress</include>
                   <include>org.tukaani:xz</include>
+                  <include>org.apache.bookkeeper:bookkeeper-common-allocator</include>
                 </includes>
               </artifactSet>
               <filters>
@@ -173,6 +174,10 @@
                   <shadedPattern>org.apache.pulsar.shade.org.asynchttpclient</shadedPattern>
                 </relocation>
                 <relocation>
+                  <pattern>org.apache.bookkeeper</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.org.apache.bookkeeper</shadedPattern>
+                </relocation>
+                <relocation>
                   <pattern>org.apache.commons</pattern>
                   <shadedPattern>org.apache.pulsar.shade.org.apache.commons</shadedPattern>
                 </relocation>
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-shaded/pom.xml b/pulsar-client-kafka-compat/pulsar-client-kafka-shaded/pom.xml
index 512b334..0429209 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka-shaded/pom.xml
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-shaded/pom.xml
@@ -70,6 +70,7 @@
                   <include>org.apache.pulsar:pulsar-client-original</include>
                   <include>org.apache.commons:commons-lang3</include>
                   <include>commons-codec:commons-codec</include>
+                  <include>org.apache.bookkeeper:bookkeeper-common-allocator</include>
                   <include>commons-collections:commons-collections</include>
                   <include>org.asynchttpclient:*</include>
                   <include>io.netty:netty-codec-http</include>
@@ -228,6 +229,10 @@
                   <pattern>org.tukaani</pattern>
                   <shadedPattern>org.apache.pulsar.shade.org.tukaani</shadedPattern>
                 </relocation>
+                <relocation>
+                  <pattern>org.apache.bookkeeper</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.org.apache.bookkeeper</shadedPattern>
+                </relocation>
               </relocations>
               <filters>
                 <filter>
diff --git a/pulsar-client-shaded/pom.xml b/pulsar-client-shaded/pom.xml
index 88e59c0..543c601 100644
--- a/pulsar-client-shaded/pom.xml
+++ b/pulsar-client-shaded/pom.xml
@@ -92,6 +92,7 @@
               <artifactSet>
                 <includes>
                   <include>org.apache.pulsar:pulsar-client-original</include>
+                  <include>org.apache.bookkeeper:bookkeeper-common-allocator</include>
                   <include>org.apache.commons:commons-lang3</include>
                   <include>commons-codec:commons-codec</include>
                   <include>commons-collections:commons-collections</include>
@@ -249,6 +250,10 @@
                   <pattern>org.tukaani</pattern>
                   <shadedPattern>org.apache.pulsar.shade.org.tukaani</shadedPattern>
                 </relocation>
+                <relocation>
+                  <pattern>org.apache.bookkeeper</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.org.apache.bookkeeper</shadedPattern>
+                </relocation>
               </relocations>
               <transformers>
                 <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java
index 0be5aae..4e49dcf 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java
@@ -18,21 +18,20 @@
  */
 package org.apache.pulsar.client.impl;
 
+import com.google.common.collect.Lists;
+
+import io.netty.buffer.ByteBuf;
+
 import java.util.List;
 
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.Commands;
-import org.apache.pulsar.common.api.PulsarDecoder;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.compression.CompressionCodec;
 import org.apache.pulsar.common.compression.CompressionCodecProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-
 /**
  * container for individual messages being published until they are batched and sent to broker
  */
@@ -91,7 +90,7 @@ class BatchMessageContainer {
             // the first message
             sequenceId = Commands.initBatchMessageMetadata(messageMetadata, msg.getMessageBuilder());
             this.firstCallback = callback;
-            batchedMessageMetadataAndPayload = PooledByteBufAllocator.DEFAULT
+            batchedMessageMetadataAndPayload = PulsarByteBufAllocator.DEFAULT
                     .buffer(Math.min(maxBatchSize, MAX_MESSAGE_BATCH_SIZE_BYTES));
         }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 1aa1e43..2b2f822 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -18,6 +18,18 @@
  */
 package org.apache.pulsar.client.impl;
 
+import com.google.common.annotations.VisibleForTesting;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelException;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.resolver.dns.DnsNameResolver;
+import io.netty.resolver.dns.DnsNameResolverBuilder;
+import io.netty.util.concurrent.Future;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetAddress;
@@ -32,23 +44,11 @@ import java.util.function.Supplier;
 
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelException;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.resolver.dns.DnsNameResolver;
-import io.netty.resolver.dns.DnsNameResolverBuilder;
-import io.netty.util.concurrent.Future;
-
 public class ConnectionPool implements Closeable {
     protected final ConcurrentHashMap<InetSocketAddress, ConcurrentMap<Integer, CompletableFuture<ClientCnx>>> pool;
 
@@ -74,7 +74,7 @@ public class ConnectionPool implements Closeable {
 
         bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.getConnectionTimeoutMs());
         bootstrap.option(ChannelOption.TCP_NODELAY, conf.isUseTcpNoDelay());
-        bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+        bootstrap.option(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
 
         try {
             bootstrap.handler(new PulsarChannelInitializer(conf, clientCnxSupplier));
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageCrypto.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageCrypto.java
index 1164108..4807508 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageCrypto.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageCrypto.java
@@ -23,7 +23,6 @@ import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
 
 import java.io.IOException;
 import java.io.Reader;
@@ -62,6 +61,7 @@ import org.apache.pulsar.client.api.CryptoKeyReader;
 import org.apache.pulsar.client.api.EncryptionKeyInfo;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.PulsarClientException.CryptoException;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.proto.PulsarApi.EncryptionKeys;
 import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
@@ -425,7 +425,7 @@ public class MessageCrypto {
             ByteBuffer sourceNioBuf = payload.nioBuffer(payload.readerIndex(), payload.readableBytes());
 
             int maxLength = cipher.getOutputSize(payload.readableBytes());
-            targetBuf = PooledByteBufAllocator.DEFAULT.buffer(maxLength, maxLength);
+            targetBuf = PulsarByteBufAllocator.DEFAULT.buffer(maxLength, maxLength);
             ByteBuffer targetNioBuf = targetBuf.nioBuffer(0, maxLength);
 
             int bytesStored = cipher.doFinal(sourceNioBuf, targetNioBuf);
@@ -513,7 +513,7 @@ public class MessageCrypto {
             ByteBuffer sourceNioBuf = payload.nioBuffer(payload.readerIndex(), payload.readableBytes());
 
             int maxLength = cipher.getOutputSize(payload.readableBytes());
-            targetBuf = PooledByteBufAllocator.DEFAULT.buffer(maxLength, maxLength);
+            targetBuf = PulsarByteBufAllocator.DEFAULT.buffer(maxLength, maxLength);
             ByteBuffer targetNioBuf = targetBuf.nioBuffer(0, maxLength);
 
             int decryptedSize = cipher.doFinal(sourceNioBuf, targetNioBuf);
diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml
index dbf7d40..5a31b68 100644
--- a/pulsar-common/pom.xml
+++ b/pulsar-common/pom.xml
@@ -75,6 +75,11 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>bookkeeper-common-allocator</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>org.lz4</groupId>
       <artifactId>lz4-java</artifactId>
     </dependency>
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocator.java b/pulsar-common/src/main/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocator.java
new file mode 100644
index 0000000..a324e7f
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocator.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.allocator;
+
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocator;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.Consumer;
+
+import lombok.experimental.UtilityClass;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.bookkeeper.common.allocator.ByteBufAllocatorBuilder;
+import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy;
+import org.apache.bookkeeper.common.allocator.PoolingPolicy;
+
+@UtilityClass
+@Slf4j
+public class PulsarByteBufAllocator {
+
+    public static final String PULSAR_ALLOCATOR_POOLED = "pulsar.allocator.pooled";
+    public static final String PULSAR_ALLOCATOR_EXIT_ON_OOM = "pulsar.allocator.exit_on_oom";
+    public static final String PULSAR_ALLOCATOR_LEAK_DETECTION = "pulsar.allocator.leak_detection";
+
+    /** Shared allocator, built in the static initializer from the system properties named above. */
+    public static final ByteBufAllocator DEFAULT;
+
+    private static final List<Consumer<OutOfMemoryError>> LISTENERS = new CopyOnWriteArrayList<>();
+    private static final boolean EXIT_ON_OOM;
+
+    /** Adds a callback that is run from the allocator's out-of-memory listener. */
+    public static void registerOOMListener(Consumer<OutOfMemoryError> listener) {
+        LISTENERS.add(listener);
+    }
+
+    /** Notifies every registered listener, then halts the JVM when exit-on-OOM is enabled. */
+    private static void handleOutOfMemory(OutOfMemoryError error) {
+        for (Consumer<OutOfMemoryError> listener : LISTENERS) {
+            try {
+                listener.accept(error);
+            } catch (Throwable t) {
+                // A failing listener must not prevent the remaining ones from being notified
+                log.warn("Exception during OOM listener: {}", t.getMessage(), t);
+            }
+        }
+
+        if (EXIT_ON_OOM) {
+            log.info("Exiting JVM process for OOM error: {}", error.getMessage(), error);
+            Runtime.getRuntime().halt(1);
+        }
+    }
+
+    static {
+        boolean pooled = Boolean.parseBoolean(System.getProperty(PULSAR_ALLOCATOR_POOLED, "true"));
+        EXIT_ON_OOM = Boolean.parseBoolean(System.getProperty(PULSAR_ALLOCATOR_EXIT_ON_OOM, "false"));
+
+        String leakDetectionName = System.getProperty(PULSAR_ALLOCATOR_LEAK_DETECTION, "Disabled");
+        LeakDetectionPolicy leakPolicy = LeakDetectionPolicy.valueOf(leakDetectionName);
+
+        if (log.isDebugEnabled()) {
+            log.debug("Is Pooled: {} -- Exit on OOM: {}", pooled, EXIT_ON_OOM);
+        }
+
+        // Netty's default pooled allocator is passed explicitly as the underlying pooled allocator
+        ByteBufAllocatorBuilder allocatorBuilder = ByteBufAllocatorBuilder.create()
+                .leakDetectionPolicy(leakPolicy)
+                .pooledAllocator(PooledByteBufAllocator.DEFAULT)
+                .outOfMemoryListener(PulsarByteBufAllocator::handleOutOfMemory);
+        allocatorBuilder.poolingPolicy(pooled ? PoolingPolicy.PooledDirect : PoolingPolicy.UnpooledHeap);
+
+        DEFAULT = allocatorBuilder.build();
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index c1c9752..f5f0998 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -27,7 +27,6 @@ import static org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.copyF
 import com.google.common.annotations.VisibleForTesting;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
 
 import java.io.IOException;
@@ -41,6 +40,7 @@ import lombok.experimental.UtilityClass;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.AuthMethod;
 import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
@@ -980,7 +980,7 @@ public class Commands {
         int totalSize = cmdSize + 4;
         int frameSize = totalSize + 4;
 
-        ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(frameSize, frameSize);
+        ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(frameSize, frameSize);
 
         // Prepend 2 lengths to the buffer
         buf.writeInt(totalSize);
@@ -1020,7 +1020,7 @@ public class Commands {
         int headersSize = 4 + headerContentSize; // totalSize + headerLength
         int checksumReaderIndex = -1;
 
-        ByteBuf headers = PooledByteBufAllocator.DEFAULT.buffer(headersSize, headersSize);
+        ByteBuf headers = PulsarByteBufAllocator.DEFAULT.buffer(headersSize, headersSize);
         headers.writeInt(totalSize); // External frame
 
         try {
@@ -1077,7 +1077,7 @@ public class Commands {
         int checksumReaderIndex = -1;
         int totalSize = headerContentSize + payloadSize;
 
-        ByteBuf metadataAndPayload = PooledByteBufAllocator.DEFAULT.buffer(totalSize, totalSize);
+        ByteBuf metadataAndPayload = PulsarByteBufAllocator.DEFAULT.buffer(totalSize, totalSize);
         try {
             ByteBufCodedOutputStream outStream = ByteBufCodedOutputStream.get(metadataAndPayload);
 
@@ -1209,7 +1209,7 @@ public class Commands {
         int totalSize = 4 + cmdSize + metadataAndPayload.readableBytes();
         int headersSize = 4 + 4 + cmdSize;
 
-        ByteBuf headers = PooledByteBufAllocator.DEFAULT.buffer(headersSize);
+        ByteBuf headers = PulsarByteBufAllocator.DEFAULT.buffer(headersSize);
         headers.writeInt(totalSize); // External frame
 
         try {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecLZ4.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecLZ4.java
index a57992c..417a1b3 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecLZ4.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecLZ4.java
@@ -18,12 +18,15 @@
  */
 package org.apache.pulsar.common.compression;
 
+import io.netty.buffer.ByteBuf;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
 import lombok.extern.slf4j.Slf4j;
+
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+
 import net.jpountz.lz4.LZ4Compressor;
 import net.jpountz.lz4.LZ4Factory;
 import net.jpountz.lz4.LZ4FastDecompressor;
@@ -54,7 +57,7 @@ public class CompressionCodecLZ4 implements CompressionCodec {
 
         ByteBuffer sourceNio = source.nioBuffer(source.readerIndex(), source.readableBytes());
 
-        ByteBuf target = PooledByteBufAllocator.DEFAULT.buffer(maxLength, maxLength);
+        ByteBuf target = PulsarByteBufAllocator.DEFAULT.buffer(maxLength, maxLength);
         ByteBuffer targetNio = target.nioBuffer(0, maxLength);
 
         int compressedLength = compressor.compress(sourceNio, 0, uncompressedLength, targetNio, 0, maxLength);
@@ -64,7 +67,7 @@ public class CompressionCodecLZ4 implements CompressionCodec {
 
     @Override
     public ByteBuf decode(ByteBuf encoded, int uncompressedLength) throws IOException {
-        ByteBuf uncompressed = PooledByteBufAllocator.DEFAULT.buffer(uncompressedLength, uncompressedLength);
+        ByteBuf uncompressed = PulsarByteBufAllocator.DEFAULT.buffer(uncompressedLength, uncompressedLength);
         ByteBuffer uncompressedNio = uncompressed.nioBuffer(0, uncompressedLength);
 
         ByteBuffer encodedNio = encoded.nioBuffer(encoded.readerIndex(), encoded.readableBytes());
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZLib.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZLib.java
index 5137dc1..da413f3 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZLib.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZLib.java
@@ -20,14 +20,15 @@ package org.apache.pulsar.common.compression;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.util.concurrent.FastThreadLocal;
+
 import java.io.IOException;
 import java.util.zip.DataFormatException;
 import java.util.zip.Deflater;
 import java.util.zip.Inflater;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.util.concurrent.FastThreadLocal;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 
 /**
  * ZLib Compression
@@ -64,7 +65,7 @@ public class CompressionCodecZLib implements CompressionCodec {
         int length = source.readableBytes();
 
         int sizeEstimate = (int) Math.ceil(source.readableBytes() * 1.001) + 14;
-        ByteBuf compressed = PooledByteBufAllocator.DEFAULT.heapBuffer(sizeEstimate);
+        ByteBuf compressed = PulsarByteBufAllocator.DEFAULT.heapBuffer(sizeEstimate);
 
         int offset = 0;
         if (source.hasArray()) {
@@ -98,7 +99,7 @@ public class CompressionCodecZLib implements CompressionCodec {
 
     @Override
     public ByteBuf decode(ByteBuf encoded, int uncompressedLength) throws IOException {
-        ByteBuf uncompressed = PooledByteBufAllocator.DEFAULT.heapBuffer(uncompressedLength, uncompressedLength);
+        ByteBuf uncompressed = PulsarByteBufAllocator.DEFAULT.heapBuffer(uncompressedLength, uncompressedLength);
 
         int len = encoded.readableBytes();
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZstd.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZstd.java
index 5566ec1..04c5aa8 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZstd.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZstd.java
@@ -21,11 +21,12 @@ package org.apache.pulsar.common.compression;
 import com.github.luben.zstd.Zstd;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+
 /**
  * Zstandard Compression
  */
@@ -38,7 +39,7 @@ public class CompressionCodecZstd implements CompressionCodec {
         int uncompressedLength = source.readableBytes();
         int maxLength = (int) Zstd.compressBound(uncompressedLength);
 
-        ByteBuf target = PooledByteBufAllocator.DEFAULT.directBuffer(maxLength, maxLength);
+        ByteBuf target = PulsarByteBufAllocator.DEFAULT.directBuffer(maxLength, maxLength);
         int compressedLength;
 
         if (source.hasMemoryAddress()) {
@@ -58,7 +59,7 @@ public class CompressionCodecZstd implements CompressionCodec {
 
     @Override
     public ByteBuf decode(ByteBuf encoded, int uncompressedLength) throws IOException {
-        ByteBuf uncompressed = PooledByteBufAllocator.DEFAULT.directBuffer(uncompressedLength, uncompressedLength);
+        ByteBuf uncompressed = PulsarByteBufAllocator.DEFAULT.directBuffer(uncompressedLength, uncompressedLength);
 
         if (encoded.hasMemoryAddress()) {
             Zstd.decompressUnsafe(uncompressed.memoryAddress(), uncompressedLength,
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmMetrics.java b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmMetrics.java
index 5d43fa3..177190a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmMetrics.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmMetrics.java
@@ -18,6 +18,15 @@
  */
 package org.apache.pulsar.common.stats;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import io.netty.buffer.PoolArenaMetric;
+import io.netty.buffer.PoolChunkListMetric;
+import io.netty.buffer.PoolChunkMetric;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.util.internal.PlatformDependent;
+
 import java.lang.management.BufferPoolMXBean;
 import java.lang.management.ManagementFactory;
 import java.lang.management.RuntimeMXBean;
@@ -36,15 +45,6 @@ import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-import io.netty.buffer.PoolArenaMetric;
-import io.netty.buffer.PoolChunkListMetric;
-import io.netty.buffer.PoolChunkMetric;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.util.internal.PlatformDependent;
-
 public class JvmMetrics {
 
     private static final Logger log = LoggerFactory.getLogger(JvmMetrics.class);
@@ -113,13 +113,14 @@ public class JvmMetrics {
         m.put("jvm_direct_memory_used", getJvmDirectMemoryUsed());
         m.put("jvm_max_direct_memory", PlatformDependent.maxDirectMemory());
         m.put("jvm_thread_cnt", getThreadCount());
-        
+
         this.gcLogger.logMetrics(m);
 
         long totalAllocated = 0;
         long totalUsed = 0;
 
         for (PoolArenaMetric arena : PooledByteBufAllocator.DEFAULT.directArenas()) {
+            this.gcLogger.logMetrics(m);
             for (PoolChunkListMetric list : arena.chunkLists()) {
                 for (PoolChunkMetric chunk : list) {
                     int size = chunk.chunkSize();
@@ -134,6 +135,8 @@ public class JvmMetrics {
         m.put(this.componentName + "_default_pool_allocated", totalAllocated);
         m.put(this.componentName + "_default_pool_used", totalUsed);
 
+        this.gcLogger.logMetrics(m);
+
         return Lists.newArrayList(m);
     }
 
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/api/ByteBufPairTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/api/ByteBufPairTest.java
index 92efb4f..bc426a0 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/api/ByteBufPairTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/api/ByteBufPairTest.java
@@ -23,20 +23,20 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 
-import org.testng.annotations.Test;
-
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
 
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.testng.annotations.Test;
+
 public class ByteBufPairTest {
 
     @Test
     public void testDoubleByteBuf() throws Exception {
-        ByteBuf b1 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
+        ByteBuf b1 = PulsarByteBufAllocator.DEFAULT.heapBuffer(128, 128);
         b1.writerIndex(b1.capacity());
-        ByteBuf b2 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128);
+        ByteBuf b2 = PulsarByteBufAllocator.DEFAULT.heapBuffer(128, 128);
         b2.writerIndex(b2.capacity());
         ByteBufPair buf = ByteBufPair.get(b1, b2);
 
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java
index 186e7b2..35d213a 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java
@@ -21,8 +21,14 @@ package org.apache.pulsar.common.compression;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 
+import com.scurrilous.circe.checksum.Crc32cIntChecksum;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
 import java.io.IOException;
 
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.ByteBufPair;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.Commands.ChecksumType;
@@ -30,12 +36,6 @@ import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
 import org.testng.annotations.Test;
 
-import com.scurrilous.circe.checksum.Crc32cIntChecksum;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.buffer.Unpooled;
-
 public class CommandsTest {
 
     @Test
@@ -80,7 +80,7 @@ public class CommandsTest {
     private int computeChecksum(MessageMetadata msgMetadata, ByteBuf compressedPayload) throws IOException {
         int metadataSize = msgMetadata.getSerializedSize();
         int metadataFrameSize = 4 + metadataSize;
-        ByteBuf metaPayloadFrame = PooledByteBufAllocator.DEFAULT.buffer(metadataFrameSize, metadataFrameSize);
+        ByteBuf metaPayloadFrame = PulsarByteBufAllocator.DEFAULT.buffer(metadataFrameSize, metadataFrameSize);
         ByteBufCodedOutputStream outStream = ByteBufCodedOutputStream.get(metaPayloadFrame);
         metaPayloadFrame.writeInt(metadataSize);
         msgMetadata.writeTo(outStream);
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java
index b7e5c85..515bb3c 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java
@@ -21,16 +21,16 @@ package org.apache.pulsar.common.compression;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
 import java.io.IOException;
 
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.proto.PulsarApi.CompressionType;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.buffer.Unpooled;
-
 public class CompressorCodecTest {
 
     private static String text = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Cras id massa odio. Duis commodo ligula sed efficitur cursus. Aliquam sollicitudin, tellus quis suscipit tincidunt, erat sem efficitur nulla, in feugiat diam ex a dolor. Vestibulum ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia Curae; Vestibulum ac volutpat nisl, vel aliquam elit. Maecenas auctor aliquet turpis, id ullamcorper metus. Ut tincidunt et magna non ultrices. Quisqu [...]
@@ -44,7 +44,7 @@ public class CompressorCodecTest {
     void testCompressDecompress(CompressionType type) throws IOException {
         CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(type);
         byte[] data = text.getBytes();
-        ByteBuf raw = PooledByteBufAllocator.DEFAULT.buffer();
+        ByteBuf raw = PulsarByteBufAllocator.DEFAULT.directBuffer();
         raw.writeBytes(data);
 
         ByteBuf compressed = codec.encode(raw);
@@ -85,7 +85,7 @@ public class CompressorCodecTest {
         byte[] data = text.getBytes();
 
         for (int i = 0; i < 5; i++) {
-            ByteBuf raw = PooledByteBufAllocator.DEFAULT.buffer();
+            ByteBuf raw = PulsarByteBufAllocator.DEFAULT.directBuffer();
             raw.writeBytes(data);
             ByteBuf compressed = codec.encode(raw);
             assertEquals(raw.readableBytes(), data.length);
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java
index 828c371..54e0de8 100644
--- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java
@@ -20,14 +20,25 @@ package org.apache.pulsar.discovery.service;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import com.google.common.base.Preconditions;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.AdaptiveRecvByteBufAllocator;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetAddress;
 
+import lombok.Getter;
+
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.apache.pulsar.discovery.service.server.ServiceConfig;
@@ -36,16 +47,6 @@ import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.AdaptiveRecvByteBufAllocator;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.util.concurrent.DefaultThreadFactory;
-import lombok.Getter;
-
 /**
  * Main discovery-service which starts component to serve incoming discovery-request over binary-proto channel and
  * redirects to one of the active broker
@@ -79,7 +80,7 @@ public class DiscoveryService implements Closeable {
 
     /**
      * Starts discovery service by initializing zookkeeper and server
-     * 
+     *
      * @throws Exception
      */
     public void start() throws Exception {
@@ -99,7 +100,7 @@ public class DiscoveryService implements Closeable {
     public void startServer() throws Exception {
 
         ServerBootstrap bootstrap = new ServerBootstrap();
-        bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+        bootstrap.childOption(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
         bootstrap.group(acceptorGroup, workerGroup);
         bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
         bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index a8deb44..6276819 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -22,43 +22,42 @@ package org.apache.pulsar.proxy.server;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelId;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
+
 import java.net.URI;
 import java.net.URISyntaxException;
-
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+
 import javax.net.ssl.SSLSession;
 
 import org.apache.http.conn.ssl.DefaultHostnameVerifier;
 import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.AuthData;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.PulsarDecoder;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected;
-import org.apache.pulsar.common.conf.InternalConfigurationData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelId;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import io.netty.handler.ssl.SslContext;
-import io.netty.handler.ssl.SslHandler;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.FutureListener;
-
 public class DirectProxyHandler {
 
     private Channel inboundChannel;
@@ -90,7 +89,7 @@ public class DirectProxyHandler {
         // Tie the backend connection on the same thread to avoid context
         // switches when passing data between the 2
         // connections
-        b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+        b.option(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
         b.group(inboundChannel.eventLoop()).channel(inboundChannel.getClass()).option(ChannelOption.AUTO_READ, false);
         b.handler(new ChannelInitializer<SocketChannel>() {
             @Override
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index a591af1..a634485 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -22,7 +22,6 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 
 import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.AdaptiveRecvByteBufAllocator;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
@@ -40,6 +39,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
@@ -111,7 +111,7 @@ public class ProxyService implements Closeable {
         } else {
             this.serviceUrl = null;
         }
-        
+
         if (proxyConfig.getServicePortTls().isPresent()) {
             this.serviceUrlTls = String.format("pulsar://%s:%d/", hostname, proxyConfig.getServicePortTls().get());
         } else {
@@ -137,7 +137,7 @@ public class ProxyService implements Closeable {
         }
 
         ServerBootstrap bootstrap = new ServerBootstrap();
-        bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+        bootstrap.childOption(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
         bootstrap.group(acceptorGroup, workerGroup);
         bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
         bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
index d75ba0a..00bdf03 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
@@ -29,8 +29,6 @@ import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.RateLimiter;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.buffer.Unpooled;
 import io.netty.util.concurrent.DefaultThreadFactory;
 
 import java.text.DecimalFormat;
@@ -61,6 +59,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.testclient.utils.PaddingDecimalFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -152,7 +151,7 @@ public class ManagedLedgerWriter {
         log.info("Starting Pulsar managed-ledger perf writer with config: {}", w.writeValueAsString(arguments));
 
         byte[] payloadData = new byte[arguments.msgSize];
-        ByteBuf payloadBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(arguments.msgSize);
+        ByteBuf payloadBuffer = PulsarByteBufAllocator.DEFAULT.directBuffer(arguments.msgSize);
         payloadBuffer.writerIndex(arguments.msgSize);
 
         // Now processing command line arguments
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/JvmMetrics.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/JvmMetrics.java
index ae7e992..fa0c45f 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/JvmMetrics.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/JvmMetrics.java
@@ -82,24 +82,6 @@ public class JvmMetrics {
         m.put("jvm_gc_old_pause", currentOldGcTime);
         m.put("jvm_gc_old_count", currentOldGcCount);
 
-        long totalAllocated = 0;
-        long totalUsed = 0;
-
-        for (PoolArenaMetric arena : PooledByteBufAllocator.DEFAULT.metric().directArenas()) {
-            for (PoolChunkListMetric list : arena.chunkLists()) {
-                for (PoolChunkMetric chunk : list) {
-                    int size = chunk.chunkSize();
-                    int used = size - chunk.freeBytes();
-
-                    totalAllocated += size;
-                    totalUsed += used;
-                }
-            }
-        }
-
-        m.put("proxy_default_pool_allocated", totalAllocated);
-        m.put("proxy_default_pool_used", totalUsed);
-
         return m;
     }
 
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
index 8d0f5bb..d07bab6 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
@@ -19,11 +19,11 @@
 package org.apache.bookkeeper.mledger.offload.jcloud.impl;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
 import java.io.IOException;
 import java.io.InputStream;
 import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
 import org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader.VersionCheck;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.jclouds.blobstore.BlobStore;
 import org.jclouds.blobstore.domain.Blob;
 import org.jclouds.blobstore.options.GetOptions;
@@ -52,7 +52,7 @@ public class BlobStoreBackedInputStreamImpl extends BackedInputStream {
         this.bucket = bucket;
         this.key = key;
         this.versionCheck = versionCheck;
-        this.buffer = PooledByteBufAllocator.DEFAULT.buffer(bufferSize, bufferSize);
+        this.buffer = PulsarByteBufAllocator.DEFAULT.buffer(bufferSize, bufferSize);
         this.objectLen = objectLen;
         this.bufferSize = bufferSize;
         this.cursor = 0;
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
index 345a750..20ccb3d 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
@@ -19,7 +19,7 @@
 package org.apache.bookkeeper.mledger.offload.jcloud.impl;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
+
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
+
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
 import org.apache.bookkeeper.client.api.LedgerEntries;
@@ -40,6 +41,7 @@ import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexEntry;
 import org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader.VersionCheck;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.jclouds.blobstore.BlobStore;
 import org.jclouds.blobstore.domain.Blob;
 import org.slf4j.Logger;
@@ -116,7 +118,7 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
                         long entryId = dataStream.readLong();
 
                         if (entryId == nextExpectedId) {
-                            ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(length, length);
+                            ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(length, length);
                             entries.add(LedgerEntryImpl.create(ledgerId, entryId, length, buf));
                             int toWrite = length;
                             while (toWrite > 0) {
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java
index 253b2e3..ccfb20f 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java
@@ -21,18 +21,21 @@ package org.apache.bookkeeper.mledger.offload.jcloud.impl;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.collect.Lists;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.CompositeByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
+
 import org.apache.bookkeeper.client.api.LedgerEntries;
 import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.mledger.offload.jcloud.BlockAwareSegmentInputStream;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -126,8 +129,8 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre
                 int entryLength = buf.readableBytes();
                 long entryId = entry.getEntryId();
 
-                CompositeByteBuf entryBuf = PooledByteBufAllocator.DEFAULT.compositeBuffer(2);
-                ByteBuf entryHeaderBuf = PooledByteBufAllocator.DEFAULT.buffer(ENTRY_HEADER_SIZE, ENTRY_HEADER_SIZE);
+                CompositeByteBuf entryBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer(2);
+                ByteBuf entryHeaderBuf = PulsarByteBufAllocator.DEFAULT.buffer(ENTRY_HEADER_SIZE, ENTRY_HEADER_SIZE);
 
                 entryHeaderBuf.writeInt(entryLength).writeLong(entryId);
                 entryBuf.addComponents(true, entryHeaderBuf, buf);
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderImpl.java
index dd6d99b..bb406c4 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderImpl.java
@@ -22,12 +22,14 @@ import com.google.common.io.CountingInputStream;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufInputStream;
-import io.netty.buffer.PooledByteBufAllocator;
+
 import java.io.DataInputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
+
 import org.apache.bookkeeper.mledger.offload.jcloud.DataBlockHeader;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 
 /**
  *
@@ -111,7 +113,7 @@ public class DataBlockHeaderImpl implements DataBlockHeader {
      */
     @Override
     public InputStream toStream() {
-        ByteBuf out = PooledByteBufAllocator.DEFAULT.buffer(HEADER_MAX_SIZE, HEADER_MAX_SIZE);
+        ByteBuf out = PulsarByteBufAllocator.DEFAULT.buffer(HEADER_MAX_SIZE, HEADER_MAX_SIZE);
         out.writeInt(MAGIC_WORD)
             .writeLong(headerLength)
             .writeLong(blockLength)
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java
index 7c837fd..ced2bd1 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java
@@ -25,7 +25,6 @@ import com.google.protobuf.ByteString;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufInputStream;
-import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
 
@@ -42,11 +41,12 @@ import java.util.stream.Collectors;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.api.DigestType;
 import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
+import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexEntry;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.DataFormats;
 import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat;
-import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
-import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexEntry;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -183,7 +183,7 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
             + segmentMetadataLength
             + indexEntryCount * (8 + 4 + 8); /* messageEntryId + blockPartId + blockOffset */
 
-        ByteBuf out = PooledByteBufAllocator.DEFAULT.buffer(indexBlockLength, indexBlockLength);
+        ByteBuf out = PulsarByteBufAllocator.DEFAULT.buffer(indexBlockLength, indexBlockLength);
 
         out.writeInt(INDEX_MAGIC_WORD)
             .writeInt(indexBlockLength)