You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/06/19 04:14:42 UTC
[pulsar-client-go] branch master updated: fix panic when creating
consumer with ReceiverQueueSize set to -1 (#289)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 826df81 fix panic when creating consumer with ReceiverQueueSize set to -1 (#289)
826df81 is described below
commit 826df8194b9cd64b5be2624dbf8c4abd97f53ce6
Author: Shohi Wang <os...@gmail.com>
AuthorDate: Fri Jun 19 12:14:13 2020 +0800
fix panic when creating consumer with ReceiverQueueSize set to -1 (#289)
* fix panic when creating consumer with ReceiverQueueSize set to -1
* update comment
* default queue size is applied if ReceiverQueueSize is not positive
---
pulsar/consumer.go | 1 -
pulsar/consumer_impl.go | 2 +-
pulsar/consumer_test.go | 25 +++++++++++++++++++++++++
3 files changed, 26 insertions(+), 2 deletions(-)
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index a14b41c..97020b6 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -116,7 +116,6 @@ type ConsumerOptions struct {
// application calls `Consumer.receive()`. Using a higher value could potentially increase the consumer
// throughput at the expense of bigger memory utilization.
// Default value is `1000` messages and should be good for most use cases.
- // Set to -1 to disable prefetching in consumer
ReceiverQueueSize int
// The delay after which to redeliver the messages that failed to be
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 3a27def..a3fed32 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -70,7 +70,7 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
return nil, newError(SubscriptionNotFound, "subscription name is required for consumer")
}
- if options.ReceiverQueueSize == 0 {
+ if options.ReceiverQueueSize <= 0 {
options.ReceiverQueueSize = 1000
}
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 6007c45..3a06cd1 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1274,3 +1274,28 @@ func TestConsumerAddTopicPartitions(t *testing.T) {
assert.Equal(t, len(msgs), 10)
}
+
+func TestConsumterNegativeRecieverQueueSize(t *testing.T) {
+ assert := assert.New(t)
+
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.Nil(err)
+ defer client.Close()
+
+ topic := newTopicName()
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "my-sub",
+ ReceiverQueueSize: -1,
+ })
+ defer func() {
+ if consumer != nil {
+ consumer.Close()
+ }
+ }()
+
+ assert.Nil(err)
+}