You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/06/19 04:14:42 UTC

[pulsar-client-go] branch master updated: fix panic when creating consumer with ReceiverQueueSize set to -1 (#289)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 826df81  fix panic when creating consumer with ReceiverQueueSize set to -1 (#289)
826df81 is described below

commit 826df8194b9cd64b5be2624dbf8c4abd97f53ce6
Author: Shohi Wang <os...@gmail.com>
AuthorDate: Fri Jun 19 12:14:13 2020 +0800

    fix panic when creating consumer with ReceiverQueueSize set to -1 (#289)
    
    * fix panic when creating consumer with ReceiverQueueSize set to -1
    
    * update comment
    
    * default queue size is applied if ReceiverQueueSize is not positive
---
 pulsar/consumer.go      |  1 -
 pulsar/consumer_impl.go |  2 +-
 pulsar/consumer_test.go | 25 +++++++++++++++++++++++++
 3 files changed, 26 insertions(+), 2 deletions(-)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index a14b41c..97020b6 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -116,7 +116,6 @@ type ConsumerOptions struct {
 	// application calls `Consumer.receive()`. Using a higher value could potentially increase the consumer
 	// throughput at the expense of bigger memory utilization.
 	// Default value is `1000` messages and should be good for most use cases.
-	// Set to -1 to disable prefetching in consumer
 	ReceiverQueueSize int
 
 	// The delay after which to redeliver the messages that failed to be
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 3a27def..a3fed32 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -70,7 +70,7 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
 		return nil, newError(SubscriptionNotFound, "subscription name is required for consumer")
 	}
 
-	if options.ReceiverQueueSize == 0 {
+	if options.ReceiverQueueSize <= 0 {
 		options.ReceiverQueueSize = 1000
 	}
 
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 6007c45..3a06cd1 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1274,3 +1274,28 @@ func TestConsumerAddTopicPartitions(t *testing.T) {
 
 	assert.Equal(t, len(msgs), 10)
 }
+
+func TestConsumterNegativeRecieverQueueSize(t *testing.T) { // Subscribing with ReceiverQueueSize set to -1 must not return an error.
+	assert := assert.New(t)
+
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(err) // NOTE(review): presumably assert (unlike require) does not abort the test, so a failed NewClient would still reach the calls below — confirm
+	defer client.Close()
+
+	topic := newTopicName()
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:             topic,
+		SubscriptionName:  "my-sub",
+		ReceiverQueueSize: -1, // non-positive; the consumer_impl.go hunk in this commit makes newConsumer replace such values with 1000
+	})
+	defer func() { // registered before the error check below; the nil guard skips Close when no consumer was returned
+		if consumer != nil {
+			consumer.Close()
+		}
+	}()
+
+	assert.Nil(err)
+}