You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/05/28 09:22:59 UTC
[incubator-inlong] 02/03: Address review comments
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit 2c6f4bbca5f8fc4fa6c95f5208b87fabe1787034
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Thu May 27 19:25:00 2021 +0800
Address review comments
Signed-off-by: Zijie Lu <ws...@gmail.com>
---
.../tubemq-client-go/selector/ip_selector.go | 24 +++++++++++++++------
.../tubemq-client-go/selector/ip_selector_test.go | 25 +++++++++++++++-------
.../tubemq-client-go/selector/selector.go | 13 ++++++++---
3 files changed, 45 insertions(+), 17 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go b/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go
index a3547db..177f3ee 100644
--- a/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go
+++ b/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go
@@ -24,13 +24,18 @@ import (
func init() {
s := &ipSelector{}
- s.indexes = make(map[string]int)
+ s.services = make(map[string]*ipServices)
Register("ip", s)
Register("dns", s)
}
type ipSelector struct {
- indexes map[string]int
+ services map[string]*ipServices
+}
+
+type ipServices struct {
+ nextIndex int
+ addresses []string
}
// Select implements Selector interface.
@@ -50,15 +55,22 @@ func (s *ipSelector) Select(serviceName string) (*Node, error) {
}, nil
}
+ var addresses []string
nextIndex := 0
- if index, ok := s.indexes[serviceName]; ok {
- nextIndex = index
+ if _, ok := s.services[serviceName]; !ok {
+ addresses = strings.Split(serviceName, ",")
+ } else {
+ services := s.services[serviceName]
+ addresses = services.addresses
+ nextIndex = services.nextIndex
}
- addresses := strings.Split(serviceName, ",")
address := addresses[nextIndex]
nextIndex = (nextIndex + 1) % num
- s.indexes[serviceName] = nextIndex
+ s.services[serviceName] = &ipServices{
+ addresses: addresses,
+ nextIndex: nextIndex,
+ }
node := &Node{
ServiceName: serviceName,
diff --git a/tubemq-client-twins/tubemq-client-go/selector/ip_selector_test.go b/tubemq-client-twins/tubemq-client-go/selector/ip_selector_test.go
index 6b5ba35..8233006 100644
--- a/tubemq-client-twins/tubemq-client-go/selector/ip_selector_test.go
+++ b/tubemq-client-twins/tubemq-client-go/selector/ip_selector_test.go
@@ -25,7 +25,8 @@ import (
func TestSingleIP(t *testing.T) {
serviceName := "192.168.0.1:9092"
- selector := Get("ip")
+ selector, err := Get("ip")
+ assert.Nil(t, err)
node, err := selector.Select(serviceName)
assert.Nil(t, err)
assert.Equal(t, node.HasNext, false)
@@ -35,7 +36,8 @@ func TestSingleIP(t *testing.T) {
func TestSingleDNS(t *testing.T) {
serviceName := "tubemq:8081"
- selector := Get("dns")
+ selector, err := Get("dns")
+ assert.Nil(t, err)
node, err := selector.Select(serviceName)
assert.Nil(t, err)
assert.Equal(t, node.HasNext, false)
@@ -45,7 +47,8 @@ func TestSingleDNS(t *testing.T) {
func TestMultipleIP(t *testing.T) {
serviceName := "192.168.0.1:9091,192.168.0.1:9092,192.168.0.1:9093,192.168.0.1:9094"
- selector := Get("dns")
+ selector, err := Get("dns")
+ assert.Nil(t, err)
node, err := selector.Select(serviceName)
assert.Nil(t, err)
assert.Equal(t, true, node.HasNext)
@@ -70,7 +73,8 @@ func TestMultipleIP(t *testing.T) {
func TestMultipleDNS(t *testing.T) {
serviceName := "tubemq:8081,tubemq:8082,tubemq:8083,tubemq:8084"
- selector := Get("dns")
+ selector, err := Get("dns")
+ assert.Nil(t, err)
node, err := selector.Select(serviceName)
assert.Nil(t, err)
assert.Equal(t, true, node.HasNext)
@@ -93,10 +97,15 @@ func TestMultipleDNS(t *testing.T) {
assert.Equal(t, "tubemq:8081,tubemq:8082,tubemq:8083,tubemq:8084", node.ServiceName)
}
-
func TestEmptyService(t *testing.T) {
serviceName := ""
- selector := Get("ip")
- _, err := selector.Select(serviceName)
+ selector, err := Get("ip")
+ assert.Nil(t, err)
+ _, err = selector.Select(serviceName)
assert.Error(t, err)
-}
\ No newline at end of file
+}
+
+func TestInvalidSelector(t *testing.T) {
+ _, err := Get("selector")
+ assert.Error(t, err)
+}
diff --git a/tubemq-client-twins/tubemq-client-go/selector/selector.go b/tubemq-client-twins/tubemq-client-go/selector/selector.go
index 5b5e0c8..77352fd 100644
--- a/tubemq-client-twins/tubemq-client-go/selector/selector.go
+++ b/tubemq-client-twins/tubemq-client-go/selector/selector.go
@@ -18,6 +18,11 @@
// package selector defines the route selector which is responsible for service discovery.
package selector
+import (
+ "errors"
+ "fmt"
+)
+
// Selector is abstraction of route selector which can return an available address
// from the service name.
type Selector interface {
@@ -35,9 +40,11 @@ func Register(name string, s Selector) {
}
// Get returns the corresponding selector.
-func Get(name string) Selector {
- s := selectors[name]
- return s
+func Get(name string) (Selector, error) {
+ if _, ok := selectors[name]; !ok {
+ return nil, errors.New(fmt.Sprintf("selector %s is invalid", name))
+ }
+ return selectors[name], nil
}
// Node represents the service node.