You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/05/21 02:00:06 UTC
[rocketmq-client-go] branch native updated: fix panic bugs when
topic not exist (#63)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new 5c00ac1 fix panic bugs when topic not exist (#63)
5c00ac1 is described below
commit 5c00ac16346864dd302e5de5e55c987b0b97e41c
Author: wenfeng <sx...@gmail.com>
AuthorDate: Tue May 21 09:59:59 2019 +0800
fix panic bugs when topic not exist (#63)
---
consumer/push_consumer.go | 15 +++++++++++++--
examples/consumer/main.go | 2 +-
examples/producer/main.go | 2 +-
go.sum | 3 +++
kernel/client.go | 12 +++++++++++-
producer/producer.go | 8 ++++++--
utils/string.go | 26 +++++++++++++++++++++++++-
utils/string_test.go | 28 ++++++++++++++++++++++++++++
8 files changed, 88 insertions(+), 8 deletions(-)
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 284d14b..1cb5efd 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -20,6 +20,7 @@ package consumer
import (
"context"
"errors"
+ "fmt"
"github.com/apache/rocketmq-client-go/kernel"
"github.com/apache/rocketmq-client-go/rlog"
"github.com/apache/rocketmq-client-go/utils"
@@ -61,7 +62,10 @@ type pushConsumer struct {
subscribedTopic map[string]string
}
-func NewPushConsumer(consumerGroup string, opt ConsumerOption) PushConsumer {
+func NewPushConsumer(consumerGroup string, opt ConsumerOption) (PushConsumer, error) {
+ if err := utils.VerifyIP(opt.NameServerAddr); err != nil {
+ return nil, err
+ }
opt.InstanceName = "DEFAULT"
opt.ClientIP = utils.LocalIP()
if opt.NameServerAddr == "" {
@@ -109,7 +113,7 @@ func NewPushConsumer(consumerGroup string, opt ConsumerOption) PushConsumer {
} else {
p.submitToConsume = p.consumeMessageCurrently
}
- return p
+ return p, nil
}
func (pc *pushConsumer) Start() error {
@@ -158,6 +162,13 @@ func (pc *pushConsumer) Start() error {
})
pc.client.UpdateTopicRouteInfo()
+ for k := range pc.subscribedTopic {
+ _, exist := pc.topicSubscribeInfoTable.Load(k)
+ if !exist {
+ pc.client.Shutdown()
+ return fmt.Errorf("the topic=%s route info not found, it may not exist", k)
+ }
+ }
pc.client.RebalanceImmediately()
pc.client.CheckClientInBroker()
pc.client.SendHeartbeatToAllBrokerWithLock()
diff --git a/examples/consumer/main.go b/examples/consumer/main.go
index 53e9cb5..f660d43 100644
--- a/examples/consumer/main.go
+++ b/examples/consumer/main.go
@@ -26,7 +26,7 @@ import (
)
func main() {
- c := consumer.NewPushConsumer("testGroup", consumer.ConsumerOption{
+ c, _ := consumer.NewPushConsumer("testGroup", consumer.ConsumerOption{
NameServerAddr: "127.0.0.1:9876",
ConsumerModel: consumer.Clustering,
FromWhere: consumer.ConsumeFromFirstOffset,
diff --git a/examples/producer/main.go b/examples/producer/main.go
index 83bd127..9cc626b 100644
--- a/examples/producer/main.go
+++ b/examples/producer/main.go
@@ -30,7 +30,7 @@ func main() {
NameServerAddr: "127.0.0.1:9876",
RetryTimesWhenSendFailed: 2,
}
- p := producer.NewProducer(opt)
+ p, _ := producer.NewProducer(opt)
err := p.Start()
if err != nil {
fmt.Printf("start producer error: %s", err.Error())
diff --git a/go.sum b/go.sum
index 07969a4..6580f88 100644
--- a/go.sum
+++ b/go.sum
@@ -1,14 +1,17 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg=
github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.4.1 h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k=
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
+github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/tidwall/gjson v1.2.1 h1:j0efZLrZUvNerEf6xqoi0NjWMK5YlLrR7Guo/dxY174=
github.com/tidwall/gjson v1.2.1/go.mod h1:c/nTNbUr0E0OrXEhq1pwa8iEgc2DOt4ZZqAt1HtCkPA=
diff --git a/kernel/client.go b/kernel/client.go
index b91ab1f..9dbecc6 100644
--- a/kernel/client.go
+++ b/kernel/client.go
@@ -47,7 +47,7 @@ const (
_PersistOffset = 5 * time.Second
// Rebalance interval
- _RebalanceInterval = 100 * time.Millisecond
+ _RebalanceInterval = 10 * time.Second
)
var (
@@ -181,6 +181,10 @@ func (c *RMQClient) Start() {
})
}
+func (c *RMQClient) Shutdown() {
+ // TODO
+}
+
func (c *RMQClient) ClientID() string {
id := c.option.ClientIP + "@" + c.option.InstanceName
if c.option.UnitName != "" {
@@ -441,6 +445,9 @@ func (c *RMQClient) RebalanceImmediately() {
}
func (c *RMQClient) UpdatePublishInfo(topic string, data *TopicRouteData) {
+ if data == nil {
+ return
+ }
if !c.isNeedUpdatePublishInfo(topic) {
return
}
@@ -467,6 +474,9 @@ func (c *RMQClient) isNeedUpdatePublishInfo(topic string) bool {
}
func (c *RMQClient) UpdateSubscribeInfo(topic string, data *TopicRouteData) {
+ if data == nil {
+ return
+ }
if !c.isNeedUpdateSubscribeInfo(topic) {
return
}
diff --git a/producer/producer.go b/producer/producer.go
index 439c585..8bb0bf8 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -24,6 +24,7 @@ import (
"github.com/apache/rocketmq-client-go/kernel"
"github.com/apache/rocketmq-client-go/remote"
"github.com/apache/rocketmq-client-go/rlog"
+ "github.com/apache/rocketmq-client-go/utils"
"os"
"sync"
"sync/atomic"
@@ -37,7 +38,10 @@ type Producer interface {
SendOneWay(context.Context, *kernel.Message) error
}
-func NewProducer(opt ProducerOptions) Producer {
+func NewProducer(opt ProducerOptions) (Producer, error) {
+ if err := utils.VerifyIP(opt.NameServerAddr); err != nil {
+ return nil, err
+ }
if opt.RetryTimesWhenSendFailed == 0 {
opt.RetryTimesWhenSendFailed = 2
}
@@ -52,7 +56,7 @@ func NewProducer(opt ProducerOptions) Producer {
group: "default",
client: kernel.GetOrNewRocketMQClient(opt.ClientOption),
options: opt,
- }
+ }, nil
}
type defaultProducer struct {
diff --git a/utils/string.go b/utils/string.go
index 6a74808..427a8a5 100644
--- a/utils/string.go
+++ b/utils/string.go
@@ -17,7 +17,16 @@ limitations under the License.
package utils
-import "fmt"
+import (
+ "errors"
+ "fmt"
+ "regexp"
+ "strings"
+)
+
+var (
+ ipRegex, _ = regexp.Compile(`^((25[0-5]|2[0-4]\d|((1\d{2})|([1-9]?\d)))\.){3}(25[0-5]|2[0-4]\d|((1\d{2})|([1-9]?\d)))`)
+)
// HashString hashes a string to a unique hashcode.
func HashString(s string) int {
@@ -38,3 +47,18 @@ func StrJoin(str, key string, value interface{}) string {
return str + key + ": " + fmt.Sprint(value) + ", "
}
+
+func VerifyIP(ip string) error {
+ if strings.Contains(ip, ";") {
+ return errors.New("multiple IP addr does not support")
+ }
+ ips := ipRegex.FindAllString(ip, -1)
+ if len(ips) == 0 {
+ return errors.New("IP addr error")
+ }
+
+ if len(ips) > 1 {
+ return errors.New("multiple IP addr does not support")
+ }
+ return nil
+}
diff --git a/utils/string_test.go b/utils/string_test.go
new file mode 100644
index 0000000..f6d2fe6
--- /dev/null
+++ b/utils/string_test.go
@@ -0,0 +1,28 @@
+package utils
+
+import (
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+func TestVerifyIP(t *testing.T) {
+ IPs := "127.0.0.1:9876"
+ err := VerifyIP(IPs)
+ assert.Nil(t, err)
+
+ IPs = "12.24.123.243:10911"
+ err = VerifyIP(IPs)
+ assert.Nil(t, err)
+
+ IPs = "xa2.0.0.1:9876"
+ err = VerifyIP(IPs)
+ assert.Equal(t, "IP addr error", err.Error())
+
+ IPs = "333.0.0.1:9876"
+ err = VerifyIP(IPs)
+ assert.Equal(t, "IP addr error", err.Error())
+
+ IPs = "127.0.0.1:9876;12.24.123.243:10911"
+ err = VerifyIP(IPs)
+ assert.Equal(t, "multiple IP addr does not support", err.Error())
+}