You are viewing a plain-text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/11/01 08:55:36 UTC
[rocketmq-client-go] branch master updated: [ISSUE #927] fix processQueue remove offset (#928)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 8afd69f [ISSUE #927] fix processQueue remove offset (#928)
8afd69f is described below
commit 8afd69f8b0f0fa0cdad877536a6c216ee722eec7
Author: 0daypwn <30...@users.noreply.github.com>
AuthorDate: Tue Nov 1 16:55:29 2022 +0800
[ISSUE #927] fix processQueue remove offset (#928)
Co-authored-by: wuxb02 <wu...@mingyuanyun.com>
---
consumer/process_queue.go | 17 +++++++----------
1 file changed, 7 insertions(+), 10 deletions(-)
diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index 7bcfa2f..49dae7f 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -99,16 +99,7 @@ func (pq *processQueue) putMessage(messages ...*primitive.MessageExt) {
if pq.IsDroppd() {
return
}
- if !pq.order {
- select {
- case <-pq.closeChan:
- return
- case pq.msgCh <- messages:
- }
- }
-
pq.mutex.Lock()
-
validMessageCount := 0
for idx := range messages {
msg := messages[idx]
@@ -126,9 +117,15 @@ func (pq *processQueue) putMessage(messages ...*primitive.MessageExt) {
pq.cachedMsgSize.Add(int64(len(msg.Body)))
}
-
pq.cachedMsgCount.Add(int64(validMessageCount))
pq.mutex.Unlock()
+ if !pq.order {
+ select {
+ case <-pq.closeChan:
+ return
+ case pq.msgCh <- messages:
+ }
+ }
if pq.cachedMsgCount.Load() > 0 && !pq.consuming {
pq.consuming = true