You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2021/12/22 06:22:24 UTC
[pulsar-client-go] branch master updated: [Issue 513] Correct apparent logic error in batchContainer's hasSpace() method (#678)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 2bbfb8e [Issue 513] Correct apparent logic error in batchContainer's hasSpace() method (#678)
2bbfb8e is described below
commit 2bbfb8e4a66c63843c935a50971474abfa25cdf7
Author: Ben Schofield <bs...@users.noreply.github.com>
AuthorDate: Wed Dec 22 06:22:18 2021 +0000
[Issue 513] Correct apparent logic error in batchContainer's hasSpace() method (#678)
* Correct apparent logic error in batchContainer's hasSpace() method.
* Make the same change to keyBasedBatchContainer's hasSpace() method.
* Fix comment length to pass style checks.
* Allow TestMaxMessageSize() to run
* Add TestMaxBatchSize() to validate that this limit is respected. Based on the results of the test, change the < in hasSpace() to be a <=.
* Further correct logic in hasSpace() / IsFull()
* Remember to make the change in both places!
Co-authored-by: ben <be...@cyber.casa>
---
pulsar/internal/batch_builder.go | 7 ++---
pulsar/internal/key_based_batch_builder.go | 7 ++---
pulsar/producer_test.go | 42 ++++++++++++++++++++++++++++--
3 files changed, 48 insertions(+), 8 deletions(-)
diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 7e47304..9d18f26 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -150,14 +150,15 @@ func NewBatchBuilder(
return &bc, nil
}
-// IsFull check if the size in the current batch exceeds the maximum size allowed by the batch
+// IsFull checks if the size in the current batch meets or exceeds the maximum size allowed by the batch
func (bc *batchContainer) IsFull() bool {
- return bc.numMessages >= bc.maxMessages || bc.buffer.ReadableBytes() > uint32(bc.maxBatchSize)
+ return bc.numMessages >= bc.maxMessages || bc.buffer.ReadableBytes() >= uint32(bc.maxBatchSize)
}
+// hasSpace should return true if and only if the batch container can accommodate another message of length payload.
func (bc *batchContainer) hasSpace(payload []byte) bool {
msgSize := uint32(len(payload))
- return bc.numMessages+1 < bc.maxMessages || (bc.buffer.ReadableBytes()+msgSize) < uint32(bc.maxBatchSize)
+ return bc.numMessages+1 <= bc.maxMessages && bc.buffer.ReadableBytes()+msgSize <= uint32(bc.maxBatchSize)
}
// Add will add single message to batch.
diff --git a/pulsar/internal/key_based_batch_builder.go b/pulsar/internal/key_based_batch_builder.go
index 24d564b..d09138c 100644
--- a/pulsar/internal/key_based_batch_builder.go
+++ b/pulsar/internal/key_based_batch_builder.go
@@ -106,18 +106,19 @@ func NewKeyBasedBatchBuilder(
return bb, nil
}
-// IsFull check if the size in the current batch exceeds the maximum size allowed by the batch
+// IsFull checks if the size in the current batch meets or exceeds the maximum size allowed by the batch
func (bc *keyBasedBatchContainer) IsFull() bool {
- return bc.numMessages >= bc.maxMessages || bc.buffer.ReadableBytes() > uint32(bc.maxBatchSize)
+ return bc.numMessages >= bc.maxMessages || bc.buffer.ReadableBytes() >= uint32(bc.maxBatchSize)
}
func (bc *keyBasedBatchContainer) IsMultiBatches() bool {
return true
}
+// hasSpace should return true if and only if the batch container can accommodate another message of length payload.
func (bc *keyBasedBatchContainer) hasSpace(payload []byte) bool {
msgSize := uint32(len(payload))
- return bc.numMessages+1 < bc.maxMessages || (bc.buffer.ReadableBytes()+msgSize) < uint32(bc.maxBatchSize)
+ return bc.numMessages+1 <= bc.maxMessages && bc.buffer.ReadableBytes()+msgSize <= uint32(bc.maxBatchSize)
}
// Add will add single message to key-based batch with message key.
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index f914017..124c828 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -857,19 +857,57 @@ func TestDelayAbsolute(t *testing.T) {
canc()
}
-func TestMaxMessageSize(t *testing.T) {
+func TestMaxBatchSize(t *testing.T) {
+ // Set to be < serverMaxMessageSize
+ batchMaxMessageSize := 512 * 1024
+
client, err := NewClient(ClientOptions{
URL: serviceURL,
})
assert.NoError(t, err)
defer client.Close()
+
producer, err := client.CreateProducer(ProducerOptions{
- Topic: newTopicName(),
+ Topic: newTopicName(),
+ BatchingMaxSize: uint(batchMaxMessageSize),
})
assert.NoError(t, err)
assert.NotNil(t, producer)
defer producer.Close()
+
+ for bias := -1; bias <= 1; bias++ {
+ payload := make([]byte, batchMaxMessageSize+bias)
+ ID, err := producer.Send(context.Background(), &ProducerMessage{
+ Payload: payload,
+ })
+ if bias <= 0 {
+ assert.NoError(t, err)
+ assert.NotNil(t, ID)
+ } else {
+ assert.Equal(t, errFailAddToBatch, err)
+ }
+ }
+}
+
+func TestMaxMessageSize(t *testing.T) {
serverMaxMessageSize := 1024 * 1024
+
+ client, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ })
+ assert.NoError(t, err)
+ defer client.Close()
+
+ // Need to set BatchingMaxSize > serverMaxMessageSize to avoid errMessageTooLarge
+ // being masked by an earlier errFailAddToBatch
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: newTopicName(),
+ BatchingMaxSize: uint(2 * serverMaxMessageSize),
+ })
+ assert.NoError(t, err)
+ assert.NotNil(t, producer)
+ defer producer.Close()
+
for bias := -1; bias <= 1; bias++ {
payload := make([]byte, serverMaxMessageSize+bias)
ID, err := producer.Send(context.Background(), &ProducerMessage{