You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/12/23 22:43:57 UTC
[pulsar-client-go] branch master updated: [Issue 140] Consumer should not block on received if closed. (#142)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new a6231f5 [Issue 140] Consumer should not block on received if closed. (#142)
a6231f5 is described below
commit a6231f5a7c47ef824b0c9f35c4d68243b139106f
Author: cckellogg <cc...@gmail.com>
AuthorDate: Mon Dec 23 14:43:51 2019 -0800
[Issue 140] Consumer should not block on received if closed. (#142)
* [Issue 140] Consumer should not block on received if closed.
* [Issue #140] Consumer should not block on received if closed.
---
pulsar/consumer_impl.go | 29 ++++++++++++++++++-----------
pulsar/consumer_test.go | 33 +++++++++++++++++++++++++++++++++
2 files changed, 51 insertions(+), 11 deletions(-)
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 61c27a7..4a1f85e 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -48,8 +48,9 @@ type consumer struct {
// channel used to deliver message to clients
messageCh chan ConsumerMessage
- closeCh chan struct{}
- errorCh chan error
+ closeOnce sync.Once
+ closeCh chan struct{}
+ errorCh chan error
log *log.Entry
}
@@ -116,6 +117,7 @@ func internalTopicSubscribe(client *client, options ConsumerOptions, topic strin
consumer := &consumer{
options: options,
messageCh: messageCh,
+ closeCh: make(chan struct{}),
errorCh: make(chan error),
log: log.WithField("topic", topic),
}
@@ -226,6 +228,8 @@ func (c *consumer) Unsubscribe() error {
func (c *consumer) Receive(ctx context.Context) (message Message, err error) {
for {
select {
+ case <-c.closeCh:
+ return nil, ErrConsumerClosed
case cm, ok := <-c.messageCh:
if !ok {
return nil, ErrConsumerClosed
@@ -298,15 +302,18 @@ func (c *consumer) NackID(msgID MessageID) {
}
func (c *consumer) Close() {
- var wg sync.WaitGroup
- for i := range c.consumers {
- wg.Add(1)
- go func(pc *partitionConsumer) {
- defer wg.Done()
- pc.Close()
- }(c.consumers[i])
- }
- wg.Wait()
+ c.closeOnce.Do(func() {
+ var wg sync.WaitGroup
+ for i := range c.consumers {
+ wg.Add(1)
+ go func(pc *partitionConsumer) {
+ defer wg.Done()
+ pc.Close()
+ }(c.consumers[i])
+ }
+ wg.Wait()
+ close(c.closeCh)
+ })
}
var r = &random{
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 11401ba..9bc2bd6 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -794,3 +794,36 @@ func TestConsumerMetadata(t *testing.T) {
assert.Equal(t, v, mv)
}
}
+
+// Test for issue #140
+// Don't block on receive if the consumer has been closed
+func TestConsumerReceiveErrAfterClose(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer client.Close()
+
+ topicName := newTopicName()
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: "my-sub",
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ consumer.Close()
+
+ errorCh := make(chan error)
+ go func() {
+ _, err = consumer.Receive(context.Background())
+ errorCh <- err
+ }()
+ select {
+ case <-time.After(200 * time.Millisecond):
+ case err = <-errorCh:
+ }
+ assert.Equal(t, ErrConsumerClosed, err)
+}