You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2020/11/26 06:33:19 UTC

[rocketmq-client-go] branch master updated: allow further subscription even after client starts, to align with Java SDK's behavior

This is an automated email from the ASF dual-hosted git repository.

jianhaixu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new f554daf  allow further subscription even after client starts, to align with Java SDK's behavior
     new 9d9e413  Merge pull request #560 from beiwei30/subscribe-after-start
f554daf is described below

commit f554daff344769434c823c9cb57e1e6f48b4c390
Author: Ian Luo <ia...@gmail.com>
AuthorDate: Tue Nov 24 19:48:25 2020 +0800

    allow further subscription even after client starts, to align with Java SDK's behavior
---
 consumer/push_consumer.go | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 58945c2..931fefa 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -222,9 +222,11 @@ func (pc *pushConsumer) Shutdown() error {
 
 func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
 	f func(context.Context, ...*primitive.MessageExt) (ConsumeResult, error)) error {
-	if atomic.LoadInt32(&pc.state) != int32(internal.StateCreateJust) {
-		return errors.New("subscribe topic only started before")
+	if atomic.LoadInt32(&pc.state) == int32(internal.StateStartFailed) ||
+		atomic.LoadInt32(&pc.state) == int32(internal.StateShutdown) {
+		return errors.New("cannot subscribe topic since client either failed to start or has been shutdown.")
 	}
+
 	if pc.option.Namespace != "" {
 		topic = pc.option.Namespace + "%" + topic
 	}