You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/10/09 01:33:17 UTC

[pulsar-client-go] branch master updated: [Issue 367][producer] Send delay message individually even batching is enabled (#372)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new afc040c  [Issue 367][producer] Send delay message individually even batching is enabled (#372)
afc040c is described below

commit afc040c67d9fade03e5582c4be206e2f538b48db
Author: 程飞 <fa...@gmail.com>
AuthorDate: Fri Oct 9 09:33:08 2020 +0800

    [Issue 367][producer] Send delay message individually even batching is enabled (#372)
    
    Fixes #367
    
    
    ### Motivation
    
    Send delay message individually even batching is enabled.
    
    
    ### Modifications
    
    1. flush batching messages immediately when a new delay message is received
    2. reset deliverAtTime metadata of `BatchBuilder`
---
 pulsar/internal/batch_builder.go |  1 +
 pulsar/producer_partition.go     |  4 ++-
 pulsar/producer_test.go          | 70 ++++++++++++++++++++++++++++++++++++++--
 3 files changed, 71 insertions(+), 4 deletions(-)

diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 18ec3c4..fe34f0c 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -153,6 +153,7 @@ func (bb *BatchBuilder) reset() {
 	bb.buffer.Clear()
 	bb.callbacks = []interface{}{}
 	bb.msgMetadata.ReplicateTo = nil
+	bb.msgMetadata.DeliverAtTime = nil
 }
 
 // Flush all the messages buffered in the client and wait until all messages have been successfully persisted.
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index a16193f..870da2e 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -334,6 +334,9 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 		sequenceID = internal.GetAndAdd(p.sequenceIDGenerator, 1)
 	}
 
+	if !sendAsBatch {
+		p.internalFlushCurrentBatch()
+	}
 	added := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request,
 		msg.ReplicationClusters, deliverAt)
 	if !added {
@@ -437,7 +440,6 @@ func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,
 
 func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
 	callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
-
 	sr := &sendRequest{
 		ctx:              ctx,
 		msg:              msg,
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 20cfd91..2c1ff51 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -23,6 +23,7 @@ import (
 	"net/http"
 	"strconv"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -64,7 +65,6 @@ func TestProducerNoTopic(t *testing.T) {
 	client, err := NewClient(ClientOptions{
 		URL: "pulsar://localhost:6650",
 	})
-
 	if err != nil {
 		t.Fatal(err)
 		return
@@ -151,13 +151,12 @@ func TestProducerAsyncSend(t *testing.T) {
 }
 
 func TestProducerCompression(t *testing.T) {
-
 	type testProvider struct {
 		name            string
 		compressionType CompressionType
 	}
 
-	var providers = []testProvider{
+	providers := []testProvider{
 		{"zlib", ZLib},
 		{"lz4", LZ4},
 		{"zstd", ZSTD},
@@ -705,6 +704,71 @@ func TestBatchMessageFlushing(t *testing.T) {
 	assert.Equal(t, 2, published, "expected to publish two messages")
 }
 
+// test for issue #367
+func TestBatchDelayMessage(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+	batchingDelay := time.Second
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:                   topic,
+		BatchingMaxPublishDelay: batchingDelay,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		SubscriptionName: "subName",
+	})
+	assert.Nil(t, err)
+	defer consumer.Close()
+
+	ctx := context.Background()
+	delayMsg := &ProducerMessage{
+		Payload:      []byte("delay: 3s"),
+		DeliverAfter: 3 * time.Second,
+	}
+	var delayMsgID int64
+	ch := make(chan struct{}, 2)
+	producer.SendAsync(ctx, delayMsg, func(id MessageID, producerMessage *ProducerMessage, err error) {
+		atomic.StoreInt64(&delayMsgID, id.(messageID).entryID)
+		ch <- struct{}{}
+	})
+	delayMsgPublished := false
+	select {
+	case <-ch:
+		delayMsgPublished = true
+	case <-time.After(batchingDelay):
+	}
+	assert.Equal(t, delayMsgPublished, true, "delay message is not published individually when batching is enabled")
+
+	noDelayMsg := &ProducerMessage{
+		Payload: []byte("no delay"),
+	}
+	var noDelayMsgID int64
+	producer.SendAsync(ctx, noDelayMsg, func(id MessageID, producerMessage *ProducerMessage, err error) {
+		atomic.StoreInt64(&noDelayMsgID, id.(messageID).entryID)
+	})
+	for i := 0; i < 2; i++ {
+		msg, err := consumer.Receive(context.Background())
+		assert.Nil(t, err, "unexpected error occurred when recving message from topic")
+
+		switch msg.ID().(trackingMessageID).entryID {
+		case atomic.LoadInt64(&noDelayMsgID):
+			assert.LessOrEqual(t, time.Since(msg.PublishTime()).Nanoseconds(), int64(batchingDelay*2))
+		case atomic.LoadInt64(&delayMsgID):
+			assert.GreaterOrEqual(t, time.Since(msg.PublishTime()).Nanoseconds(), int64(time.Second*3))
+		default:
+			t.Fatalf("got an unexpected message from topic, id:%v", msg.ID().Serialize())
+		}
+	}
+}
+
 func TestDelayRelative(t *testing.T) {
 	client, err := NewClient(ClientOptions{
 		URL: serviceURL,