You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/09/01 11:50:38 UTC
[rocketmq-client-go] branch master updated: fix:when msgCh full,can't release the pq.mutex lock,removeMessage method will blocked. (#903)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new de2dc05 fix:when msgCh full,can't release the pq.mutex lock,removeMessage method will blocked. (#903)
de2dc05 is described below
commit de2dc059378a1f4b893a3c05e3905957cec8972a
Author: Nick <ni...@lampnick.com>
AuthorDate: Thu Sep 1 19:50:33 2022 +0800
fix:when msgCh full,can't release the pq.mutex lock,removeMessage method will blocked. (#903)
---
consumer/process_queue.go | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index 2d35c07..7bcfa2f 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -96,19 +96,19 @@ func (pq *processQueue) putMessage(messages ...*primitive.MessageExt) {
if len(messages) == 0 {
return
}
- pq.mutex.Lock()
if pq.IsDroppd() {
- pq.mutex.Unlock()
return
}
if !pq.order {
select {
case <-pq.closeChan:
- pq.mutex.Unlock()
return
case pq.msgCh <- messages:
}
}
+
+ pq.mutex.Lock()
+
validMessageCount := 0
for idx := range messages {
msg := messages[idx]