You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/06/03 12:54:38 UTC
[pulsar] branch master updated: [pulsar-client-go]Add DeliverAfter
option for go client (#6023)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a1d2455 [pulsar-client-go]Add DeliverAfter option for go client (#6023)
a1d2455 is described below
commit a1d2455f9d4730cbeda6c9f214668c810772a958
Author: Lijingfeng <17...@qq.com>
AuthorDate: Wed Jun 3 20:54:20 2020 +0800
[pulsar-client-go]Add DeliverAfter option for go client (#6023)
### Motivation
Add a delayed message delivery feature to the Go client.
### Modifications
This change is inspired by PR #5602, which is no longer active.
1. Add a DeliverAfter field to the ProducerMessage struct.
2. Add the helper func durationToUnixTimestampMillis, which converts DeliverAfter (a time.Duration) to milliseconds.
3. Call C.pulsar_message_set_deliver_after when building the message if DeliverAfter is non-zero.
4. Add the unit test TestProducer_DelayMessage, which covers failover and shared modes as DelayedDeliveryTest.java does.
---
pulsar-client-go/pulsar/c_message.go | 8 ++++
pulsar-client-go/pulsar/message.go | 3 ++
pulsar-client-go/pulsar/producer_test.go | 68 ++++++++++++++++++++++++++++++++
3 files changed, 79 insertions(+)
diff --git a/pulsar-client-go/pulsar/c_message.go b/pulsar-client-go/pulsar/c_message.go
index d62165c..12196cb 100644
--- a/pulsar-client-go/pulsar/c_message.go
+++ b/pulsar-client-go/pulsar/c_message.go
@@ -72,6 +72,10 @@ func buildMessage(message ProducerMessage) *C.pulsar_message_t {
C.pulsar_message_set_event_timestamp(cMsg, C.uint64_t(timeToUnixTimestampMillis(message.EventTime)))
}
+ if message.DeliverAfter != 0 {
+ C.pulsar_message_set_deliver_after(cMsg, C.uint64_t(durationToUnixTimestampMillis(message.DeliverAfter)))
+ }
+
if message.SequenceID != 0 {
C.pulsar_message_set_sequence_id(cMsg, C.int64_t(message.SequenceID))
}
@@ -223,3 +227,7 @@ func timeToUnixTimestampMillis(t time.Time) C.ulonglong {
millis := nanos / int64(time.Millisecond)
return C.ulonglong(millis)
}
+
+func durationToUnixTimestampMillis(t time.Duration) C.ulonglong {
+ return C.ulonglong(t.Milliseconds())
+}
diff --git a/pulsar-client-go/pulsar/message.go b/pulsar-client-go/pulsar/message.go
index bd2aa38..719884e 100644
--- a/pulsar-client-go/pulsar/message.go
+++ b/pulsar-client-go/pulsar/message.go
@@ -42,6 +42,9 @@ type ProducerMessage struct {
// Set the sequence id to assign to the current message
SequenceID int64
+
+ // Set the delay duration to deliver the message
+ DeliverAfter time.Duration
}
type Message interface {
diff --git a/pulsar-client-go/pulsar/producer_test.go b/pulsar-client-go/pulsar/producer_test.go
index c8eb08b..3fa2b89 100644
--- a/pulsar-client-go/pulsar/producer_test.go
+++ b/pulsar-client-go/pulsar/producer_test.go
@@ -416,3 +416,71 @@ func TestProducer_SendAndGetMsgID(t *testing.T) {
assert.NotNil(t, IsNil(msgID))
}
}
+
+func TestProducer_DelayMessage(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: "pulsar://localhost:6650",
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topicName := "test-send-with-message-id"
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ failoverConsumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: "sub-delay-message-failover",
+ })
+ assert.Nil(t, err)
+ defer failoverConsumer.Close()
+
+ sharedConsumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: "sub-delay-message-shared",
+ Type: Shared,
+ })
+ assert.Nil(t, err)
+ defer sharedConsumer.Close()
+
+ ctx := context.Background()
+
+ delay := time.Second * 5
+ begin := time.Now()
+ t.Logf("begin %v\n", begin)
+ for i := 0; i < 10; i++ {
+ err := producer.Send(ctx, ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ DeliverAfter: delay,
+ })
+ t.Logf("send message %d\n", i)
+ assert.Nil(t, err)
+ }
+
+ // Failover consumer will receive the messages immediately while
+ // the shared consumer will get them after the delay
+ for i := 0; i < 10; i++ {
+ msg, err := failoverConsumer.Receive(ctx)
+ assert.Nil(t, err)
+ t.Logf("message: %s\n", msg.Payload())
+ err = failoverConsumer.Ack(msg)
+ assert.Nil(t, err)
+
+ t.Logf("after %v\n", time.Now())
+ assert.True(t, time.Since(begin) < delay)
+ }
+
+ for i := 0; i < 10; i++ {
+ msg, err := sharedConsumer.Receive(ctx)
+ assert.Nil(t, err)
+ t.Logf("message: %s\n", msg.Payload())
+ err = sharedConsumer.Ack(msg)
+ assert.Nil(t, err)
+
+ t.Logf("after %v\n", time.Now())
+ assert.True(t, time.Since(begin) > delay)
+ }
+}