You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ti...@apache.org on 2023/03/20 02:59:04 UTC
[rocketmq-client-go] branch master updated: fix: unlock all queues when consumer shutdown in orderly model
This is an automated email from the ASF dual-hosted git repository.
tigerlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 9bca88d fix: unlock all queues when consumer shutdown in orderly model
new 3b8b238 Merge pull request #1015 from cserwen/orderly_consume_github
9bca88d is described below
commit 9bca88dcb44a0f6a4e21639bb269271eafafbe64
Author: dengzhiwen1 <de...@xiaomi.com>
AuthorDate: Fri Mar 10 14:03:58 2023 +0800
fix: unlock all queues when consumer shutdown in orderly model
---
consumer/consumer.go | 12 ++++++------
consumer/push_consumer.go | 4 +++-
2 files changed, 9 insertions(+), 7 deletions(-)
diff --git a/consumer/consumer.go b/consumer/consumer.go
index 1511f01..7bd2b02 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -631,12 +631,12 @@ func (dc *defaultConsumer) doUnlock(addr string, body *lockBatchRequestBody, one
}
} else {
response, err := dc.client.InvokeSync(context.Background(), addr, request, 1*time.Second)
- rlog.Error("lock MessageQueue to broker invoke error", map[string]interface{}{
- rlog.LogKeyBroker: addr,
- rlog.LogKeyUnderlayError: err,
- })
- if response.Code != internal.ResSuccess {
- // TODO error
+ if err != nil || response == nil || response.Code != internal.ResSuccess {
+ rlog.Error("lock MessageQueue to broker invoke error", map[string]interface{}{
+ rlog.LogKeyBroker: addr,
+ rlog.LogKeyUnderlayError: err,
+ "response": response,
+ })
}
}
}
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 7d585de..85f9725 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -277,7 +277,9 @@ func (pc *pushConsumer) Shutdown() error {
pc.option.TraceDispatcher.Close()
}
close(pc.done)
-
+ if pc.consumeOrderly && pc.model == Clustering {
+ pc.unlockAll(false)
+ }
pc.client.UnregisterConsumer(pc.consumerGroup)
err = pc.defaultConsumer.shutdown()
})