You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/10/20 03:07:43 UTC

[pulsar-client-go] branch master updated: Fix maxMessageSize not effective even if aligned with broker (#381)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 85a9fe8  Fix maxMessageSize not effective even if aligned with broker (#381)
85a9fe8 is described below

commit 85a9fe8e1c5a464ce87551dd9b9768e6ac674d48
Author: wuYin <wu...@gmail.com>
AuthorDate: Tue Oct 20 11:07:33 2020 +0800

    Fix maxMessageSize not effective even if aligned with broker (#381)
    
    ### Motivation
    - issue 1
        If broker updated `maxMessageSize`, client's `maxMessageSize` will be the same after handshaking.
        However, client still use the default `maxMessageSize` while reading command from connection.
        Lead to consumer can't receive message which payload exceed 5MB.
    
    - issue 2
       According to [PIP-36](https://github.com/apache/pulsar/wiki/PIP-36%3A-Max-Message-Size), default *Size should be:
       ```
       maxMessageSize = 5MB
       framePaddingSize = 10KB
       maxFrameSize = maxMessageSize + framePaddingSize
       ```
       But they two are confused currently:
       ```
       maxFrameSize = 5MB
       framePaddingSize = 10KB
       maxMessageSize = maxFrameSize - framePaddingSize
       ```
    
    ### Modifications
    
    - Use the aligned `maxMessageSize` instead of the default value.
    - Correct `maxMessageSize` default value.
    
    
    
    ### Verifying this change
    
    - [x] Make sure that the change passes the CI checks.
---
 pulsar/internal/commands.go          | 10 +++++-----
 pulsar/internal/connection_reader.go |  2 +-
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index a4d5e5f..1adfba4 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -28,13 +28,13 @@ import (
 )
 
 const (
-	// MaxFrameSize limit the maximum size that pulsar allows for messages to be sent.
-	MaxFrameSize = 5 * 1024 * 1024
+	// MaxMessageSize limit message size for transfer
+	MaxMessageSize = 5 * 1024 * 1024
 	// MessageFramePadding is for metadata and other frame headers
 	MessageFramePadding = 10 * 1024
-	// MaxMessageSize limit message size for transfer
-	MaxMessageSize        = MaxFrameSize - MessageFramePadding
-	magicCrc32c    uint16 = 0x0e01
+	// MaxFrameSize limit the maximum size that pulsar allows for messages to be sent.
+	MaxFrameSize        = MaxMessageSize + MessageFramePadding
+	magicCrc32c  uint16 = 0x0e01
 )
 
 // ErrCorruptedMessage is the error returned by ReadMessageData when it has detected corrupted data.
diff --git a/pulsar/internal/connection_reader.go b/pulsar/internal/connection_reader.go
index 60c179d..8035324 100644
--- a/pulsar/internal/connection_reader.go
+++ b/pulsar/internal/connection_reader.go
@@ -73,7 +73,7 @@ func (r *connectionReader) readSingleCommand() (cmd *pb.BaseCommand, headersAndP
 
 	// We have enough to read frame size
 	frameSize := r.buffer.ReadUint32()
-	if frameSize > MaxFrameSize {
+	if r.cnx.maxMessageSize != 0 && int32(frameSize) > (r.cnx.maxMessageSize+MessageFramePadding) {
 		r.cnx.log.Warnf("Received too big frame size. size=%d", frameSize)
 		r.cnx.TriggerClose()
 		return nil, nil, errors.New("Frame size too big")