You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zi...@apache.org on 2022/12/02 03:34:21 UTC

[pulsar-client-go] branch master updated: [Issue 453] Add NewMessageID() method. (#893)

This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 48c39ee  [Issue 453] Add NewMessageID() method. (#893)
48c39ee is described below

commit 48c39ee22c84184713af513a2bb68d2516eecf2f
Author: crossoverJie <cr...@gmail.com>
AuthorDate: Fri Dec 2 11:34:15 2022 +0800

    [Issue 453] Add NewMessageID() method. (#893)
    
    * fix #453
    
    * rename newID
---
 pulsar/consumer_test.go | 43 +++++++++++++++++++++++++++++++++++++++++++
 pulsar/message.go       |  5 +++++
 2 files changed, 48 insertions(+)

diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index e9b3013..989b906 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -3303,3 +3303,46 @@ func TestConsumerWithBackoffPolicy(t *testing.T) {
 	partitionConsumerImp.reconnectToBroker()
 	assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
 }
+
+func TestAckWithMessageID(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+
+	// create consumer
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		SubscriptionName: "my-sub",
+		Type:             Exclusive,
+	})
+	assert.Nil(t, err)
+	defer consumer.Close()
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:           topic,
+		DisableBatching: false,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	// send messages
+	if _, err := producer.Send(context.Background(), &ProducerMessage{
+		Payload: []byte("hello"),
+	}); err != nil {
+		log.Fatal(err)
+	}
+
+	message, err := consumer.Receive(context.Background())
+	assert.Nil(t, err)
+
+	id := message.ID()
+	newID := NewMessageID(id.LedgerID(), id.EntryID(), id.BatchIdx(), id.PartitionIdx())
+	err = consumer.AckID(newID)
+	assert.Nil(t, err)
+}
diff --git a/pulsar/message.go b/pulsar/message.go
index c117c99..7f2e07f 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -161,6 +161,11 @@ func DeserializeMessageID(data []byte) (MessageID, error) {
 	return deserializeMessageID(data)
 }
 
+// NewMessageID Custom Create MessageID
+func NewMessageID(ledgerID int64, entryID int64, batchIdx int32, partitionIdx int32) MessageID {
+	return newMessageID(ledgerID, entryID, batchIdx, partitionIdx)
+}
+
 // EarliestMessageID returns a messageID that points to the earliest message available in a topic
 func EarliestMessageID() MessageID {
 	return earliestMessageID