You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2021/02/09 02:43:55 UTC

[pulsar-client-go] branch master updated: Fix unsubscribe blocked when consumer is closing or has closed (#457)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 25f3075  Fix unsubscribe blocked when consumer is closing or has closed (#457)
25f3075 is described below

commit 25f3075176f853f5f09d89b3b625e0646432735f
Author: wuYin <wu...@gmail.com>
AuthorDate: Tue Feb 9 10:43:50 2021 +0800

    Fix unsubscribe blocked when consumer is closing or has closed (#457)
    
    ### Motivation
    
    In the current consumer, `Close()` and `Unsubscribe()` are handled by the same event-loop goroutine.
    The event loop exits after `Close()`, so a subsequent unsubscribe event is never selected or handled, which causes the `Unsubscribe()` call to block.
    
    example:
    ```go
    func main() {
    	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer client.Close()
    
    	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
    		Topic:            "topic-1",
    		SubscriptionName: "my-sub",
    	})
    	if err != nil {
    		log.Fatal(err)
    	}
    
    	defer consumer.Unsubscribe() // unintentional
    	defer consumer.Close()
    }
    ```
    
    `Unsubscribe()` blocked:
    
    ![image](https://user-images.githubusercontent.com/24536920/106294060-ab5d6b80-6289-11eb-913c-85e1d18467a0.png)
    
    
    
    
    ### Modifications
    
    Check the consumer state before sending the unsubscribe event; if the consumer is closing or has already closed, just log an error and return.
    
    ### Verifying this change
    
    - [x] Make sure that the change passes the CI checks.
---
 pulsar/consumer_partition.go | 31 +++++++++++++++++++++++++------
 1 file changed, 25 insertions(+), 6 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index b9c9b12..b75a7d7 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -47,6 +47,21 @@ const (
 	consumerClosed
 )
 
+func (s consumerState) String() string {
+	switch s {
+	case consumerInit:
+		return "Initializing"
+	case consumerReady:
+		return "Ready"
+	case consumerClosing:
+		return "Closing"
+	case consumerClosed:
+		return "Closed"
+	default:
+		return "Unknown"
+	}
+}
+
 type subscriptionMode int
 
 const (
@@ -195,6 +210,11 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
 }
 
 func (pc *partitionConsumer) Unsubscribe() error {
+	if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
+		pc.log.WithField("state", state).Error("Failed to unsubscribe closing or closed consumer")
+		return nil
+	}
+
 	req := &unsubscribeRequest{doneCh: make(chan struct{})}
 	pc.eventsCh <- req
 
@@ -206,9 +226,8 @@ func (pc *partitionConsumer) Unsubscribe() error {
 func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
 	defer close(unsub.doneCh)
 
-	state := pc.getConsumerState()
-	if state == consumerClosed || state == consumerClosing {
-		pc.log.Error("Failed to unsubscribe consumer, the consumer is closing or consumer has been closed")
+	if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
+		pc.log.WithField("state", state).Error("Failed to unsubscribe closing or closed consumer")
 		return
 	}
 
@@ -354,7 +373,7 @@ func (pc *partitionConsumer) internalSeek(seek *seekRequest) {
 func (pc *partitionConsumer) requestSeek(msgID messageID) error {
 	state := pc.getConsumerState()
 	if state == consumerClosing || state == consumerClosed {
-		pc.log.Error("Consumer was already closed")
+		pc.log.WithField("state", state).Error("Consumer is closing or has closed")
 		return nil
 	}
 
@@ -398,7 +417,7 @@ func (pc *partitionConsumer) internalSeekByTime(seek *seekByTimeRequest) {
 
 	state := pc.getConsumerState()
 	if state == consumerClosing || state == consumerClosed {
-		pc.log.Error("Consumer was already closed")
+		pc.log.WithField("state", pc.state).Error("Consumer is closing or has closed")
 		return
 	}
 
@@ -798,7 +817,7 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
 	}
 
 	if state == consumerClosed || state == consumerClosing {
-		pc.log.Error("The consumer is closing or has been closed")
+		pc.log.WithField("state", state).Error("Consumer is closing or has closed")
 		if pc.nackTracker != nil {
 			pc.nackTracker.Close()
 		}