You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ma...@apache.org on 2021/11/15 05:11:11 UTC

[rocketmq-client-go] branch master updated: [offset] Optimize the update offset logic (#732)

This is an automated email from the ASF dual-hosted git repository.

maixiaohai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new a8b10ce  [offset] Optimize the update offset logic (#732)
a8b10ce is described below

commit a8b10ce63a8d87927271250b52e36cacc61d2222
Author: zhangyang <zh...@xiaomi.com>
AuthorDate: Mon Nov 15 13:11:04 2021 +0800

    [offset] Optimize the update offset logic (#732)
    
    Signed-off-by: zhangyang21 <zh...@xiaomi.com>
---
 consumer/push_consumer.go | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 8827a87..9b38d0f 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -764,10 +764,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
 					"prevRequestOffset": prevRequestOffset,
 				})
 			}
-		case primitive.PullNoNewMsg:
-			rlog.Debug(fmt.Sprintf("Topic: %s, QueueId: %d no more msg, current offset: %d, next offset: %d",
-				request.mq.Topic, request.mq.QueueId, pullRequest.QueueOffset, result.NextBeginOffset), nil)
-		case primitive.PullNoMsgMatched:
+		case primitive.PullNoNewMsg, primitive.PullNoMsgMatched:
 			request.nextOffset = result.NextBeginOffset
 			pc.correctTagsOffset(request)
 		case primitive.PullOffsetIllegal:
@@ -790,7 +787,9 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
 }
 
 func (pc *pushConsumer) correctTagsOffset(pr *PullRequest) {
-	// TODO
+	if pr.pq.cachedMsgCount <= 0 {
+		pc.storage.update(pr.mq, pr.nextOffset, true)
+	}
 }
 
 func (pc *pushConsumer) sendMessageBack(brokerName string, msg *primitive.MessageExt, delayLevel int) bool {