You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zi...@apache.org on 2022/12/02 03:34:21 UTC
[pulsar-client-go] branch master updated: [Issue 453] Add NewMessageID() method. (#893)
This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 48c39ee [Issue 453] Add NewMessageID() method. (#893)
48c39ee is described below
commit 48c39ee22c84184713af513a2bb68d2516eecf2f
Author: crossoverJie <cr...@gmail.com>
AuthorDate: Fri Dec 2 11:34:15 2022 +0800
[Issue 453] Add NewMessageID() method. (#893)
* fix #453
* rename newID
---
pulsar/consumer_test.go | 43 +++++++++++++++++++++++++++++++++++++++++++
pulsar/message.go | 5 +++++
2 files changed, 48 insertions(+)
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index e9b3013..989b906 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -3303,3 +3303,46 @@ func TestConsumerWithBackoffPolicy(t *testing.T) {
partitionConsumerImp.reconnectToBroker()
assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
}
+
+func TestAckWithMessageID(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := newTopicName()
+
+ // create consumer
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "my-sub",
+ Type: Exclusive,
+ })
+ assert.Nil(t, err)
+ defer consumer.Close()
+
+ // create producer
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: false,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ // send messages
+ if _, err := producer.Send(context.Background(), &ProducerMessage{
+ Payload: []byte("hello"),
+ }); err != nil {
+ log.Fatal(err)
+ }
+
+ message, err := consumer.Receive(context.Background())
+ assert.Nil(t, err)
+
+ id := message.ID()
+ newID := NewMessageID(id.LedgerID(), id.EntryID(), id.BatchIdx(), id.PartitionIdx())
+ err = consumer.AckID(newID)
+ assert.Nil(t, err)
+}
diff --git a/pulsar/message.go b/pulsar/message.go
index c117c99..7f2e07f 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -161,6 +161,11 @@ func DeserializeMessageID(data []byte) (MessageID, error) {
return deserializeMessageID(data)
}
+// NewMessageID Custom Create MessageID
+func NewMessageID(ledgerID int64, entryID int64, batchIdx int32, partitionIdx int32) MessageID {
+ return newMessageID(ledgerID, entryID, batchIdx, partitionIdx)
+}
+
// EarliestMessageID returns a messageID that points to the earliest message available in a topic
func EarliestMessageID() MessageID {
return earliestMessageID