You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/06/03 12:54:38 UTC

[pulsar] branch master updated: [pulsar-client-go]Add DeliverAfter option for go client (#6023)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a1d2455  [pulsar-client-go]Add DeliverAfter option for go client (#6023)
a1d2455 is described below

commit a1d2455f9d4730cbeda6c9f214668c810772a958
Author: Lijingfeng <17...@qq.com>
AuthorDate: Wed Jun 3 20:54:20 2020 +0800

    [pulsar-client-go]Add DeliverAfter option for go client (#6023)
    
    ### Motivation
    
    Add delayed message delivery support to the Go client.
    
    ### Modifications
    
    This change is inspired by PR #5602, which has been inactive.
    
    1. Add a DeliverAfter field to the ProducerMessage struct.
    2. Add the durationToUnixTimestampMillis helper, which converts DeliverAfter (a time.Duration) to milliseconds.
    3. Call C.pulsar_message_set_deliver_after when building a message if DeliverAfter is not zero.
    4. Add the TestProducer_DelayMessage unit test, which covers the failover and shared subscription modes as DelayedDeliveryTest.java does.
---
 pulsar-client-go/pulsar/c_message.go     |  8 ++++
 pulsar-client-go/pulsar/message.go       |  3 ++
 pulsar-client-go/pulsar/producer_test.go | 68 ++++++++++++++++++++++++++++++++
 3 files changed, 79 insertions(+)

diff --git a/pulsar-client-go/pulsar/c_message.go b/pulsar-client-go/pulsar/c_message.go
index d62165c..12196cb 100644
--- a/pulsar-client-go/pulsar/c_message.go
+++ b/pulsar-client-go/pulsar/c_message.go
@@ -72,6 +72,10 @@ func buildMessage(message ProducerMessage) *C.pulsar_message_t {
 		C.pulsar_message_set_event_timestamp(cMsg, C.uint64_t(timeToUnixTimestampMillis(message.EventTime)))
 	}
 
+	if message.DeliverAfter != 0 {
+		C.pulsar_message_set_deliver_after(cMsg, C.uint64_t(durationToUnixTimestampMillis(message.DeliverAfter)))
+	}
+
 	if message.SequenceID != 0 {
 		C.pulsar_message_set_sequence_id(cMsg, C.int64_t(message.SequenceID))
 	}
@@ -223,3 +227,7 @@ func timeToUnixTimestampMillis(t time.Time) C.ulonglong {
 	millis := nanos / int64(time.Millisecond)
 	return C.ulonglong(millis)
 }
+
+func durationToUnixTimestampMillis(t time.Duration) C.ulonglong {
+	return C.ulonglong(t.Milliseconds())
+}
diff --git a/pulsar-client-go/pulsar/message.go b/pulsar-client-go/pulsar/message.go
index bd2aa38..719884e 100644
--- a/pulsar-client-go/pulsar/message.go
+++ b/pulsar-client-go/pulsar/message.go
@@ -42,6 +42,9 @@ type ProducerMessage struct {
 
 	// Set the sequence id to assign to the current message
 	SequenceID int64
+
+	// Set the delay duration to deliver the message
+	DeliverAfter time.Duration
 }
 
 type Message interface {
diff --git a/pulsar-client-go/pulsar/producer_test.go b/pulsar-client-go/pulsar/producer_test.go
index c8eb08b..3fa2b89 100644
--- a/pulsar-client-go/pulsar/producer_test.go
+++ b/pulsar-client-go/pulsar/producer_test.go
@@ -416,3 +416,71 @@ func TestProducer_SendAndGetMsgID(t *testing.T) {
 		assert.NotNil(t, IsNil(msgID))
 	}
 }
+
+func TestProducer_DelayMessage(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: "pulsar://localhost:6650",
+	})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topicName := "test-send-with-message-id"
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topicName,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	failoverConsumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topicName,
+		SubscriptionName: "sub-delay-message-failover",
+	})
+	assert.Nil(t, err)
+	defer failoverConsumer.Close()
+
+	sharedConsumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topicName,
+		SubscriptionName: "sub-delay-message-shared",
+		Type:             Shared,
+	})
+	assert.Nil(t, err)
+	defer sharedConsumer.Close()
+
+	ctx := context.Background()
+
+	delay := time.Second * 5
+	begin := time.Now()
+	t.Logf("begin %v\n", begin)
+	for i := 0; i < 10; i++ {
+		err := producer.Send(ctx, ProducerMessage{
+			Payload:      []byte(fmt.Sprintf("hello-%d", i)),
+			DeliverAfter: delay,
+		})
+		t.Logf("send message %d\n", i)
+		assert.Nil(t, err)
+	}
+
+	// Failover consumer will receive the messages immediately while
+	// the shared consumer will get them after the delay
+	for i := 0; i < 10; i++ {
+		msg, err := failoverConsumer.Receive(ctx)
+		assert.Nil(t, err)
+		t.Logf("message: %s\n", msg.Payload())
+		err = failoverConsumer.Ack(msg)
+		assert.Nil(t, err)
+
+		t.Logf("after %v\n", time.Now())
+		assert.True(t, time.Since(begin) < delay)
+	}
+
+	for i := 0; i < 10; i++ {
+		msg, err := sharedConsumer.Receive(ctx)
+		assert.Nil(t, err)
+		t.Logf("message: %s\n", msg.Payload())
+		err = sharedConsumer.Ack(msg)
+		assert.Nil(t, err)
+
+		t.Logf("after %v\n", time.Now())
+		assert.True(t, time.Since(begin) > delay)
+	}
+}