You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/11/01 08:55:36 UTC

[rocketmq-client-go] branch master updated: [ISSUE #927] fix processQueue remove offset (#928)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 8afd69f  [ISSUE #927] fix processQueue remove offset (#928)
8afd69f is described below

commit 8afd69f8b0f0fa0cdad877536a6c216ee722eec7
Author: 0daypwn <30...@users.noreply.github.com>
AuthorDate: Tue Nov 1 16:55:29 2022 +0800

    [ISSUE #927] fix processQueue remove offset (#928)
    
    Co-authored-by: wuxb02 <wu...@mingyuanyun.com>
---
 consumer/process_queue.go | 17 +++++++----------
 1 file changed, 7 insertions(+), 10 deletions(-)

diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index 7bcfa2f..49dae7f 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -99,16 +99,7 @@ func (pq *processQueue) putMessage(messages ...*primitive.MessageExt) {
 	if pq.IsDroppd() {
 		return
 	}
-	if !pq.order {
-		select {
-		case <-pq.closeChan:
-			return
-		case pq.msgCh <- messages:
-		}
-	}
-
 	pq.mutex.Lock()
-
 	validMessageCount := 0
 	for idx := range messages {
 		msg := messages[idx]
@@ -126,9 +117,15 @@ func (pq *processQueue) putMessage(messages ...*primitive.MessageExt) {
 
 		pq.cachedMsgSize.Add(int64(len(msg.Body)))
 	}
-
 	pq.cachedMsgCount.Add(int64(validMessageCount))
 	pq.mutex.Unlock()
+	if !pq.order {
+		select {
+		case <-pq.closeChan:
+			return
+		case pq.msgCh <- messages:
+		}
+	}
 
 	if pq.cachedMsgCount.Load() > 0 && !pq.consuming {
 		pq.consuming = true