You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2019/07/15 01:05:05 UTC

[rocketmq-client-go] branch native updated: [ISSUE #75] change `Fatal(f)` method into `Error(f)` method (#110)

This is an automated email from the ASF dual-hosted git repository.

huzongtang pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 6715597  [ISSUE #75] change `Fatal(f)` method into `Error(f)` method (#110)
6715597 is described below

commit 67155979c93d2e95befd796285d21e047cc2d5c5
Author: 高峰 <ga...@foxmail.com>
AuthorDate: Mon Jul 15 09:05:01 2019 +0800

    [ISSUE #75] change `Fatal(f)` method into `Error(f)` method (#110)
---
 consumer/push_consumer.go | 29 +++++++++++++++++++----------
 1 file changed, 19 insertions(+), 10 deletions(-)

diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 477c3e6..31996c5 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -149,6 +149,15 @@ func (pc *pushConsumer) Start() error {
 			}
 		}()
 
+		err = pc.client.RegisterConsumer(pc.consumerGroup, pc)
+		if err != nil {
+			pc.state = internal.StateCreateJust
+			err = fmt.Errorf("consumer group: [%s] has been created", pc.consumerGroup)
+			return
+		}
+		pc.client.UpdateTopicRouteInfo()
+		pc.client.Start()
+		pc.state = internal.StateRunning
 	})
 
 	pc.client.UpdateTopicRouteInfo()
@@ -218,18 +227,18 @@ func (pc *pushConsumer) validate() {
 
 	if pc.consumerGroup == internal.DefaultConsumerGroup {
 		// TODO FQA
-		rlog.Fatalf("consumerGroup can't equal [%s], please specify another one.", internal.DefaultConsumerGroup)
+		rlog.Errorf("consumerGroup can't equal [%s], please specify another one.", internal.DefaultConsumerGroup)
 	}
 
 	if len(pc.subscribedTopic) == 0 {
-		rlog.Fatal("number of subscribed topics is 0.")
+		rlog.Error("number of subscribed topics is 0.")
 	}
 
 	if pc.option.ConsumeConcurrentlyMaxSpan < 1 || pc.option.ConsumeConcurrentlyMaxSpan > 65535 {
 		if pc.option.ConsumeConcurrentlyMaxSpan == 0 {
 			pc.option.ConsumeConcurrentlyMaxSpan = 1000
 		} else {
-			rlog.Fatal("option.ConsumeConcurrentlyMaxSpan out of range [1, 65535]")
+			rlog.Error("option.ConsumeConcurrentlyMaxSpan out of range [1, 65535]")
 		}
 	}
 
@@ -237,7 +246,7 @@ func (pc *pushConsumer) validate() {
 		if pc.option.PullThresholdForQueue == 0 {
 			pc.option.PullThresholdForQueue = 1024
 		} else {
-			rlog.Fatal("option.PullThresholdForQueue out of range [1, 65535]")
+			rlog.Error("option.PullThresholdForQueue out of range [1, 65535]")
 		}
 	}
 
@@ -245,7 +254,7 @@ func (pc *pushConsumer) validate() {
 		if pc.option.PullThresholdForTopic == 0 {
 			pc.option.PullThresholdForTopic = 102400
 		} else {
-			rlog.Fatal("option.PullThresholdForTopic out of range [1, 6553500]")
+			rlog.Error("option.PullThresholdForTopic out of range [1, 6553500]")
 		}
 	}
 
@@ -253,7 +262,7 @@ func (pc *pushConsumer) validate() {
 		if pc.option.PullThresholdSizeForQueue == 0 {
 			pc.option.PullThresholdSizeForQueue = 512
 		} else {
-			rlog.Fatal("option.PullThresholdSizeForQueue out of range [1, 1024]")
+			rlog.Error("option.PullThresholdSizeForQueue out of range [1, 1024]")
 		}
 	}
 
@@ -261,19 +270,19 @@ func (pc *pushConsumer) validate() {
 		if pc.option.PullThresholdSizeForTopic == 0 {
 			pc.option.PullThresholdSizeForTopic = 51200
 		} else {
-			rlog.Fatal("option.PullThresholdSizeForTopic out of range [1, 102400]")
+			rlog.Error("option.PullThresholdSizeForTopic out of range [1, 102400]")
 		}
 	}
 
 	if pc.option.PullInterval < 0 || pc.option.PullInterval > 65535 {
-		rlog.Fatal("option.PullInterval out of range [0, 65535]")
+		rlog.Error("option.PullInterval out of range [0, 65535]")
 	}
 
 	if pc.option.ConsumeMessageBatchMaxSize < 1 || pc.option.ConsumeMessageBatchMaxSize > 1024 {
 		if pc.option.ConsumeMessageBatchMaxSize == 0 {
 			pc.option.ConsumeMessageBatchMaxSize = 512
 		} else {
-			rlog.Fatal("option.ConsumeMessageBatchMaxSize out of range [1, 1024]")
+			rlog.Error("option.ConsumeMessageBatchMaxSize out of range [1, 1024]")
 		}
 	}
 
@@ -281,7 +290,7 @@ func (pc *pushConsumer) validate() {
 		if pc.option.PullBatchSize == 0 {
 			pc.option.PullBatchSize = 32
 		} else {
-			rlog.Fatal("option.PullBatchSize out of range [1, 1024]")
+			rlog.Error("option.PullBatchSize out of range [1, 1024]")
 		}
 	}
 }