You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/03/21 06:06:19 UTC
[pulsar-client-go] branch master updated: [Issue: 201] Expose
GetDeliveryCount method from Message interface (#202)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 2f57a6f [Issue: 201] Expose GetDeliveryCount method from Message interface (#202)
2f57a6f is described below
commit 2f57a6f86a0ca076def0347564fed64f640d9c9b
Author: 冉小龙 <rx...@apache.org>
AuthorDate: Sat Mar 21 14:06:10 2020 +0800
[Issue: 201] Expose GetDeliveryCount method from Message interface (#202)
* Expose GetDeliveryCount method from Message interface
Signed-off-by: xiaolong.ran <rx...@apache.org>
* fix ci error
Signed-off-by: xiaolong.ran <rx...@apache.org>
* fix comments
Signed-off-by: xiaolong.ran <rx...@apache.org>
---
pulsar/consumer_test.go | 65 +++++++++++++++++++++++++++++++++++++++++++++++++
pulsar/dlq_router.go | 3 ++-
pulsar/impl_message.go | 4 +++
pulsar/message.go | 7 ++++++
4 files changed, 78 insertions(+), 1 deletion(-)
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 3e256f8..488ca59 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1143,3 +1143,68 @@ func TestDLQMultiTopics(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, msg)
}
+
+// TestGetDeliveryCount verifies that Message.RedeliveryCount reflects how many
+// times a message has been redelivered after being negatively acknowledged.
+//
+// Ten messages are published. On the first pass every other received message
+// is acked and the rest are nacked. The five nacked messages are then received
+// and nacked for two more rounds; the last message of each round is expected
+// to report a redelivery count of 1 and then 2. A final receive is expected to
+// report a redelivery count of 3.
+func TestGetDeliveryCount(t *testing.T) {
+	const (
+		numMessages = 10
+		numNacked   = numMessages / 2 // every second message is nacked on the first pass
+		nackRounds  = 2
+	)
+
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	if err != nil {
+		// Stop here: nothing below can work without a client.
+		t.Fatalf("creating client: %v", err)
+	}
+	defer client.Close()
+
+	topic := newTopicName()
+	ctx := context.Background()
+
+	// create consumer
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:               topic,
+		SubscriptionName:    "my-sub",
+		NackRedeliveryDelay: 1 * time.Second,
+		Type:                Shared,
+	})
+	if err != nil {
+		t.Fatalf("subscribing: %v", err)
+	}
+	defer consumer.Close()
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topic,
+	})
+	if err != nil {
+		t.Fatalf("creating producer: %v", err)
+	}
+	defer producer.Close()
+
+	// send the messages
+	for i := 0; i < numMessages; i++ {
+		if _, err := producer.Send(ctx, &ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+		}); err != nil {
+			// t.Fatalf rather than log.Fatal: log.Fatal exits the process, which
+			// skips the deferred Close calls and hides which test failed.
+			t.Fatalf("sending message %d: %v", i, err)
+		}
+	}
+
+	// receive all messages and only ack half of them
+	for i := 0; i < numMessages; i++ {
+		msg, err := consumer.Receive(ctx)
+		if err != nil {
+			// Do not pass a nil message on to Ack/Nack.
+			t.Fatalf("receiving message %d: %v", i, err)
+		}
+
+		if i%2 == 0 {
+			consumer.Ack(msg)
+		} else {
+			consumer.Nack(msg)
+		}
+	}
+
+	// Receive the nacked messages two more times, nacking them again each time
+	// to simulate a processing failure.
+	for round := 0; round < nackRounds; round++ {
+		var last Message
+		for j := 0; j < numNacked; j++ {
+			last, err = consumer.Receive(ctx)
+			if err != nil {
+				t.Fatalf("round %d: receiving redelivered message %d: %v", round, j, err)
+			}
+			consumer.Nack(last)
+		}
+		assert.Equal(t, uint32(round+1), last.RedeliveryCount())
+	}
+
+	// The next delivery follows the third nack of these messages.
+	msg, err := consumer.Receive(ctx)
+	if err != nil {
+		t.Fatalf("receiving after final nack round: %v", err)
+	}
+	assert.Equal(t, uint32(nackRounds+1), msg.RedeliveryCount())
+}
diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go
index 69b45d7..d6e7b30 100644
--- a/pulsar/dlq_router.go
+++ b/pulsar/dlq_router.go
@@ -76,7 +76,8 @@ func (r *dlqRouter) shouldSendToDlq(cm *ConsumerMessage) bool {
// * when we receive the message and redeliveryCount == 10, it means
// that the application has already got (and Nack()) the message 10
// times, so this time we should just go to DLQ.
- return cm.Message.(*message).redeliveryCount >= r.policy.MaxDeliveries
+
+ return msg.redeliveryCount >= r.policy.MaxDeliveries
}
func (r *dlqRouter) Chan() chan ConsumerMessage {
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index a8033a7..9b85c8a 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -176,6 +176,10 @@ func (msg *message) Key() string {
return msg.key
}
+// RedeliveryCount returns the redelivery count stored on this message. It
+// provides the RedeliveryCount method of the Message interface and exposes the
+// same redeliveryCount field that the DLQ router compares with MaxDeliveries.
+func (msg *message) RedeliveryCount() uint32 {
+ return msg.redeliveryCount
+}
+
func newAckTracker(size int) *ackTracker {
var batchIDs *big.Int
if size <= 64 {
diff --git a/pulsar/message.go b/pulsar/message.go
index bc38ebb..8505321 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -86,6 +86,13 @@ type Message interface {
// Key get the key of the message, if any
Key() string
+
+ // RedeliveryCount returns the redelivery count of the message. The count is maintained by the Pulsar broker: when a
+ // client negatively acknowledges (nacks) a message, the broker dispatches it again with the redelivery count set in CommandMessage.
+ //
+ // The redelivery count increases monotonically within a broker; when ownership of the topic switches to another broker,
+ // the redelivery count is recalculated.
+ RedeliveryCount() uint32
}
// MessageID identifier for a particular message