You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/12/23 22:43:57 UTC

[pulsar-client-go] branch master updated: [Issue 140] Consumer should not block on received if closed. (#142)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new a6231f5  [Issue 140] Consumer should not block on received if closed. (#142)
a6231f5 is described below

commit a6231f5a7c47ef824b0c9f35c4d68243b139106f
Author: cckellogg <cc...@gmail.com>
AuthorDate: Mon Dec 23 14:43:51 2019 -0800

    [Issue 140] Consumer should not block on Receive if closed. (#142)
    
    * [Issue 140] Consumer should not block on Receive if closed.
    
    * [Issue #140] Consumer should not block on Receive if closed.
---
 pulsar/consumer_impl.go | 29 ++++++++++++++++++-----------
 pulsar/consumer_test.go | 33 +++++++++++++++++++++++++++++++++
 2 files changed, 51 insertions(+), 11 deletions(-)

diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 61c27a7..4a1f85e 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -48,8 +48,9 @@ type consumer struct {
 	// channel used to deliver message to clients
 	messageCh chan ConsumerMessage
 
-	closeCh chan struct{}
-	errorCh chan error
+	closeOnce sync.Once
+	closeCh   chan struct{}
+	errorCh   chan error
 
 	log *log.Entry
 }
@@ -116,6 +117,7 @@ func internalTopicSubscribe(client *client, options ConsumerOptions, topic strin
 	consumer := &consumer{
 		options:   options,
 		messageCh: messageCh,
+		closeCh:   make(chan struct{}),
 		errorCh:   make(chan error),
 		log:       log.WithField("topic", topic),
 	}
@@ -226,6 +228,8 @@ func (c *consumer) Unsubscribe() error {
 func (c *consumer) Receive(ctx context.Context) (message Message, err error) {
 	for {
 		select {
+		case <-c.closeCh:
+			return nil, ErrConsumerClosed
 		case cm, ok := <-c.messageCh:
 			if !ok {
 				return nil, ErrConsumerClosed
@@ -298,15 +302,18 @@ func (c *consumer) NackID(msgID MessageID) {
 }
 
 func (c *consumer) Close() {
-	var wg sync.WaitGroup
-	for i := range c.consumers {
-		wg.Add(1)
-		go func(pc *partitionConsumer) {
-			defer wg.Done()
-			pc.Close()
-		}(c.consumers[i])
-	}
-	wg.Wait()
+	c.closeOnce.Do(func() {
+		var wg sync.WaitGroup
+		for i := range c.consumers {
+			wg.Add(1)
+			go func(pc *partitionConsumer) {
+				defer wg.Done()
+				pc.Close()
+			}(c.consumers[i])
+		}
+		wg.Wait()
+		close(c.closeCh)
+	})
 }
 
 var r = &random{
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 11401ba..9bc2bd6 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -794,3 +794,36 @@ func TestConsumerMetadata(t *testing.T) {
 		assert.Equal(t, v, mv)
 	}
 }
+
+// Regression test for issue #140:
+// Receive must return ErrConsumerClosed instead of blocking once the consumer has been closed.
+func TestConsumerReceiveErrAfterClose(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer client.Close()
+
+	topicName := newTopicName()
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topicName,
+		SubscriptionName: "my-sub",
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	consumer.Close()
+
+	errorCh := make(chan error)
+	go func() {
+		_, err = consumer.Receive(context.Background())
+		errorCh <- err
+	}()
+	select {
+	case <-time.After(200 * time.Millisecond):
+	case err = <-errorCh:
+	}
+	assert.Equal(t, ErrConsumerClosed, err)
+}