You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zi...@apache.org on 2022/08/05 07:26:09 UTC
[pulsar] branch master updated: [fix][client] Remove redundant check for chunked message TotalChunkMsgSize in ConsumerImpl (#16797)
This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7650612cd86 [fix][client] Remove redundant check for chunked message TotalChunkMsgSize in ConsumerImpl (#16797)
7650612cd86 is described below
commit 7650612cd86ead01886300f9b31aba6946831736
Author: Jiaqi Shen <50...@users.noreply.github.com>
AuthorDate: Fri Aug 5 15:26:01 2022 +0800
[fix][client] Remove redundant check for chunked message TotalChunkMsgSize in ConsumerImpl (#16797)
### Motivation
There is a incorrect out-of-order check for chunked message in `ConsumerImpl`. For the last check should compare the result of `chunkedMsgCtx.chunkedMsgBuffer.readableBytes() + compressedPayload.readableBytes()` with `TotalChunkMsgSize` instead of `ChunkId` with `TotalChunkMsgSize`.
### Modifications
Fix the out-of-order check for chunked message in `ConsumerImpl`.
---
.../src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 8 +++-----
1 file changed, 3 insertions(+), 5 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index c474da345c2..4cb334b38e4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1433,12 +1433,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
ChunkedMessageCtx chunkedMsgCtx = chunkedMessagesMap.get(msgMetadata.getUuid());
// discard message if chunk is out-of-order
if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null
- || msgMetadata.getChunkId() != (chunkedMsgCtx.lastChunkedMessageId + 1)
- || msgMetadata.getChunkId() >= msgMetadata.getTotalChunkMsgSize()) {
+ || msgMetadata.getChunkId() != (chunkedMsgCtx.lastChunkedMessageId + 1)) {
// means we lost the first chunk: should never happen
- log.info("Received unexpected chunk messageId {}, last-chunk-id{}, chunkId = {}, total-chunks {}", msgId,
- (chunkedMsgCtx != null ? chunkedMsgCtx.lastChunkedMessageId : null), msgMetadata.getChunkId(),
- msgMetadata.getTotalChunkMsgSize());
+ log.info("Received unexpected chunk messageId {}, last-chunk-id{}, chunkId = {}", msgId,
+ (chunkedMsgCtx != null ? chunkedMsgCtx.lastChunkedMessageId : null), msgMetadata.getChunkId());
if (chunkedMsgCtx != null) {
if (chunkedMsgCtx.chunkedMsgBuffer != null) {
ReferenceCountUtil.safeRelease(chunkedMsgCtx.chunkedMsgBuffer);