You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2020/02/19 13:30:14 UTC

[rocketmq-client-go] branch native updated: fix(consumer): use sync offset fix instead of async (#414)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 089cc03  fix(consumer): use sync offset fix instead of async (#414)
089cc03 is described below

commit 089cc03d644e54468b6688a74d26e5d23594fabe
Author: xujianhai666 <52...@users.noreply.github.com>
AuthorDate: Wed Feb 19 21:30:02 2020 +0800

    fix(consumer): use sync offset fix instead of async (#414)
    
    fix(consumer): use sync offset fix instead of async (#414)
---
 consumer/push_consumer.go | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)

diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 3bef44c..cb14688 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -679,13 +679,11 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
 			})
 			request.nextOffset = result.NextBeginOffset
 			pq.WithDropped(true)
-			go primitive.WithRecover(func() {
-				time.Sleep(10 * time.Second)
-				pc.storage.update(request.mq, request.nextOffset, false)
-				pc.storage.persist([]*primitive.MessageQueue{request.mq})
-				pc.storage.remove(request.mq)
-				rlog.Warning(fmt.Sprintf("fix the pull request offset: %s", request.String()), nil)
-			})
+			time.Sleep(10 * time.Second)
+			pc.storage.update(request.mq, request.nextOffset, false)
+			pc.storage.persist([]*primitive.MessageQueue{request.mq})
+			pc.processQueueTable.Delete(request.mq)
+			rlog.Warning(fmt.Sprintf("fix the pull request offset: %s", request.String()), nil)
 		default:
 			rlog.Warning(fmt.Sprintf("unknown pull status: %v", result.Status), nil)
 			sleepTime = _PullDelayTimeWhenError