You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/03/18 16:40:05 UTC
[pulsar-client-go] branch master updated: Fix consumer not found
(#196)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new ba30600 Fix consumer not found (#196)
ba30600 is described below
commit ba306000b0dc0f9b9b983e6f5937b5abc950f05a
Author: 冉小龙 <rx...@apache.org>
AuthorDate: Thu Mar 19 00:39:56 2020 +0800
Fix consumer not found (#196)
* Fix consumer not found
Signed-off-by: xiaolong.ran <rx...@apache.org>
* fix ci error
Signed-off-by: xiaolong.ran <rx...@apache.org>
---
.github/workflows/go.yml | 2 +-
pulsar/consumer_partition.go | 23 ++++++++++++++++++++++-
2 files changed, 23 insertions(+), 2 deletions(-)
diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml
index 81745e9..8682423 100644
--- a/.github/workflows/go.yml
+++ b/.github/workflows/go.yml
@@ -14,7 +14,7 @@ jobs:
id: go
- name: Check out code into the Go module directory
- uses: actions/checkout@v1
+ uses: actions/checkout@v2
- name: Test
run: |
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index e624d82..410e1ad 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -168,6 +168,12 @@ func (pc *partitionConsumer) Unsubscribe() error {
func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
defer close(unsub.doneCh)
+ if pc.state == consumerClosed || pc.state == consumerClosing {
+ pc.log.Error("Failed to unsubscribe consumer, the consumer is closing or consumer has been closed")
+ return
+ }
+
+ pc.state = consumerClosing
requestID := pc.client.rpcClient.NewRequestID()
cmdUnsubscribe := &pb.CommandUnsubscribe{
RequestId: proto.Uint64(requestID),
@@ -180,6 +186,11 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
}
pc.conn.DeleteConsumeHandler(pc.consumerID)
+ if pc.nackTracker != nil {
+ pc.nackTracker.Close()
+ }
+ pc.log.Infof("The consumer[%d] successfully unsubscribed", pc.consumerID)
+ pc.state = consumerClosed
}
func (pc *partitionConsumer) getLastMessageID() (*messageID, error) {
@@ -648,6 +659,14 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
return
}
+ if pc.state == consumerClosed || pc.state == consumerClosing {
+ pc.log.Error("The consumer is closing or has been closed")
+ if pc.nackTracker != nil {
+ pc.nackTracker.Close()
+ }
+ return
+ }
+
pc.state = consumerClosing
pc.log.Infof("Closing consumer=%d", pc.consumerID)
@@ -665,7 +684,9 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
pc.state = consumerClosed
pc.conn.DeleteConsumeHandler(pc.consumerID)
- pc.nackTracker.Close()
+ if pc.nackTracker != nil {
+ pc.nackTracker.Close()
+ }
close(pc.closeCh)
}