You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/10/09 01:33:17 UTC
[pulsar-client-go] branch master updated: [Issue 367][producer]
Send delay message individually even batching is enabled (#372)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new afc040c [Issue 367][producer] Send delay message individually even batching is enabled (#372)
afc040c is described below
commit afc040c67d9fade03e5582c4be206e2f538b48db
Author: 程飞 <fa...@gmail.com>
AuthorDate: Fri Oct 9 09:33:08 2020 +0800
[Issue 367][producer] Send delay message individually even batching is enabled (#372)
Fixes #367
### Motivation
Send delayed messages individually, even when batching is enabled.
### Modifications
1. Flush the current batch immediately, before a new delayed message is added to the batch builder.
2. Reset the `DeliverAtTime` metadata field when `BatchBuilder` is reset.
---
pulsar/internal/batch_builder.go | 1 +
pulsar/producer_partition.go | 4 ++-
pulsar/producer_test.go | 70 ++++++++++++++++++++++++++++++++++++++--
3 files changed, 71 insertions(+), 4 deletions(-)
diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 18ec3c4..fe34f0c 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -153,6 +153,7 @@ func (bb *BatchBuilder) reset() {
bb.buffer.Clear()
bb.callbacks = []interface{}{}
bb.msgMetadata.ReplicateTo = nil
+ bb.msgMetadata.DeliverAtTime = nil
}
// Flush all the messages buffered in the client and wait until all messages have been successfully persisted.
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index a16193f..870da2e 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -334,6 +334,9 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
sequenceID = internal.GetAndAdd(p.sequenceIDGenerator, 1)
}
+ if !sendAsBatch {
+ p.internalFlushCurrentBatch()
+ }
added := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request,
msg.ReplicationClusters, deliverAt)
if !added {
@@ -437,7 +440,6 @@ func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,
func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
-
sr := &sendRequest{
ctx: ctx,
msg: msg,
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 20cfd91..2c1ff51 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -23,6 +23,7 @@ import (
"net/http"
"strconv"
"sync"
+ "sync/atomic"
"testing"
"time"
@@ -64,7 +65,6 @@ func TestProducerNoTopic(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: "pulsar://localhost:6650",
})
-
if err != nil {
t.Fatal(err)
return
@@ -151,13 +151,12 @@ func TestProducerAsyncSend(t *testing.T) {
}
func TestProducerCompression(t *testing.T) {
-
type testProvider struct {
name string
compressionType CompressionType
}
- var providers = []testProvider{
+ providers := []testProvider{
{"zlib", ZLib},
{"lz4", LZ4},
{"zstd", ZSTD},
@@ -705,6 +704,71 @@ func TestBatchMessageFlushing(t *testing.T) {
assert.Equal(t, 2, published, "expected to publish two messages")
}
+// TestBatchDelayMessage is the regression test for issue #367: with batching enabled, a message that sets DeliverAfter must be published on its own within one BatchingMaxPublishDelay.
+func TestBatchDelayMessage(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := newTopicName()
+ batchingDelay := time.Second // also the deadline used below for the delayed message's send callback
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ BatchingMaxPublishDelay: batchingDelay,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "subName",
+ })
+ assert.Nil(t, err)
+ defer consumer.Close()
+
+ ctx := context.Background()
+ delayMsg := &ProducerMessage{
+ Payload: []byte("delay: 3s"),
+ DeliverAfter: 3 * time.Second,
+ }
+ var delayMsgID int64
+ ch := make(chan struct{}, 2) // buffered, so the send callback never blocks when signalling
+ producer.SendAsync(ctx, delayMsg, func(id MessageID, producerMessage *ProducerMessage, err error) { // NOTE(review): err is ignored and id.(messageID) is unchecked; this panics if id is nil on failure - confirm
+ atomic.StoreInt64(&delayMsgID, id.(messageID).entryID)
+ ch <- struct{}{}
+ })
+ delayMsgPublished := false
+ select { // wait for the send callback, but no longer than batchingDelay
+ case <-ch:
+ delayMsgPublished = true
+ case <-time.After(batchingDelay): // timed out: delayMsgPublished stays false and the assertion below fails
+ }
+ assert.Equal(t, delayMsgPublished, true, "delay message is not published individually when batching is enabled") // NOTE(review): testify's order is (expected, actual); the arguments look swapped
+
+ noDelayMsg := &ProducerMessage{
+ Payload: []byte("no delay"),
+ }
+ var noDelayMsgID int64
+ producer.SendAsync(ctx, noDelayMsg, func(id MessageID, producerMessage *ProducerMessage, err error) { // NOTE(review): nothing waits for this callback before the receive loop reads noDelayMsgID
+ atomic.StoreInt64(&noDelayMsgID, id.(messageID).entryID)
+ })
+ for i := 0; i < 2; i++ { // expect exactly the two messages sent above
+ msg, err := consumer.Receive(context.Background())
+ assert.Nil(t, err, "unexpected error occurred when recving message from topic")
+
+ switch msg.ID().(trackingMessageID).entryID { // tell the two messages apart by entry ID
+ case atomic.LoadInt64(&noDelayMsgID):
+ assert.LessOrEqual(t, time.Since(msg.PublishTime()).Nanoseconds(), int64(batchingDelay*2)) // upper bound: received within 2 * batchingDelay of publish time
+ case atomic.LoadInt64(&delayMsgID):
+ assert.GreaterOrEqual(t, time.Since(msg.PublishTime()).Nanoseconds(), int64(time.Second*3)) // lower bound: at least the 3s DeliverAfter has elapsed since publish time
+ default:
+ t.Fatalf("got an unexpected message from topic, id:%v", msg.ID().Serialize())
+ }
+ }
+}
+
func TestDelayRelative(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: serviceURL,