You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rf...@apache.org on 2022/06/23 00:51:20 UTC
[pulsar-client-go] branch master updated: [issue 791] allow to add at least one message on batch builder (#792)
This is an automated email from the ASF dual-hosted git repository.
rfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new c558ddf [issue 791] allow to add at least one message on batch builder (#792)
c558ddf is described below
commit c558ddfa10e04b88d33bc6daacc9aa067d9f28dc
Author: ming <it...@gmail.com>
AuthorDate: Wed Jun 22 20:51:14 2022 -0400
[issue 791] allow to add at least one message on batch builder (#792)
* allow to add at least one message on batch builder
* add unit test for batching disabled
---
pulsar/internal/batch_builder.go | 4 ++++
pulsar/internal/key_based_batch_builder.go | 5 ++++
pulsar/producer_test.go | 38 ++++++++++++++++++++++++------
3 files changed, 40 insertions(+), 7 deletions(-)
diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 9d18f26..fb7598e 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -157,6 +157,10 @@ func (bc *batchContainer) IsFull() bool {
// hasSpace should return true if and only if the batch container can accommodate another message of length payload.
func (bc *batchContainer) hasSpace(payload []byte) bool {
+ if bc.numMessages == 0 {
+ // allow to add at least one message when batching is disabled
+ return true
+ }
msgSize := uint32(len(payload))
return bc.numMessages+1 <= bc.maxMessages && bc.buffer.ReadableBytes()+msgSize <= uint32(bc.maxBatchSize)
}
diff --git a/pulsar/internal/key_based_batch_builder.go b/pulsar/internal/key_based_batch_builder.go
index d09138c..667e855 100644
--- a/pulsar/internal/key_based_batch_builder.go
+++ b/pulsar/internal/key_based_batch_builder.go
@@ -117,6 +117,11 @@ func (bc *keyBasedBatchContainer) IsMultiBatches() bool {
// hasSpace should return true if and only if the batch container can accommodate another message of length payload.
func (bc *keyBasedBatchContainer) hasSpace(payload []byte) bool {
+ if bc.numMessages == 0 {
+ // allow to add at least one message
+ // and a single max message size is checked in the producer partition, therefore no need to validate batch size
+ return true
+ }
msgSize := uint32(len(payload))
return bc.numMessages+1 <= bc.maxMessages && bc.buffer.ReadableBytes()+msgSize <= uint32(bc.maxBatchSize)
}
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 541c1fe..6b2b5d9 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -877,20 +877,44 @@ func TestMaxBatchSize(t *testing.T) {
assert.NotNil(t, producer)
defer producer.Close()
- for bias := -1; bias <= 1; bias++ {
+ for bias := -1; bias <= 3; bias++ {
payload := make([]byte, batchMaxMessageSize+bias)
ID, err := producer.Send(context.Background(), &ProducerMessage{
Payload: payload,
})
- if bias <= 0 {
- assert.NoError(t, err)
- assert.NotNil(t, ID)
- } else {
- assert.Equal(t, errFailAddToBatch, err)
- }
+ // regardless max batch size, if the batch size limit is reached, batching is triggered to send messages
+ assert.NoError(t, err)
+ assert.NotNil(t, ID)
}
}
+func TestBatchingDisabled(t *testing.T) {
+ defaultMaxMessageSize := 128 * 1024
+ client, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ })
+ assert.NoError(t, err)
+ defer client.Close()
+
+ // when batching is disabled, the batching size has no effect
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: newTopicName(),
+ DisableBatching: true,
+ BatchingMaxSize: uint(defaultMaxBatchSize),
+ })
+ assert.NoError(t, err)
+ assert.NotNil(t, producer)
+ defer producer.Close()
+
+ payload := make([]byte, defaultMaxMessageSize+100)
+ ID, err := producer.Send(context.Background(), &ProducerMessage{
+ Payload: payload,
+ })
+ // regardless max batch size, if the batch size limit is reached, batching is triggered to send messages
+ assert.NoError(t, err)
+ assert.NotNil(t, ID)
+}
+
func TestMaxMessageSize(t *testing.T) {
serverMaxMessageSize := 1024 * 1024