You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2020/11/26 06:33:19 UTC
[rocketmq-client-go] branch master updated: allow further
subscription even after client starts, to align with Java SDK's behavior
This is an automated email from the ASF dual-hosted git repository.
jianhaixu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new f554daf allow further subscription even after client starts, to align with Java SDK's behavior
new 9d9e413 Merge pull request #560 from beiwei30/subscribe-after-start
f554daf is described below
commit f554daff344769434c823c9cb57e1e6f48b4c390
Author: Ian Luo <ia...@gmail.com>
AuthorDate: Tue Nov 24 19:48:25 2020 +0800
allow further subscription even after client starts, to align with Java SDK's behavior
---
consumer/push_consumer.go | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 58945c2..931fefa 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -222,9 +222,11 @@ func (pc *pushConsumer) Shutdown() error {
func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
f func(context.Context, ...*primitive.MessageExt) (ConsumeResult, error)) error {
- if atomic.LoadInt32(&pc.state) != int32(internal.StateCreateJust) {
- return errors.New("subscribe topic only started before")
+ if atomic.LoadInt32(&pc.state) == int32(internal.StateStartFailed) ||
+ atomic.LoadInt32(&pc.state) == int32(internal.StateShutdown) {
+ return errors.New("cannot subscribe topic since client either failed to start or has been shutdown.")
}
+
if pc.option.Namespace != "" {
topic = pc.option.Namespace + "%" + topic
}