You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/09/01 11:50:38 UTC

[rocketmq-client-go] branch master updated: fix:when msgCh full,can't release the pq.mutex lock,removeMessage method will blocked. (#903)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new de2dc05  fix:when msgCh full,can't release the pq.mutex lock,removeMessage method will blocked. (#903)
de2dc05 is described below

commit de2dc059378a1f4b893a3c05e3905957cec8972a
Author: Nick <ni...@lampnick.com>
AuthorDate: Thu Sep 1 19:50:33 2022 +0800

    fix:when msgCh full,can't release the pq.mutex lock,removeMessage method will blocked. (#903)
---
 consumer/process_queue.go | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index 2d35c07..7bcfa2f 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -96,19 +96,19 @@ func (pq *processQueue) putMessage(messages ...*primitive.MessageExt) {
 	if len(messages) == 0 {
 		return
 	}
-	pq.mutex.Lock()
 	if pq.IsDroppd() {
-		pq.mutex.Unlock()
 		return
 	}
 	if !pq.order {
 		select {
 		case <-pq.closeChan:
-			pq.mutex.Unlock()
 			return
 		case pq.msgCh <- messages:
 		}
 	}
+
+	pq.mutex.Lock()
+
 	validMessageCount := 0
 	for idx := range messages {
 		msg := messages[idx]