You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2019/07/15 01:05:05 UTC
[rocketmq-client-go] branch native updated: [ISSUSE #75] change
`Fatal(f)` method into `Error(f)` method (#110)
This is an automated email from the ASF dual-hosted git repository.
huzongtang pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new 6715597 [ISSUSE #75] change `Fatal(f)` method into `Error(f)` method (#110)
6715597 is described below
commit 67155979c93d2e95befd796285d21e047cc2d5c5
Author: 高峰 <ga...@foxmail.com>
AuthorDate: Mon Jul 15 09:05:01 2019 +0800
[ISSUSE #75] change `Fatal(f)` method into `Error(f)` method (#110)
---
consumer/push_consumer.go | 29 +++++++++++++++++++----------
1 file changed, 19 insertions(+), 10 deletions(-)
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 477c3e6..31996c5 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -149,6 +149,15 @@ func (pc *pushConsumer) Start() error {
}
}()
+ err = pc.client.RegisterConsumer(pc.consumerGroup, pc)
+ if err != nil {
+ pc.state = internal.StateCreateJust
+ err = fmt.Errorf("consumer group: [%s] has been created", pc.consumerGroup)
+ return
+ }
+ pc.client.UpdateTopicRouteInfo()
+ pc.client.Start()
+ pc.state = internal.StateRunning
})
pc.client.UpdateTopicRouteInfo()
@@ -218,18 +227,18 @@ func (pc *pushConsumer) validate() {
if pc.consumerGroup == internal.DefaultConsumerGroup {
// TODO FQA
- rlog.Fatalf("consumerGroup can't equal [%s], please specify another one.", internal.DefaultConsumerGroup)
+ rlog.Errorf("consumerGroup can't equal [%s], please specify another one.", internal.DefaultConsumerGroup)
}
if len(pc.subscribedTopic) == 0 {
- rlog.Fatal("number of subscribed topics is 0.")
+ rlog.Error("number of subscribed topics is 0.")
}
if pc.option.ConsumeConcurrentlyMaxSpan < 1 || pc.option.ConsumeConcurrentlyMaxSpan > 65535 {
if pc.option.ConsumeConcurrentlyMaxSpan == 0 {
pc.option.ConsumeConcurrentlyMaxSpan = 1000
} else {
- rlog.Fatal("option.ConsumeConcurrentlyMaxSpan out of range [1, 65535]")
+ rlog.Error("option.ConsumeConcurrentlyMaxSpan out of range [1, 65535]")
}
}
@@ -237,7 +246,7 @@ func (pc *pushConsumer) validate() {
if pc.option.PullThresholdForQueue == 0 {
pc.option.PullThresholdForQueue = 1024
} else {
- rlog.Fatal("option.PullThresholdForQueue out of range [1, 65535]")
+ rlog.Error("option.PullThresholdForQueue out of range [1, 65535]")
}
}
@@ -245,7 +254,7 @@ func (pc *pushConsumer) validate() {
if pc.option.PullThresholdForTopic == 0 {
pc.option.PullThresholdForTopic = 102400
} else {
- rlog.Fatal("option.PullThresholdForTopic out of range [1, 6553500]")
+ rlog.Error("option.PullThresholdForTopic out of range [1, 6553500]")
}
}
@@ -253,7 +262,7 @@ func (pc *pushConsumer) validate() {
if pc.option.PullThresholdSizeForQueue == 0 {
pc.option.PullThresholdSizeForQueue = 512
} else {
- rlog.Fatal("option.PullThresholdSizeForQueue out of range [1, 1024]")
+ rlog.Error("option.PullThresholdSizeForQueue out of range [1, 1024]")
}
}
@@ -261,19 +270,19 @@ func (pc *pushConsumer) validate() {
if pc.option.PullThresholdSizeForTopic == 0 {
pc.option.PullThresholdSizeForTopic = 51200
} else {
- rlog.Fatal("option.PullThresholdSizeForTopic out of range [1, 102400]")
+ rlog.Error("option.PullThresholdSizeForTopic out of range [1, 102400]")
}
}
if pc.option.PullInterval < 0 || pc.option.PullInterval > 65535 {
- rlog.Fatal("option.PullInterval out of range [0, 65535]")
+ rlog.Error("option.PullInterval out of range [0, 65535]")
}
if pc.option.ConsumeMessageBatchMaxSize < 1 || pc.option.ConsumeMessageBatchMaxSize > 1024 {
if pc.option.ConsumeMessageBatchMaxSize == 0 {
pc.option.ConsumeMessageBatchMaxSize = 512
} else {
- rlog.Fatal("option.ConsumeMessageBatchMaxSize out of range [1, 1024]")
+ rlog.Error("option.ConsumeMessageBatchMaxSize out of range [1, 1024]")
}
}
@@ -281,7 +290,7 @@ func (pc *pushConsumer) validate() {
if pc.option.PullBatchSize == 0 {
pc.option.PullBatchSize = 32
} else {
- rlog.Fatal("option.PullBatchSize out of range [1, 1024]")
+ rlog.Error("option.PullBatchSize out of range [1, 1024]")
}
}
}