You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2021/02/09 02:43:55 UTC
[pulsar-client-go] branch master updated: Fix unsubscribe blocked
when consumer is closing or has closed (#457)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 25f3075 Fix unsubscribe blocked when consumer is closing or has closed (#457)
25f3075 is described below
commit 25f3075176f853f5f09d89b3b625e0646432735f
Author: wuYin <wu...@gmail.com>
AuthorDate: Tue Feb 9 10:43:50 2021 +0800
Fix unsubscribe blocked when consumer is closing or has closed (#457)
### Motivation
For the present consumer, `Close()` and `Unsubscribe()` handled by the same eventloop goroutine.
The eventloop exited after `Close()`, then unsubscribe event wouldn't be selected and handled anymore, lead to block.
example:
```go
func main() {
client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
if err != nil {
log.Fatal(err)
}
defer client.Close()
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "topic-1",
SubscriptionName: "my-sub",
})
if err != nil {
log.Fatal(err)
}
defer consumer.Unsubscribe() // unintentional
defer consumer.Close()
}
```
`Unsubscribe()` blocked:
![image](https://user-images.githubusercontent.com/24536920/106294060-ab5d6b80-6289-11eb-913c-85e1d18467a0.png)
### Modifications
Check consumer state before send unsubscribe event, if consumer is closing or has closed, just logging it
### Verifying this change
- [x] Make sure that the change passes the CI checks.
---
pulsar/consumer_partition.go | 31 +++++++++++++++++++++++++------
1 file changed, 25 insertions(+), 6 deletions(-)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index b9c9b12..b75a7d7 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -47,6 +47,21 @@ const (
consumerClosed
)
+func (s consumerState) String() string {
+ switch s {
+ case consumerInit:
+ return "Initializing"
+ case consumerReady:
+ return "Ready"
+ case consumerClosing:
+ return "Closing"
+ case consumerClosed:
+ return "Closed"
+ default:
+ return "Unknown"
+ }
+}
+
type subscriptionMode int
const (
@@ -195,6 +210,11 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
}
func (pc *partitionConsumer) Unsubscribe() error {
+ if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
+ pc.log.WithField("state", state).Error("Failed to unsubscribe closing or closed consumer")
+ return nil
+ }
+
req := &unsubscribeRequest{doneCh: make(chan struct{})}
pc.eventsCh <- req
@@ -206,9 +226,8 @@ func (pc *partitionConsumer) Unsubscribe() error {
func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
defer close(unsub.doneCh)
- state := pc.getConsumerState()
- if state == consumerClosed || state == consumerClosing {
- pc.log.Error("Failed to unsubscribe consumer, the consumer is closing or consumer has been closed")
+ if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
+ pc.log.WithField("state", state).Error("Failed to unsubscribe closing or closed consumer")
return
}
@@ -354,7 +373,7 @@ func (pc *partitionConsumer) internalSeek(seek *seekRequest) {
func (pc *partitionConsumer) requestSeek(msgID messageID) error {
state := pc.getConsumerState()
if state == consumerClosing || state == consumerClosed {
- pc.log.Error("Consumer was already closed")
+ pc.log.WithField("state", state).Error("Consumer is closing or has closed")
return nil
}
@@ -398,7 +417,7 @@ func (pc *partitionConsumer) internalSeekByTime(seek *seekByTimeRequest) {
state := pc.getConsumerState()
if state == consumerClosing || state == consumerClosed {
- pc.log.Error("Consumer was already closed")
+ pc.log.WithField("state", pc.state).Error("Consumer is closing or has closed")
return
}
@@ -798,7 +817,7 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
}
if state == consumerClosed || state == consumerClosing {
- pc.log.Error("The consumer is closing or has been closed")
+ pc.log.WithField("state", state).Error("Consumer is closing or has closed")
if pc.nackTracker != nil {
pc.nackTracker.Close()
}