You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/05/28 01:47:35 UTC

[pulsar-client-go] branch master updated: [Issue 240] Add check for max message size (#263)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new bf248fd  [Issue 240] Add check for max message size (#263)
bf248fd is described below

commit bf248fd43e178f9b598ece50cbdebd6708175b94
Author: Keith Null <20...@users.noreply.github.com>
AuthorDate: Thu May 28 09:47:25 2020 +0800

    [Issue 240] Add check for max message size (#263)
    
    * Add check for max message size
    
    1. When creating a connection, try to get maxMessageSize from handshake
    response command. If it's not set, then use the default maxMessageSize
    value defined in the client side.
    2. When sending a message, check whether the size of payload exceeds
    maxMessageSize. If so, return error immediately without adding this
    message into sending queue.
    3. To implement these, I made some tiny modifications in Connection
    interface and added a field in its implementation struct.
    
    * Add testing for max message size
    
    * Fix error log
---
 integration-tests/standalone.conf |  3 +++
 pulsar/internal/batch_builder.go  |  2 --
 pulsar/internal/commands.go       |  8 ++++++--
 pulsar/internal/connection.go     | 15 ++++++++++++++-
 pulsar/producer_partition.go      | 15 ++++++++++++++-
 pulsar/producer_test.go           | 27 +++++++++++++++++++++++++++
 6 files changed, 64 insertions(+), 6 deletions(-)

diff --git a/integration-tests/standalone.conf b/integration-tests/standalone.conf
index d81f493..3fdf53a 100644
--- a/integration-tests/standalone.conf
+++ b/integration-tests/standalone.conf
@@ -83,6 +83,9 @@ statusFilePath=/usr/local/apache/htdocs
 # Using a value of 0, is disabling unackeMessage limit check and consumer can receive messages without any restriction
 maxUnackedMessagesPerConsumer=50000
 
+# Set maxMessageSize to 1MB rather than the default value 5MB for testing
+maxMessageSize=1048576
+
 ### --- Authentication --- ###
 
 # Enable TLS
diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 80d8a00..3b54eba 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -29,8 +29,6 @@ import (
 )
 
 const (
-	// MaxMessageSize limit message size for transfer
-	MaxMessageSize = 5 * 1024 * 1024
 	// MaxBatchSize will be the largest size for a batch sent from this particular producer.
 	// This is used as a baseline to allocate a new buffer that can hold the entire batch
 	// without needing costly re-allocations.
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index d9f2a1f..1ee3517 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -30,8 +30,12 @@ import (
 
 const (
 	// MaxFrameSize limit the maximum size that pulsar allows for messages to be sent.
-	MaxFrameSize        = 5 * 1024 * 1024
-	magicCrc32c  uint16 = 0x0e01
+	MaxFrameSize = 5 * 1024 * 1024
+	// MessageFramePadding is for metadata and other frame headers
+	MessageFramePadding = 10 * 1024
+	// MaxMessageSize limit message size for transfer
+	MaxMessageSize        = MaxFrameSize - MessageFramePadding
+	magicCrc32c    uint16 = 0x0e01
 )
 
 // ErrCorruptedMessage is the error returned by ReadMessageData when it has detected corrupted data.
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index d02deda..797b27a 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -71,6 +71,7 @@ type Connection interface {
 	AddConsumeHandler(id uint64, handler ConsumerHandler)
 	DeleteConsumeHandler(id uint64)
 	ID() string
+	GetMaxMessageSize() int32
 	Close()
 }
 
@@ -157,6 +158,8 @@ type connection struct {
 
 	tlsOptions *TLSOptions
 	auth       auth.Provider
+
+	maxMessageSize int32
 }
 
 func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSOptions,
@@ -282,7 +285,13 @@ func (c *connection) doHandshake() bool {
 			cmd.Type)
 		return false
 	}
-
+	if cmd.Connected.MaxMessageSize != nil {
+		c.log.Debug("Got MaxMessageSize from handshake response:", *cmd.Connected.MaxMessageSize)
+		c.maxMessageSize = *cmd.Connected.MaxMessageSize
+	} else {
+		c.log.Debug("No MaxMessageSize from handshake response, use default: ", MaxMessageSize)
+		c.maxMessageSize = MaxMessageSize
+	}
 	c.log.Info("Connection is ready")
 	c.changeState(connectionReady)
 	return true
@@ -749,3 +758,7 @@ func (c *connection) consumerHandler(id uint64) (ConsumerHandler, bool) {
 func (c *connection) ID() string {
 	return fmt.Sprintf("%s -> %s", c.cnx.LocalAddr(), c.cnx.RemoteAddr())
 }
+
+func (c *connection) GetMaxMessageSize() int32 {
+	return c.maxMessageSize
+}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 63f7bf4..2eb3696 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -40,7 +40,10 @@ const (
 	producerClosed
 )
 
-var errFailAddBatch = errors.New("message send failed")
+var (
+	errFailAddBatch    = errors.New("message send failed")
+	errMessageTooLarge = errors.New("message size exceeds MaxMessageSize")
+)
 
 type partitionProducer struct {
 	state  int32
@@ -236,6 +239,16 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 
 	msg := request.msg
 
+	// if msg is too large
+	if len(msg.Payload) > int(p.cnx.GetMaxMessageSize()) {
+		p.publishSemaphore.Release()
+		request.callback(nil, request.msg, errMessageTooLarge)
+		p.log.WithField("size", len(msg.Payload)).
+			WithField("properties", msg.Properties).
+			WithError(errMessageTooLarge).Error()
+		return
+	}
+
 	deliverAt := msg.DeliverAt
 	if msg.DeliverAfter.Nanoseconds() > 0 {
 		deliverAt = time.Now().Add(msg.DeliverAfter)
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 2910427..9f76873 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -790,3 +790,30 @@ func TestDelayAbsolute(t *testing.T) {
 	assert.NotNil(t, msg)
 	canc()
 }
+
+func TestMaxMessageSize(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: serviceURL,
+	})
+	assert.NoError(t, err)
+	defer client.Close()
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: newTopicName(),
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer)
+	defer producer.Close()
+	serverMaxMessageSize := 1024 * 1024
+	for bias := -1; bias <= 1; bias++ {
+		payload := make([]byte, serverMaxMessageSize+bias)
+		ID, err := producer.Send(context.Background(), &ProducerMessage{
+			Payload: payload,
+		})
+		if bias <= 0 {
+			assert.NoError(t, err)
+			assert.NotNil(t, ID)
+		} else {
+			assert.Equal(t, errMessageTooLarge, err)
+		}
+	}
+}