You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/05/28 01:47:35 UTC
[pulsar-client-go] branch master updated: [Issue 240] Add check for
max message size (#263)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new bf248fd [Issue 240] Add check for max message size (#263)
bf248fd is described below
commit bf248fd43e178f9b598ece50cbdebd6708175b94
Author: Keith Null <20...@users.noreply.github.com>
AuthorDate: Thu May 28 09:47:25 2020 +0800
[Issue 240] Add check for max message size (#263)
* Add check for max message size
1. When creating a connection, try to get maxMessageSize from handshake
response command. If it's not set, then use the default maxMessageSize
value defined in the client side.
2. When sending a message, check whether the size of payload exceeds
maxMessageSize. If so, return error immediately without adding this
meesage into sending queue.
3. To implement these, I made some tiny modifications in Connection
interface and added a field in its implementation struct.
* Add testing for max message size
* Fix error log
---
integration-tests/standalone.conf | 3 +++
pulsar/internal/batch_builder.go | 2 --
pulsar/internal/commands.go | 8 ++++++--
pulsar/internal/connection.go | 15 ++++++++++++++-
pulsar/producer_partition.go | 15 ++++++++++++++-
pulsar/producer_test.go | 27 +++++++++++++++++++++++++++
6 files changed, 64 insertions(+), 6 deletions(-)
diff --git a/integration-tests/standalone.conf b/integration-tests/standalone.conf
index d81f493..3fdf53a 100644
--- a/integration-tests/standalone.conf
+++ b/integration-tests/standalone.conf
@@ -83,6 +83,9 @@ statusFilePath=/usr/local/apache/htdocs
# Using a value of 0, is disabling unackeMessage limit check and consumer can receive messages without any restriction
maxUnackedMessagesPerConsumer=50000
+# Set maxMessageSize to 1MB rather than the default value 5MB for testing
+maxMessageSize=1048576
+
### --- Authentication --- ###
# Enable TLS
diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 80d8a00..3b54eba 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -29,8 +29,6 @@ import (
)
const (
- // MaxMessageSize limit message size for transfer
- MaxMessageSize = 5 * 1024 * 1024
// MaxBatchSize will be the largest size for a batch sent from this particular producer.
// This is used as a baseline to allocate a new buffer that can hold the entire batch
// without needing costly re-allocations.
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index d9f2a1f..1ee3517 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -30,8 +30,12 @@ import (
const (
// MaxFrameSize limit the maximum size that pulsar allows for messages to be sent.
- MaxFrameSize = 5 * 1024 * 1024
- magicCrc32c uint16 = 0x0e01
+ MaxFrameSize = 5 * 1024 * 1024
+ // MessageFramePadding is for metadata and other frame headers
+ MessageFramePadding = 10 * 1024
+ // MaxMessageSize limit message size for transfer
+ MaxMessageSize = MaxFrameSize - MessageFramePadding
+ magicCrc32c uint16 = 0x0e01
)
// ErrCorruptedMessage is the error returned by ReadMessageData when it has detected corrupted data.
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index d02deda..797b27a 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -71,6 +71,7 @@ type Connection interface {
AddConsumeHandler(id uint64, handler ConsumerHandler)
DeleteConsumeHandler(id uint64)
ID() string
+ GetMaxMessageSize() int32
Close()
}
@@ -157,6 +158,8 @@ type connection struct {
tlsOptions *TLSOptions
auth auth.Provider
+
+ maxMessageSize int32
}
func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSOptions,
@@ -282,7 +285,13 @@ func (c *connection) doHandshake() bool {
cmd.Type)
return false
}
-
+ if cmd.Connected.MaxMessageSize != nil {
+ c.log.Debug("Got MaxMessageSize from handshake response:", *cmd.Connected.MaxMessageSize)
+ c.maxMessageSize = *cmd.Connected.MaxMessageSize
+ } else {
+ c.log.Debug("No MaxMessageSize from handshake response, use default: ", MaxMessageSize)
+ c.maxMessageSize = MaxMessageSize
+ }
c.log.Info("Connection is ready")
c.changeState(connectionReady)
return true
@@ -749,3 +758,7 @@ func (c *connection) consumerHandler(id uint64) (ConsumerHandler, bool) {
func (c *connection) ID() string {
return fmt.Sprintf("%s -> %s", c.cnx.LocalAddr(), c.cnx.RemoteAddr())
}
+
+func (c *connection) GetMaxMessageSize() int32 {
+ return c.maxMessageSize
+}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 63f7bf4..2eb3696 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -40,7 +40,10 @@ const (
producerClosed
)
-var errFailAddBatch = errors.New("message send failed")
+var (
+ errFailAddBatch = errors.New("message send failed")
+ errMessageTooLarge = errors.New("message size exceeds MaxMessageSize")
+)
type partitionProducer struct {
state int32
@@ -236,6 +239,16 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
msg := request.msg
+ // if msg is too large
+ if len(msg.Payload) > int(p.cnx.GetMaxMessageSize()) {
+ p.publishSemaphore.Release()
+ request.callback(nil, request.msg, errMessageTooLarge)
+ p.log.WithField("size", len(msg.Payload)).
+ WithField("properties", msg.Properties).
+ WithError(errMessageTooLarge).Error()
+ return
+ }
+
deliverAt := msg.DeliverAt
if msg.DeliverAfter.Nanoseconds() > 0 {
deliverAt = time.Now().Add(msg.DeliverAfter)
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 2910427..9f76873 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -790,3 +790,30 @@ func TestDelayAbsolute(t *testing.T) {
assert.NotNil(t, msg)
canc()
}
+
+func TestMaxMessageSize(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ })
+ assert.NoError(t, err)
+ defer client.Close()
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: newTopicName(),
+ })
+ assert.NoError(t, err)
+ assert.NotNil(t, producer)
+ defer producer.Close()
+ serverMaxMessageSize := 1024 * 1024
+ for bias := -1; bias <= 1; bias++ {
+ payload := make([]byte, serverMaxMessageSize+bias)
+ ID, err := producer.Send(context.Background(), &ProducerMessage{
+ Payload: payload,
+ })
+ if bias <= 0 {
+ assert.NoError(t, err)
+ assert.NotNil(t, ID)
+ } else {
+ assert.Equal(t, errMessageTooLarge, err)
+ }
+ }
+}