You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zi...@apache.org on 2022/08/05 07:26:09 UTC

[pulsar] branch master updated: [fix][client] Remove redundant check for chunked message TotalChunkMsgSize in ConsumerImpl (#16797)

This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7650612cd86 [fix][client] Remove redundant check for chunked message TotalChunkMsgSize in ConsumerImpl (#16797)
7650612cd86 is described below

commit 7650612cd86ead01886300f9b31aba6946831736
Author: Jiaqi Shen <50...@users.noreply.github.com>
AuthorDate: Fri Aug 5 15:26:01 2022 +0800

    [fix][client] Remove redundant check for chunked message TotalChunkMsgSize in ConsumerImpl (#16797)
    
    ### Motivation
    
    There is an incorrect out-of-order check for chunked messages in `ConsumerImpl`. The last condition should compare the result of `chunkedMsgCtx.chunkedMsgBuffer.readableBytes() + compressedPayload.readableBytes()` with `TotalChunkMsgSize`, rather than comparing `ChunkId` with `TotalChunkMsgSize`.
    
    ### Modifications
    
    Fix the out-of-order check for chunked messages in `ConsumerImpl` by removing the incorrect comparison of `ChunkId` with `TotalChunkMsgSize`.
---
 .../src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index c474da345c2..4cb334b38e4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1433,12 +1433,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         ChunkedMessageCtx chunkedMsgCtx = chunkedMessagesMap.get(msgMetadata.getUuid());
         // discard message if chunk is out-of-order
         if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null
-                || msgMetadata.getChunkId() != (chunkedMsgCtx.lastChunkedMessageId + 1)
-                || msgMetadata.getChunkId() >= msgMetadata.getTotalChunkMsgSize()) {
+                || msgMetadata.getChunkId() != (chunkedMsgCtx.lastChunkedMessageId + 1)) {
             // means we lost the first chunk: should never happen
-            log.info("Received unexpected chunk messageId {}, last-chunk-id{}, chunkId = {}, total-chunks {}", msgId,
-                    (chunkedMsgCtx != null ? chunkedMsgCtx.lastChunkedMessageId : null), msgMetadata.getChunkId(),
-                    msgMetadata.getTotalChunkMsgSize());
+            log.info("Received unexpected chunk messageId {}, last-chunk-id{}, chunkId = {}", msgId,
+                    (chunkedMsgCtx != null ? chunkedMsgCtx.lastChunkedMessageId : null), msgMetadata.getChunkId());
             if (chunkedMsgCtx != null) {
                 if (chunkedMsgCtx.chunkedMsgBuffer != null) {
                     ReferenceCountUtil.safeRelease(chunkedMsgCtx.chunkedMsgBuffer);