You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ma...@apache.org on 2021/06/17 11:57:36 UTC
[rocketmq-client-go] branch master updated: Fix go routine leaks when consumer close with msg channel blocked (#642)
This is an automated email from the ASF dual-hosted git repository.
maixiaohai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new c688c19 Fix go routine leaks when consumer close with msg channel blocked (#642)
c688c19 is described below
commit c688c190734dcdb6e2fc9b2f6791544d648185a2
Author: 张旭 <ma...@gmail.com>
AuthorDate: Thu Jun 17 19:56:23 2021 +0800
Fix go routine leaks when consumer close with msg channel blocked (#642)
Co-authored-by: zhangxu16 <zh...@xiaomi.com>
---
consumer/consumer.go | 4 ++++
consumer/process_queue.go | 4 ++++
2 files changed, 8 insertions(+)
diff --git a/consumer/consumer.go b/consumer/consumer.go
index 584a4b5..42bc973 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -305,6 +305,10 @@ func (dc *defaultConsumer) shutdown() error {
k := key.(primitive.MessageQueue)
pq := value.(*processQueue)
pq.WithDropped(true)
+ // close msg channel using RWMutex to make sure no data was writing
+ pq.mutex.Lock()
+ close(pq.msgCh)
+ pq.mutex.Unlock()
mqs = append(mqs, &k)
return true
})
diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index a306470..0e9d8ec 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -91,6 +91,10 @@ func (pq *processQueue) putMessage(messages ...*primitive.MessageExt) {
return
}
pq.mutex.Lock()
+ if pq.IsDroppd() {
+ pq.mutex.Unlock()
+ return
+ }
if !pq.order {
pq.msgCh <- messages
}