You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/03/14 17:05:11 UTC
[pulsar] branch master updated: Negative acks implementation for Go
client (#3817)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c83631b Negative acks implementation for Go client (#3817)
c83631b is described below
commit c83631bf3dc330e3561c8da2764d249d3a63711c
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Mar 14 10:05:05 2019 -0700
Negative acks implementation for Go client (#3817)
---
pulsar-client-go/pulsar/c_consumer.go | 15 +++++++
pulsar-client-go/pulsar/consumer.go | 22 +++++++++++
pulsar-client-go/pulsar/consumer_test.go | 67 ++++++++++++++++++++++++++++++++
3 files changed, 104 insertions(+)
diff --git a/pulsar-client-go/pulsar/c_consumer.go b/pulsar-client-go/pulsar/c_consumer.go
index 9ca73e8..97fa843 100644
--- a/pulsar-client-go/pulsar/c_consumer.go
+++ b/pulsar-client-go/pulsar/c_consumer.go
@@ -96,6 +96,11 @@ func subscribeAsync(client *client, options ConsumerOptions, callback func(Consu
C.pulsar_consumer_set_unacked_messages_timeout_ms(conf, C.uint64_t(timeoutMillis))
}
+ if options.NackRedeliveryDelay != nil {
+ delayMillis := options.NackRedeliveryDelay.Nanoseconds() / int64(time.Millisecond)
+ C.pulsar_configure_set_negative_ack_redelivery_delay_ms(conf, C.long(delayMillis))
+ }
+
if options.Type != Exclusive {
C.pulsar_consumer_configuration_set_consumer_type(conf, C.pulsar_consumer_type(options.Type))
}
@@ -254,6 +259,16 @@ func (c *consumer) AckCumulativeID(msgId MessageID) error {
return nil
}
+func (c *consumer) Nack(msg Message) error {
+ C.pulsar_consumer_negative_acknowledge(c.ptr, msg.(*message).ptr)
+ return nil
+}
+
+func (c *consumer) NackID(msgId MessageID) error {
+ C.pulsar_consumer_negative_acknowledge_id(c.ptr, msgId.(*messageID).ptr)
+ return nil
+}
+
func (c *consumer) Close() error {
channel := make(chan error)
c.CloseAsync(func(err error) { channel <- err; close(channel) })
diff --git a/pulsar-client-go/pulsar/consumer.go b/pulsar-client-go/pulsar/consumer.go
index d7549c4..ae6b498 100644
--- a/pulsar-client-go/pulsar/consumer.go
+++ b/pulsar-client-go/pulsar/consumer.go
@@ -83,6 +83,10 @@ type ConsumerOptions struct {
// Default is 0, which means message are not being replayed based on ack time
AckTimeout time.Duration
+ // The delay after which to redeliver the messages that failed to be
+ // processed. Default is 1min. (See `Consumer.Nack()`)
+ NackRedeliveryDelay *time.Duration
+
// Select the subscription type to be used when subscribing to the topic.
// Default is `Exclusive`
Type SubscriptionType
@@ -161,6 +165,24 @@ type Consumer interface {
// It's equivalent to calling asyncAcknowledgeCumulative(MessageID) and waiting for the callback to be triggered.
AckCumulativeID(MessageID) error
+ // Acknowledge the failure to process a single message.
+ //
+ // When a message is "negatively acked" it will be marked for redelivery after
+ // some fixed delay. The delay is configurable when constructing the consumer
+ // with ConsumerOptions.NackRedeliveryDelay.
+ //
+ // This call is not blocking.
+ Nack(Message) error
+
+ // Acknowledge the failure to process a single message.
+ //
+ // When a message is "negatively acked" it will be marked for redelivery after
+ // some fixed delay. The delay is configurable when constructing the consumer
+ // with ConsumerOptions.NackRedeliveryDelay.
+ //
+ // This call is not blocking.
+ NackID(MessageID) error
+
// Close the consumer and stop the broker to push more messages
Close() error
diff --git a/pulsar-client-go/pulsar/consumer_test.go b/pulsar-client-go/pulsar/consumer_test.go
index 9c034ba..f217161 100644
--- a/pulsar-client-go/pulsar/consumer_test.go
+++ b/pulsar-client-go/pulsar/consumer_test.go
@@ -514,3 +514,70 @@ func TestConsumer_SubscriptionInitPos(t *testing.T) {
assert.Equal(t, "msg-1-content-1", string(msg.Payload()))
}
+
+func TestConsumerNegativeAcks(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: "pulsar://localhost:6650",
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := "TestConsumerNegativeAcks"
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ })
+
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ nackDelay := 100 * time.Millisecond
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "my-sub",
+ NackRedeliveryDelay: &nackDelay,
+ })
+
+ assert.Nil(t, err)
+ defer consumer.Close()
+
+ ctx := context.Background()
+
+ for i := 0; i < 10; i++ {
+ producer.SendAsync(ctx, ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ }, func(producerMessage ProducerMessage, e error) {
+ fmt.Print("send complete. err=", e)
+ })
+ }
+
+ producer.Flush()
+
+ for i := 0; i < 10; i++ {
+ msg, err := consumer.Receive(ctx)
+ assert.Nil(t, err)
+ assert.NotNil(t, msg)
+
+ assert.Equal(t, string(msg.Payload()), fmt.Sprintf("hello-%d", i))
+
+ // Negatively acknowledge the message so that it is redelivered
+ consumer.Nack(msg)
+ }
+
+ // Messages will be redelivered
+ for i := 0; i < 10; i++ {
+ msg, err := consumer.Receive(ctx)
+ assert.Nil(t, err)
+ assert.NotNil(t, msg)
+
+ assert.Equal(t, string(msg.Payload()), fmt.Sprintf("hello-%d", i))
+
+ // This time acks successfully
+ consumer.Ack(msg)
+ }
+
+
+ consumer.Unsubscribe()
+}