You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/03/14 17:05:11 UTC

[pulsar] branch master updated: Negative acks implementation for Go client (#3817)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c83631b  Negative acks implementation for Go client (#3817)
c83631b is described below

commit c83631bf3dc330e3561c8da2764d249d3a63711c
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Mar 14 10:05:05 2019 -0700

    Negative acks implementation for Go client (#3817)
---
 pulsar-client-go/pulsar/c_consumer.go    | 15 +++++++
 pulsar-client-go/pulsar/consumer.go      | 22 +++++++++++
 pulsar-client-go/pulsar/consumer_test.go | 67 ++++++++++++++++++++++++++++++++
 3 files changed, 104 insertions(+)

diff --git a/pulsar-client-go/pulsar/c_consumer.go b/pulsar-client-go/pulsar/c_consumer.go
index 9ca73e8..97fa843 100644
--- a/pulsar-client-go/pulsar/c_consumer.go
+++ b/pulsar-client-go/pulsar/c_consumer.go
@@ -96,6 +96,11 @@ func subscribeAsync(client *client, options ConsumerOptions, callback func(Consu
 		C.pulsar_consumer_set_unacked_messages_timeout_ms(conf, C.uint64_t(timeoutMillis))
 	}
 
+	if options.NackRedeliveryDelay != nil {
+		delayMillis := options.NackRedeliveryDelay.Nanoseconds() / int64(time.Millisecond)
+		C.pulsar_configure_set_negative_ack_redelivery_delay_ms(conf, C.long(delayMillis))
+	}
+
 	if options.Type != Exclusive {
 		C.pulsar_consumer_configuration_set_consumer_type(conf, C.pulsar_consumer_type(options.Type))
 	}
@@ -254,6 +259,16 @@ func (c *consumer) AckCumulativeID(msgId MessageID) error {
 	return nil
 }
 
+func (c *consumer) Nack(msg Message) error {
+	C.pulsar_consumer_negative_acknowledge(c.ptr, msg.(*message).ptr) // unchecked assertion: panics if msg is not a *message
+	return nil // no failure is ever reported to the caller from here
+}
+
+func (c *consumer) NackID(msgId MessageID) error {
+	C.pulsar_consumer_negative_acknowledge_id(c.ptr, msgId.(*messageID).ptr) // unchecked assertion: panics if msgId is not a *messageID
+	return nil // no failure is ever reported to the caller from here
+}
+
 func (c *consumer) Close() error {
 	channel := make(chan error)
 	c.CloseAsync(func(err error) { channel <- err; close(channel) })
diff --git a/pulsar-client-go/pulsar/consumer.go b/pulsar-client-go/pulsar/consumer.go
index d7549c4..ae6b498 100644
--- a/pulsar-client-go/pulsar/consumer.go
+++ b/pulsar-client-go/pulsar/consumer.go
@@ -83,6 +83,10 @@ type ConsumerOptions struct {
 	// Default is 0, which means message are not being replayed based on ack time
 	AckTimeout time.Duration
 
+	// The delay after which to redeliver the messages that failed to be
+	// processed. Default is 1min. (See `Consumer.Nack()`)
+	NackRedeliveryDelay *time.Duration
+
 	// Select the subscription type to be used when subscribing to the topic.
 	// Default is `Exclusive`
 	Type SubscriptionType
@@ -161,6 +165,24 @@ type Consumer interface {
 	// It's equivalent to calling asyncAcknowledgeCumulative(MessageID) and waiting for the callback to be triggered.
 	AckCumulativeID(MessageID) error
 
+	// Acknowledge the failure to process a single message.
+	//
+	// When a message is "negatively acked" it will be marked for redelivery after
+	// some fixed delay. The delay is configurable when constructing the consumer
+	// with ConsumerOptions.NackRedeliveryDelay.
+	//
+	// This call is not blocking.
+	Nack(Message) error
+
+	// Acknowledge the failure to process a single message.
+	//
+	// When a message is "negatively acked" it will be marked for redelivery after
+	// some fixed delay. The delay is configurable when constructing the consumer
+	// with ConsumerOptions.NackRedeliveryDelay.
+	//
+	// This call is not blocking.
+	NackID(MessageID) error
+
 	// Close the consumer and stop the broker to push more messages
 	Close() error
 
diff --git a/pulsar-client-go/pulsar/consumer_test.go b/pulsar-client-go/pulsar/consumer_test.go
index 9c034ba..f217161 100644
--- a/pulsar-client-go/pulsar/consumer_test.go
+++ b/pulsar-client-go/pulsar/consumer_test.go
@@ -514,3 +514,70 @@ func TestConsumer_SubscriptionInitPos(t *testing.T) {
 
 	assert.Equal(t, "msg-1-content-1", string(msg.Payload()))
 }
+
+func TestConsumerNegativeAcks(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: "pulsar://localhost:6650",
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := "TestConsumerNegativeAcks"
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topic,
+	})
+
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	nackDelay := 100 * time.Millisecond
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:               topic,
+		SubscriptionName:    "my-sub",
+		NackRedeliveryDelay: &nackDelay,
+	})
+
+	assert.Nil(t, err)
+	defer consumer.Close()
+
+	ctx := context.Background()
+
+	for i := 0; i < 10; i++ {
+		producer.SendAsync(ctx, ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+		}, func(producerMessage ProducerMessage, e error) {
+			assert.Nil(t, e) // fail the test on a send error instead of only printing it
+		})
+	}
+
+	producer.Flush()
+
+	for i := 0; i < 10; i++ {
+		msg, err := consumer.Receive(ctx)
+		assert.Nil(t, err)
+		assert.NotNil(t, msg)
+
+		assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
+
+		// Negatively acknowledge so the message is scheduled for redelivery
+		assert.Nil(t, consumer.Nack(msg))
+	}
+
+	// The same messages are expected again, in order, once the nack delay expires
+	for i := 0; i < 10; i++ {
+		msg, err := consumer.Receive(ctx)
+		assert.Nil(t, err)
+		assert.NotNil(t, msg)
+
+		assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
+
+		// This time acks successfully
+		consumer.Ack(msg)
+	}
+
+	// Drop the subscription created by this test
+	consumer.Unsubscribe()
+}