You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2020/02/19 13:30:14 UTC
[rocketmq-client-go] branch native updated: fix(consumer): use sync
offset fix instead if async (#414)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new 089cc03 fix(consumer): use sync offset fix instead if async (#414)
089cc03 is described below
commit 089cc03d644e54468b6688a74d26e5d23594fabe
Author: xujianhai666 <52...@users.noreply.github.com>
AuthorDate: Wed Feb 19 21:30:02 2020 +0800
fix(consumer): use sync offset fix instead if async (#414)
fix(consumer): use sync offset fix instead if async (#414)
---
consumer/push_consumer.go | 12 +++++-------
1 file changed, 5 insertions(+), 7 deletions(-)
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 3bef44c..cb14688 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -679,13 +679,11 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
})
request.nextOffset = result.NextBeginOffset
pq.WithDropped(true)
- go primitive.WithRecover(func() {
- time.Sleep(10 * time.Second)
- pc.storage.update(request.mq, request.nextOffset, false)
- pc.storage.persist([]*primitive.MessageQueue{request.mq})
- pc.storage.remove(request.mq)
- rlog.Warning(fmt.Sprintf("fix the pull request offset: %s", request.String()), nil)
- })
+ time.Sleep(10 * time.Second)
+ pc.storage.update(request.mq, request.nextOffset, false)
+ pc.storage.persist([]*primitive.MessageQueue{request.mq})
+ pc.processQueueTable.Delete(request.mq)
+ rlog.Warning(fmt.Sprintf("fix the pull request offset: %s", request.String()), nil)
default:
rlog.Warning(fmt.Sprintf("unknown pull status: %v", result.Status), nil)
sleepTime = _PullDelayTimeWhenError