You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/03/21 06:06:19 UTC

[pulsar-client-go] branch master updated: [Issue: 201] Expose GetDeliveryCount method from Message interface (#202)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f57a6f  [Issue: 201] Expose GetDeliveryCount method from Message interface (#202)
2f57a6f is described below

commit 2f57a6f86a0ca076def0347564fed64f640d9c9b
Author: 冉小龙 <rx...@apache.org>
AuthorDate: Sat Mar 21 14:06:10 2020 +0800

    [Issue: 201] Expose GetDeliveryCount method from Message interface (#202)
    
    * Expose GetDeliveryCount method from Message interface
    
    Signed-off-by: xiaolong.ran <rx...@apache.org>
    
    * fix ci error
    
    Signed-off-by: xiaolong.ran <rx...@apache.org>
    
    * fix comments
    
    Signed-off-by: xiaolong.ran <rx...@apache.org>
---
 pulsar/consumer_test.go | 65 +++++++++++++++++++++++++++++++++++++++++++++++++
 pulsar/dlq_router.go    |  3 ++-
 pulsar/impl_message.go  |  4 +++
 pulsar/message.go       |  7 ++++++
 4 files changed, 78 insertions(+), 1 deletion(-)

diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 3e256f8..488ca59 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1143,3 +1143,68 @@ func TestDLQMultiTopics(t *testing.T) {
 	assert.Error(t, err)
 	assert.Nil(t, msg)
 }
+
+func TestGetDeliveryCount(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+	ctx := context.Background()
+
+	// create consumer
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:               topic,
+		SubscriptionName:    "my-sub",
+		NackRedeliveryDelay: 1 * time.Second,
+		Type:                Shared,
+	})
+	assert.Nil(t, err)
+	defer consumer.Close()
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topic,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	// send 10 messages
+	for i := 0; i < 10; i++ {
+		if _, err := producer.Send(ctx, &ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+		}); err != nil {
+			log.Fatal(err)
+		}
+	}
+
+	// receive 10 messages, acking the even-indexed ones and nacking the rest
+	for i := 0; i < 10; i++ {
+		msg, _ := consumer.Receive(context.Background())
+
+		if i%2 == 0 {
+			// ack message
+			consumer.Ack(msg)
+		} else {
+			consumer.Nack(msg)
+		}
+	}
+
+	// Receive the 5 nacked messages 2 more times, nacking them again each time
+	for i := 0; i < 2; i++ {
+		var msg Message
+		for i := 0; i < 5; i++ {
+			msg, err = consumer.Receive(context.Background())
+			assert.Nil(t, err)
+			consumer.Nack(msg)
+		}
+		assert.Equal(t, uint32(i+1), msg.RedeliveryCount())
+	}
+
+	msg, err := consumer.Receive(context.Background())
+	assert.Nil(t, err)
+	assert.Equal(t, uint32(3), msg.RedeliveryCount())
+}
diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go
index 69b45d7..d6e7b30 100644
--- a/pulsar/dlq_router.go
+++ b/pulsar/dlq_router.go
@@ -76,7 +76,8 @@ func (r *dlqRouter) shouldSendToDlq(cm *ConsumerMessage) bool {
 	//  * when we receive the message and redeliveryCount == 10, it means
 	//    that the application has already got (and Nack())  the message 10
 	//    times, so this time we should just go to DLQ.
-	return cm.Message.(*message).redeliveryCount >= r.policy.MaxDeliveries
+
+	return msg.redeliveryCount >= r.policy.MaxDeliveries
 }
 
 func (r *dlqRouter) Chan() chan ConsumerMessage {
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index a8033a7..9b85c8a 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -176,6 +176,10 @@ func (msg *message) Key() string {
 	return msg.key
 }
 
+func (msg *message) RedeliveryCount() uint32 {
+	return msg.redeliveryCount
+}
+
 func newAckTracker(size int) *ackTracker {
 	var batchIDs *big.Int
 	if size <= 64 {
diff --git a/pulsar/message.go b/pulsar/message.go
index bc38ebb..8505321 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -86,6 +86,13 @@ type Message interface {
 
 	// Key get the key of the message, if any
 	Key() string
+
+	// RedeliveryCount returns the message's redelivery count, which is maintained by the Pulsar broker. When a client
+	// negatively acknowledges (nacks) a message, the broker dispatches it again with the redelivery count set in CommandMessage.
+	//
+	// The redelivery count increases monotonically within a broker; when topic ownership switches to another broker,
+	// the redelivery count is recalculated.
+	RedeliveryCount() uint32
 }
 
 // MessageID identifier for a particular message