You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/03/18 16:40:05 UTC

[pulsar-client-go] branch master updated: Fix consumer not found (#196)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new ba30600  Fix consumer not found (#196)
ba30600 is described below

commit ba306000b0dc0f9b9b983e6f5937b5abc950f05a
Author: 冉小龙 <rx...@apache.org>
AuthorDate: Thu Mar 19 00:39:56 2020 +0800

    Fix consumer not found (#196)
    
    * Fix consumer not found
    
    Signed-off-by: xiaolong.ran <rx...@apache.org>
    
    * fix ci error
    
    Signed-off-by: xiaolong.ran <rx...@apache.org>
---
 .github/workflows/go.yml     |  2 +-
 pulsar/consumer_partition.go | 23 ++++++++++++++++++++++-
 2 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml
index 81745e9..8682423 100644
--- a/.github/workflows/go.yml
+++ b/.github/workflows/go.yml
@@ -14,7 +14,7 @@ jobs:
       id: go
 
     - name: Check out code into the Go module directory
-      uses: actions/checkout@v1
+      uses: actions/checkout@v2
 
     - name: Test
       run: |
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index e624d82..410e1ad 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -168,6 +168,12 @@ func (pc *partitionConsumer) Unsubscribe() error {
 func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
 	defer close(unsub.doneCh)
 
+	if pc.state == consumerClosed || pc.state == consumerClosing {
+		pc.log.Error("Failed to unsubscribe consumer, the consumer is closing or consumer has been closed")
+		return
+	}
+
+	pc.state = consumerClosing
 	requestID := pc.client.rpcClient.NewRequestID()
 	cmdUnsubscribe := &pb.CommandUnsubscribe{
 		RequestId:  proto.Uint64(requestID),
@@ -180,6 +186,11 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
 	}
 
 	pc.conn.DeleteConsumeHandler(pc.consumerID)
+	if pc.nackTracker != nil {
+		pc.nackTracker.Close()
+	}
+	pc.log.Infof("The consumer[%d] successfully unsubscribed", pc.consumerID)
+	pc.state = consumerClosed
 }
 
 func (pc *partitionConsumer) getLastMessageID() (*messageID, error) {
@@ -648,6 +659,14 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
 		return
 	}
 
+	if pc.state == consumerClosed || pc.state == consumerClosing {
+		pc.log.Error("The consumer is closing or has been closed")
+		if pc.nackTracker != nil {
+			pc.nackTracker.Close()
+		}
+		return
+	}
+
 	pc.state = consumerClosing
 	pc.log.Infof("Closing consumer=%d", pc.consumerID)
 
@@ -665,7 +684,9 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
 
 	pc.state = consumerClosed
 	pc.conn.DeleteConsumeHandler(pc.consumerID)
-	pc.nackTracker.Close()
+	if pc.nackTracker != nil {
+		pc.nackTracker.Close()
+	}
 	close(pc.closeCh)
 }