You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/07/25 02:00:38 UTC

[rocketmq-client-go] branch master updated: close msgCh when pq droped (#859)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 381a879  close msgCh when pq droped (#859)
381a879 is described below

commit 381a87927d7f9acbda46ed10f575624b9f456ad0
Author: dinglei <li...@163.com>
AuthorDate: Mon Jul 25 10:00:34 2022 +0800

    close msgCh when pq droped (#859)
---
 consumer/process_queue.go | 20 ++++++++++++++++++--
 1 file changed, 18 insertions(+), 2 deletions(-)

diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index 497b695..ded8d08 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -58,6 +58,8 @@ type processQueue struct {
 	lockConsume                sync.Mutex
 	msgCh                      chan []*primitive.MessageExt
 	order                      bool
+	closeChanOnce              *sync.Once
+	closeChan                  chan struct{}
 }
 
 func newProcessQueue(order bool) *processQueue {
@@ -80,6 +82,8 @@ func newProcessQueue(order bool) *processQueue {
 		msgCh:                      make(chan []*primitive.MessageExt, 32),
 		consumingMsgOrderlyTreeMap: consumingMsgOrderlyTreeMap,
 		order:                      order,
+		closeChanOnce:              &sync.Once{},
+		closeChan:                  make(chan struct{}),
 		locked:                     uatomic.NewBool(false),
 		dropped:                    uatomic.NewBool(false),
 	}
@@ -96,7 +100,11 @@ func (pq *processQueue) putMessage(messages ...*primitive.MessageExt) {
 		return
 	}
 	if !pq.order {
-		pq.msgCh <- messages
+		select {
+		case <-pq.closeChan:
+			return
+		case pq.msgCh <- messages:
+		}
 	}
 	validMessageCount := 0
 	for idx := range messages {
@@ -142,6 +150,9 @@ func (pq *processQueue) IsLock() bool {
 
 func (pq *processQueue) WithDropped(dropped bool) {
 	pq.dropped.Store(dropped)
+	pq.closeChanOnce.Do(func() {
+		close(pq.closeChan)
+	})
 }
 
 func (pq *processQueue) IsDroppd() bool {
@@ -274,7 +285,12 @@ func (pq *processQueue) getMaxSpan() int {
 }
 
 func (pq *processQueue) getMessages() []*primitive.MessageExt {
-	return <-pq.msgCh
+	select {
+	case <-pq.closeChan:
+		return nil
+	case mq := <-pq.msgCh:
+		return mq
+	}
 }
 
 func (pq *processQueue) takeMessages(number int) []*primitive.MessageExt {