You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/07/25 02:00:38 UTC
[rocketmq-client-go] branch master updated: close msgCh when pq dropped (#859)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 381a879 close msgCh when pq dropped (#859)
381a879 is described below
commit 381a87927d7f9acbda46ed10f575624b9f456ad0
Author: dinglei <li...@163.com>
AuthorDate: Mon Jul 25 10:00:34 2022 +0800
close msgCh when pq dropped (#859)
---
consumer/process_queue.go | 20 ++++++++++++++++++--
1 file changed, 18 insertions(+), 2 deletions(-)
diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index 497b695..ded8d08 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -58,6 +58,8 @@ type processQueue struct {
lockConsume sync.Mutex
msgCh chan []*primitive.MessageExt
order bool
+ closeChanOnce *sync.Once
+ closeChan chan struct{}
}
func newProcessQueue(order bool) *processQueue {
@@ -80,6 +82,8 @@ func newProcessQueue(order bool) *processQueue {
msgCh: make(chan []*primitive.MessageExt, 32),
consumingMsgOrderlyTreeMap: consumingMsgOrderlyTreeMap,
order: order,
+ closeChanOnce: &sync.Once{},
+ closeChan: make(chan struct{}),
locked: uatomic.NewBool(false),
dropped: uatomic.NewBool(false),
}
@@ -96,7 +100,11 @@ func (pq *processQueue) putMessage(messages ...*primitive.MessageExt) {
return
}
if !pq.order {
- pq.msgCh <- messages
+ select {
+ case <-pq.closeChan:
+ return
+ case pq.msgCh <- messages:
+ }
}
validMessageCount := 0
for idx := range messages {
@@ -142,6 +150,9 @@ func (pq *processQueue) IsLock() bool {
func (pq *processQueue) WithDropped(dropped bool) {
pq.dropped.Store(dropped)
+ pq.closeChanOnce.Do(func() {
+ close(pq.closeChan)
+ })
}
func (pq *processQueue) IsDroppd() bool {
@@ -274,7 +285,12 @@ func (pq *processQueue) getMaxSpan() int {
}
func (pq *processQueue) getMessages() []*primitive.MessageExt {
- return <-pq.msgCh
+ select {
+ case <-pq.closeChan:
+ return nil
+ case mq := <-pq.msgCh:
+ return mq
+ }
}
func (pq *processQueue) takeMessages(number int) []*primitive.MessageExt {