You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/30 03:22:08 UTC

[pulsar] 14/18: Use PulsarByteBufAllocator to allocate buffer for chunks at consumer side (#13536)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 32c6cdcf8e30d3c3297ad8a8f9b9bd2319464b7d
Author: Yunze Xu <xy...@163.com>
AuthorDate: Tue Dec 28 20:35:21 2021 +0800

    Use PulsarByteBufAllocator to allocate buffer for chunks at consumer side (#13536)
    
    ### Motivation
    
    Currently, the Pulsar consumer allocates the buffer for chunked messages directly via `Unpooled.directBuffer`, which bypasses the allocator that is widely used elsewhere in Pulsar.
    
    ### Modifications
    
    Use `PulsarByteBufAllocator` as the memory allocator for the chunk buffer.
    
    (cherry picked from commit a5d347370286cb67ffa4ee179ec6fb5117546a4e)
    (cherry picked from commit ddd1f5eb3dfb98b462a42121027cd5b51ed6218b)
---
 .../src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java      | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index ac04651..9042788 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -26,7 +26,6 @@ import com.google.common.collect.Iterables;
 import com.scurrilous.circe.checksum.Crc32cIntChecksum;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
@@ -1255,7 +1254,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         }
 
         if (msgMetadata.getChunkId() == 0) {
-            ByteBuf chunkedMsgBuffer = Unpooled.directBuffer(msgMetadata.getTotalChunkMsgSize(),
+            ByteBuf chunkedMsgBuffer = PulsarByteBufAllocator.DEFAULT.buffer(msgMetadata.getTotalChunkMsgSize(),
                     msgMetadata.getTotalChunkMsgSize());
             int totalChunks = msgMetadata.getNumChunksFromMsg();
             chunkedMessagesMap.computeIfAbsent(msgMetadata.getUuid(),