You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ma...@apache.org on 2021/06/17 11:57:36 UTC

[rocketmq-client-go] branch master updated: Fix go routine leaks when consumer close with msg channel blocked (#642)

This is an automated email from the ASF dual-hosted git repository.

maixiaohai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new c688c19  Fix go routine leaks when consumer close with msg channel blocked (#642)
c688c19 is described below

commit c688c190734dcdb6e2fc9b2f6791544d648185a2
Author: 张旭 <ma...@gmail.com>
AuthorDate: Thu Jun 17 19:56:23 2021 +0800

    Fix go routine leaks when consumer close with msg channel blocked (#642)
    
    Co-authored-by: zhangxu16 <zh...@xiaomi.com>
---
 consumer/consumer.go      | 4 ++++
 consumer/process_queue.go | 4 ++++
 2 files changed, 8 insertions(+)

diff --git a/consumer/consumer.go b/consumer/consumer.go
index 584a4b5..42bc973 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -305,6 +305,10 @@ func (dc *defaultConsumer) shutdown() error {
 		k := key.(primitive.MessageQueue)
 		pq := value.(*processQueue)
 		pq.WithDropped(true)
+		// close msg channel using RWMutex to make sure no data was writing
+		pq.mutex.Lock()
+		close(pq.msgCh)
+		pq.mutex.Unlock()
 		mqs = append(mqs, &k)
 		return true
 	})
diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index a306470..0e9d8ec 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -91,6 +91,10 @@ func (pq *processQueue) putMessage(messages ...*primitive.MessageExt) {
 		return
 	}
 	pq.mutex.Lock()
+	if pq.IsDroppd() {
+		pq.mutex.Unlock()
+		return
+	}
 	if !pq.order {
 		pq.msgCh <- messages
 	}