You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/12/01 10:33:05 UTC
[rocketmq-client-go] branch master updated: push consumer enable getting offset diff map (#964)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 0742aac push consumer enable getting offset diff map (#964)
0742aac is described below
commit 0742aac1437d09fac948ea6572ad0107fb8fce66
Author: Kay Du <ka...@gmail.com>
AuthorDate: Thu Dec 1 18:33:00 2022 +0800
push consumer enable getting offset diff map (#964)
Co-authored-by: 筱瑜 <ma...@alibaba-inc.com>
---
consumer/process_queue.go | 3 +++
consumer/push_consumer.go | 24 ++++++++++++++++++++++++
2 files changed, 27 insertions(+)
diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index 49dae7f..aeb66d8 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -60,6 +60,7 @@ type processQueue struct {
order bool
closeChanOnce *sync.Once
closeChan chan struct{}
+ maxOffsetInQueue int64
}
func newProcessQueue(order bool) *processQueue {
@@ -88,6 +89,7 @@ func newProcessQueue(order bool) *processQueue {
closeChan: make(chan struct{}),
locked: atomic.NewBool(false),
dropped: atomic.NewBool(false),
+ maxOffsetInQueue: -1,
}
return pq
}
@@ -372,6 +374,7 @@ func (pq *processQueue) clear() {
pq.cachedMsgCount.Store(0)
pq.cachedMsgSize.Store(0)
pq.queueOffsetMax = 0
+ pq.maxOffsetInQueue = -1
}
func (pq *processQueue) commit() int64 {
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index e91bad0..a3366e6 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -249,6 +249,27 @@ func (pc *pushConsumer) Start() error {
return err
}
+func (pc *pushConsumer) GetOffsetDiffMap() map[string]int64 {
+ offsetDiffMap := make(map[string]int64)
+ pc.processQueueTable.Range(func(key, value interface{}) bool {
+ mq := key.(primitive.MessageQueue)
+ pq := value.(*processQueue)
+ topic := mq.Topic
+ consumerOffset, _ := pc.storage.readWithException(&mq, _ReadFromMemory)
+ maxOffset := pq.maxOffsetInQueue
+ if consumerOffset < 0 || maxOffset < 0 || consumerOffset > maxOffset {
+ return true
+ }
+ if _, ok := offsetDiffMap[topic]; !ok {
+ offsetDiffMap[topic] = 0
+ }
+ offsetDiff := offsetDiffMap[topic]
+ offsetDiffMap[topic] = offsetDiff + (maxOffset - consumerOffset)
+ return true
+ })
+ return offsetDiffMap
+}
+
func (pc *pushConsumer) Shutdown() error {
var err error
pc.closeOnce.Do(func() {
@@ -823,6 +844,9 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
}
pc.processPullResult(request.mq, result, sd)
+ if result.MaxOffset > pq.maxOffsetInQueue {
+ pq.maxOffsetInQueue = result.MaxOffset
+ }
switch result.Status {
case primitive.PullFound: