You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/05/28 09:22:59 UTC

[incubator-inlong] 02/03: Address review comments

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit 2c6f4bbca5f8fc4fa6c95f5208b87fabe1787034
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Thu May 27 19:25:00 2021 +0800

    Address review comments
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 .../tubemq-client-go/selector/ip_selector.go       | 24 +++++++++++++++------
 .../tubemq-client-go/selector/ip_selector_test.go  | 25 +++++++++++++++-------
 .../tubemq-client-go/selector/selector.go          | 13 ++++++++---
 3 files changed, 45 insertions(+), 17 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go b/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go
index a3547db..177f3ee 100644
--- a/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go
+++ b/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go
@@ -24,13 +24,18 @@ import (
 
 func init() {
 	s := &ipSelector{}
-	s.indexes = make(map[string]int)
+	s.services = make(map[string]*ipServices)
 	Register("ip", s)
 	Register("dns", s)
 }
 
 type ipSelector struct {
-	indexes map[string]int
+	services map[string]*ipServices
+}
+
+type ipServices struct {
+	nextIndex int
+	addresses []string
 }
 
 // Select implements Selector interface.
@@ -50,15 +55,22 @@ func (s *ipSelector) Select(serviceName string) (*Node, error) {
 		}, nil
 	}
 
+	var addresses []string
 	nextIndex := 0
-	if index, ok := s.indexes[serviceName]; ok {
-		nextIndex = index
+	if _, ok := s.services[serviceName]; !ok {
+		addresses = strings.Split(serviceName, ",")
+	} else {
+		services := s.services[serviceName]
+		addresses = services.addresses
+		nextIndex = services.nextIndex
 	}
 
-	addresses := strings.Split(serviceName, ",")
 	address := addresses[nextIndex]
 	nextIndex = (nextIndex + 1) % num
-	s.indexes[serviceName] = nextIndex
+	s.services[serviceName] = &ipServices{
+		addresses: addresses,
+		nextIndex: nextIndex,
+	}
 
 	node := &Node{
 		ServiceName: serviceName,
diff --git a/tubemq-client-twins/tubemq-client-go/selector/ip_selector_test.go b/tubemq-client-twins/tubemq-client-go/selector/ip_selector_test.go
index 6b5ba35..8233006 100644
--- a/tubemq-client-twins/tubemq-client-go/selector/ip_selector_test.go
+++ b/tubemq-client-twins/tubemq-client-go/selector/ip_selector_test.go
@@ -25,7 +25,8 @@ import (
 
 func TestSingleIP(t *testing.T) {
 	serviceName := "192.168.0.1:9092"
-	selector := Get("ip")
+	selector, err := Get("ip")
+	assert.Nil(t, err)
 	node, err := selector.Select(serviceName)
 	assert.Nil(t, err)
 	assert.Equal(t, node.HasNext, false)
@@ -35,7 +36,8 @@ func TestSingleIP(t *testing.T) {
 
 func TestSingleDNS(t *testing.T) {
 	serviceName := "tubemq:8081"
-	selector := Get("dns")
+	selector, err := Get("dns")
+	assert.Nil(t, err)
 	node, err := selector.Select(serviceName)
 	assert.Nil(t, err)
 	assert.Equal(t, node.HasNext, false)
@@ -45,7 +47,8 @@ func TestSingleDNS(t *testing.T) {
 
 func TestMultipleIP(t *testing.T) {
 	serviceName := "192.168.0.1:9091,192.168.0.1:9092,192.168.0.1:9093,192.168.0.1:9094"
-	selector := Get("dns")
+	selector, err := Get("dns")
+	assert.Nil(t, err)
 	node, err := selector.Select(serviceName)
 	assert.Nil(t, err)
 	assert.Equal(t, true, node.HasNext)
@@ -70,7 +73,8 @@ func TestMultipleIP(t *testing.T) {
 
 func TestMultipleDNS(t *testing.T) {
 	serviceName := "tubemq:8081,tubemq:8082,tubemq:8083,tubemq:8084"
-	selector := Get("dns")
+	selector, err := Get("dns")
+	assert.Nil(t, err)
 	node, err := selector.Select(serviceName)
 	assert.Nil(t, err)
 	assert.Equal(t, true, node.HasNext)
@@ -93,10 +97,15 @@ func TestMultipleDNS(t *testing.T) {
 	assert.Equal(t, "tubemq:8081,tubemq:8082,tubemq:8083,tubemq:8084", node.ServiceName)
 }
 
-
 func TestEmptyService(t *testing.T) {
 	serviceName := ""
-	selector := Get("ip")
-	_, err := selector.Select(serviceName)
+	selector, err := Get("ip")
+	assert.Nil(t, err)
+	_, err = selector.Select(serviceName)
 	assert.Error(t, err)
-}
\ No newline at end of file
+}
+
+func TestInvalidSelector(t *testing.T) {
+	_, err := Get("selector")
+	assert.Error(t, err)
+}
diff --git a/tubemq-client-twins/tubemq-client-go/selector/selector.go b/tubemq-client-twins/tubemq-client-go/selector/selector.go
index 5b5e0c8..77352fd 100644
--- a/tubemq-client-twins/tubemq-client-go/selector/selector.go
+++ b/tubemq-client-twins/tubemq-client-go/selector/selector.go
@@ -18,6 +18,11 @@
 // package selector defines the route selector which is responsible for service discovery.
 package selector
 
+import (
+	"errors"
+	"fmt"
+)
+
 // Selector is abstraction of route selector which can return an available address
 // from the service name.
 type Selector interface {
@@ -35,9 +40,11 @@ func Register(name string, s Selector) {
 }
 
 // Get returns the corresponding selector.
-func Get(name string) Selector {
-	s := selectors[name]
-	return s
+func Get(name string) (Selector, error) {
+	if _, ok := selectors[name]; !ok {
+		return nil, errors.New(fmt.Sprintf("selector %s is invalid", name))
+	}
+	return selectors[name], nil
 }
 
 // Node represents the service node.