You are viewing a plain-text version of this content; the canonical link was not preserved in this text export.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/05/21 02:00:06 UTC

[rocketmq-client-go] branch native updated: fix panic bugs when topic not exist (#63)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 5c00ac1  fix panic bugs when topic not exist (#63)
5c00ac1 is described below

commit 5c00ac16346864dd302e5de5e55c987b0b97e41c
Author: wenfeng <sx...@gmail.com>
AuthorDate: Tue May 21 09:59:59 2019 +0800

    fix panic bugs when topic not exist (#63)
---
 consumer/push_consumer.go | 15 +++++++++++++--
 examples/consumer/main.go |  2 +-
 examples/producer/main.go |  2 +-
 go.sum                    |  3 +++
 kernel/client.go          | 12 +++++++++++-
 producer/producer.go      |  8 ++++++--
 utils/string.go           | 26 +++++++++++++++++++++++++-
 utils/string_test.go      | 28 ++++++++++++++++++++++++++++
 8 files changed, 88 insertions(+), 8 deletions(-)

diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 284d14b..1cb5efd 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -20,6 +20,7 @@ package consumer
 import (
 	"context"
 	"errors"
+	"fmt"
 	"github.com/apache/rocketmq-client-go/kernel"
 	"github.com/apache/rocketmq-client-go/rlog"
 	"github.com/apache/rocketmq-client-go/utils"
@@ -61,7 +62,10 @@ type pushConsumer struct {
 	subscribedTopic              map[string]string
 }
 
-func NewPushConsumer(consumerGroup string, opt ConsumerOption) PushConsumer {
+func NewPushConsumer(consumerGroup string, opt ConsumerOption) (PushConsumer, error) {
+	if err := utils.VerifyIP(opt.NameServerAddr); err != nil {
+		return nil, err
+	}
 	opt.InstanceName = "DEFAULT"
 	opt.ClientIP = utils.LocalIP()
 	if opt.NameServerAddr == "" {
@@ -109,7 +113,7 @@ func NewPushConsumer(consumerGroup string, opt ConsumerOption) PushConsumer {
 	} else {
 		p.submitToConsume = p.consumeMessageCurrently
 	}
-	return p
+	return p, nil
 }
 
 func (pc *pushConsumer) Start() error {
@@ -158,6 +162,13 @@ func (pc *pushConsumer) Start() error {
 	})
 
 	pc.client.UpdateTopicRouteInfo()
+	for k := range pc.subscribedTopic {
+		_, exist := pc.topicSubscribeInfoTable.Load(k)
+		if !exist {
+			pc.client.Shutdown()
+			return fmt.Errorf("the topic=%s route info not found, it may not exist", k)
+		}
+	}
 	pc.client.RebalanceImmediately()
 	pc.client.CheckClientInBroker()
 	pc.client.SendHeartbeatToAllBrokerWithLock()
diff --git a/examples/consumer/main.go b/examples/consumer/main.go
index 53e9cb5..f660d43 100644
--- a/examples/consumer/main.go
+++ b/examples/consumer/main.go
@@ -26,7 +26,7 @@ import (
 )
 
 func main() {
-	c := consumer.NewPushConsumer("testGroup", consumer.ConsumerOption{
+	c, _ := consumer.NewPushConsumer("testGroup", consumer.ConsumerOption{
 		NameServerAddr: "127.0.0.1:9876",
 		ConsumerModel:  consumer.Clustering,
 		FromWhere:      consumer.ConsumeFromFirstOffset,
diff --git a/examples/producer/main.go b/examples/producer/main.go
index 83bd127..9cc626b 100644
--- a/examples/producer/main.go
+++ b/examples/producer/main.go
@@ -30,7 +30,7 @@ func main() {
 		NameServerAddr:           "127.0.0.1:9876",
 		RetryTimesWhenSendFailed: 2,
 	}
-	p := producer.NewProducer(opt)
+	p, _ := producer.NewProducer(opt)
 	err := p.Start()
 	if err != nil {
 		fmt.Printf("start producer error: %s", err.Error())
diff --git a/go.sum b/go.sum
index 07969a4..6580f88 100644
--- a/go.sum
+++ b/go.sum
@@ -1,14 +1,17 @@
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg=
 github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/sirupsen/logrus v1.4.1 h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k=
 github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
+github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 github.com/tidwall/gjson v1.2.1 h1:j0efZLrZUvNerEf6xqoi0NjWMK5YlLrR7Guo/dxY174=
 github.com/tidwall/gjson v1.2.1/go.mod h1:c/nTNbUr0E0OrXEhq1pwa8iEgc2DOt4ZZqAt1HtCkPA=
diff --git a/kernel/client.go b/kernel/client.go
index b91ab1f..9dbecc6 100644
--- a/kernel/client.go
+++ b/kernel/client.go
@@ -47,7 +47,7 @@ const (
 	_PersistOffset = 5 * time.Second
 
 	// Rebalance interval
-	_RebalanceInterval = 100 * time.Millisecond
+	_RebalanceInterval = 10 * time.Second
 )
 
 var (
@@ -181,6 +181,10 @@ func (c *RMQClient) Start() {
 	})
 }
 
+func (c *RMQClient) Shutdown() {
+	// TODO
+}
+
 func (c *RMQClient) ClientID() string {
 	id := c.option.ClientIP + "@" + c.option.InstanceName
 	if c.option.UnitName != "" {
@@ -441,6 +445,9 @@ func (c *RMQClient) RebalanceImmediately() {
 }
 
 func (c *RMQClient) UpdatePublishInfo(topic string, data *TopicRouteData) {
+	if data == nil {
+		return
+	}
 	if !c.isNeedUpdatePublishInfo(topic) {
 		return
 	}
@@ -467,6 +474,9 @@ func (c *RMQClient) isNeedUpdatePublishInfo(topic string) bool {
 }
 
 func (c *RMQClient) UpdateSubscribeInfo(topic string, data *TopicRouteData) {
+	if data == nil {
+		return
+	}
 	if !c.isNeedUpdateSubscribeInfo(topic) {
 		return
 	}
diff --git a/producer/producer.go b/producer/producer.go
index 439c585..8bb0bf8 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -24,6 +24,7 @@ import (
 	"github.com/apache/rocketmq-client-go/kernel"
 	"github.com/apache/rocketmq-client-go/remote"
 	"github.com/apache/rocketmq-client-go/rlog"
+	"github.com/apache/rocketmq-client-go/utils"
 	"os"
 	"sync"
 	"sync/atomic"
@@ -37,7 +38,10 @@ type Producer interface {
 	SendOneWay(context.Context, *kernel.Message) error
 }
 
-func NewProducer(opt ProducerOptions) Producer {
+func NewProducer(opt ProducerOptions) (Producer, error) {
+	if err := utils.VerifyIP(opt.NameServerAddr); err != nil {
+		return nil, err
+	}
 	if opt.RetryTimesWhenSendFailed == 0 {
 		opt.RetryTimesWhenSendFailed = 2
 	}
@@ -52,7 +56,7 @@ func NewProducer(opt ProducerOptions) Producer {
 		group:   "default",
 		client:  kernel.GetOrNewRocketMQClient(opt.ClientOption),
 		options: opt,
-	}
+	}, nil
 }
 
 type defaultProducer struct {
diff --git a/utils/string.go b/utils/string.go
index 6a74808..427a8a5 100644
--- a/utils/string.go
+++ b/utils/string.go
@@ -17,7 +17,16 @@ limitations under the License.
 
 package utils
 
-import "fmt"
+import (
+	"errors"
+	"fmt"
+	"regexp"
+	"strings"
+)
+
+var (
+	ipRegex, _ = regexp.Compile(`^((25[0-5]|2[0-4]\d|((1\d{2})|([1-9]?\d)))\.){3}(25[0-5]|2[0-4]\d|((1\d{2})|([1-9]?\d)))`)
+)
 
 // HashString hashes a string to a unique hashcode.
 func HashString(s string) int {
@@ -38,3 +47,18 @@ func StrJoin(str, key string, value interface{}) string {
 
 	return str + key + ": " + fmt.Sprint(value) + ", "
 }
+
+func VerifyIP(ip string) error {
+	if strings.Contains(ip, ";") {
+		return errors.New("multiple IP addr does not support")
+	}
+	ips := ipRegex.FindAllString(ip, -1)
+	if len(ips) == 0 {
+		return errors.New("IP addr error")
+	}
+
+	if len(ips) > 1 {
+		return errors.New("multiple IP addr does not support")
+	}
+	return nil
+}
diff --git a/utils/string_test.go b/utils/string_test.go
new file mode 100644
index 0000000..f6d2fe6
--- /dev/null
+++ b/utils/string_test.go
@@ -0,0 +1,28 @@
+package utils
+
+import (
+	"github.com/stretchr/testify/assert"
+	"testing"
+)
+
+func TestVerifyIP(t *testing.T) {
+	IPs := "127.0.0.1:9876"
+	err := VerifyIP(IPs)
+	assert.Nil(t, err)
+
+	IPs = "12.24.123.243:10911"
+	err = VerifyIP(IPs)
+	assert.Nil(t, err)
+
+	IPs = "xa2.0.0.1:9876"
+	err = VerifyIP(IPs)
+	assert.Equal(t, "IP addr error", err.Error())
+
+	IPs = "333.0.0.1:9876"
+	err = VerifyIP(IPs)
+	assert.Equal(t, "IP addr error", err.Error())
+
+	IPs = "127.0.0.1:9876;12.24.123.243:10911"
+	err = VerifyIP(IPs)
+	assert.Equal(t, "multiple IP addr does not support", err.Error())
+}