You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/05/28 09:22:57 UTC

[incubator-inlong] branch INLONG-25 updated (24b3c99 -> 6fd7644)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a change to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git.


    from 24b3c99  Fix comments
     new 395072c  [INLONG-620]Selector for Go SDK
     new 2c6f4bb  Address review comments
     new 6fd7644  Use errs

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 tubemq-client-twins/tubemq-client-go/errs/errs.go  |   2 +
 .../tubemq-client-go/selector/ip_selector.go       |  83 +++++++++++++++
 .../tubemq-client-go/selector/ip_selector_test.go  | 111 +++++++++++++++++++++
 .../tubemq-client-go/selector/selector.go          |  59 +++++++++++
 4 files changed, 255 insertions(+)
 create mode 100644 tubemq-client-twins/tubemq-client-go/selector/ip_selector.go
 create mode 100644 tubemq-client-twins/tubemq-client-go/selector/ip_selector_test.go
 create mode 100644 tubemq-client-twins/tubemq-client-go/selector/selector.go

[incubator-inlong] 01/03: [INLONG-620]Selector for Go SDK

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit 395072c337a438339bf1209228bd23e5892d394f
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Thu May 27 15:22:15 2021 +0800

    [INLONG-620]Selector for Go SDK
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 .../tubemq-client-go/selector/ip_selector.go       |  71 ++++++++++++++
 .../tubemq-client-go/selector/ip_selector_test.go  | 102 +++++++++++++++++++++
 .../tubemq-client-go/selector/selector.go          |  51 +++++++++++
 3 files changed, 224 insertions(+)

diff --git a/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go b/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go
new file mode 100644
index 0000000..a3547db
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package selector
+
+import (
+	"errors"
+	"strings"
+)
+
+func init() {
+	s := &ipSelector{}
+	s.indexes = make(map[string]int)
+	Register("ip", s)
+	Register("dns", s)
+}
+
+type ipSelector struct {
+	indexes map[string]int
+}
+
+// Select implements Selector interface.
+// Select will return the address in the serviceName sequentially.
+// The first address will be returned after reaching the end of the addresses.
+func (s *ipSelector) Select(serviceName string) (*Node, error) {
+	if len(serviceName) == 0 {
+		return nil, errors.New("serviceName empty")
+	}
+
+	num := strings.Count(serviceName, ",") + 1
+	if num == 1 {
+		return &Node{
+			ServiceName: serviceName,
+			Address:     serviceName,
+			HasNext:     false,
+		}, nil
+	}
+
+	nextIndex := 0
+	if index, ok := s.indexes[serviceName]; ok {
+		nextIndex = index
+	}
+
+	addresses := strings.Split(serviceName, ",")
+	address := addresses[nextIndex]
+	nextIndex = (nextIndex + 1) % num
+	s.indexes[serviceName] = nextIndex
+
+	node := &Node{
+		ServiceName: serviceName,
+		Address:     address,
+	}
+	if nextIndex > 0 {
+		node.HasNext = true
+	}
+	return node, nil
+}
diff --git a/tubemq-client-twins/tubemq-client-go/selector/ip_selector_test.go b/tubemq-client-twins/tubemq-client-go/selector/ip_selector_test.go
new file mode 100644
index 0000000..6b5ba35
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/selector/ip_selector_test.go
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package selector
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestSingleIP(t *testing.T) {
+	serviceName := "192.168.0.1:9092"
+	selector := Get("ip")
+	node, err := selector.Select(serviceName)
+	assert.Nil(t, err)
+	assert.Equal(t, node.HasNext, false)
+	assert.Equal(t, node.Address, "192.168.0.1:9092")
+	assert.Equal(t, node.ServiceName, "192.168.0.1:9092")
+}
+
+func TestSingleDNS(t *testing.T) {
+	serviceName := "tubemq:8081"
+	selector := Get("dns")
+	node, err := selector.Select(serviceName)
+	assert.Nil(t, err)
+	assert.Equal(t, node.HasNext, false)
+	assert.Equal(t, node.Address, "tubemq:8081")
+	assert.Equal(t, node.ServiceName, "tubemq:8081")
+}
+
+func TestMultipleIP(t *testing.T) {
+	serviceName := "192.168.0.1:9091,192.168.0.1:9092,192.168.0.1:9093,192.168.0.1:9094"
+	selector := Get("dns")
+	node, err := selector.Select(serviceName)
+	assert.Nil(t, err)
+	assert.Equal(t, true, node.HasNext)
+	assert.Equal(t, "192.168.0.1:9091", node.Address)
+	assert.Equal(t, "192.168.0.1:9091,192.168.0.1:9092,192.168.0.1:9093,192.168.0.1:9094", node.ServiceName)
+
+	node, err = selector.Select(serviceName)
+	assert.Equal(t, true, node.HasNext)
+	assert.Equal(t, "192.168.0.1:9092", node.Address)
+	assert.Equal(t, "192.168.0.1:9091,192.168.0.1:9092,192.168.0.1:9093,192.168.0.1:9094", node.ServiceName)
+
+	node, err = selector.Select(serviceName)
+	assert.Equal(t, true, node.HasNext)
+	assert.Equal(t, "192.168.0.1:9093", node.Address)
+	assert.Equal(t, "192.168.0.1:9091,192.168.0.1:9092,192.168.0.1:9093,192.168.0.1:9094", node.ServiceName)
+
+	node, err = selector.Select(serviceName)
+	assert.Equal(t, false, node.HasNext)
+	assert.Equal(t, "192.168.0.1:9094", node.Address)
+	assert.Equal(t, "192.168.0.1:9091,192.168.0.1:9092,192.168.0.1:9093,192.168.0.1:9094", node.ServiceName)
+}
+
+func TestMultipleDNS(t *testing.T) {
+	serviceName := "tubemq:8081,tubemq:8082,tubemq:8083,tubemq:8084"
+	selector := Get("dns")
+	node, err := selector.Select(serviceName)
+	assert.Nil(t, err)
+	assert.Equal(t, true, node.HasNext)
+	assert.Equal(t, "tubemq:8081", node.Address)
+	assert.Equal(t, "tubemq:8081,tubemq:8082,tubemq:8083,tubemq:8084", node.ServiceName)
+
+	node, err = selector.Select(serviceName)
+	assert.Equal(t, true, node.HasNext)
+	assert.Equal(t, "tubemq:8082", node.Address)
+	assert.Equal(t, "tubemq:8081,tubemq:8082,tubemq:8083,tubemq:8084", node.ServiceName)
+
+	node, err = selector.Select(serviceName)
+	assert.Equal(t, true, node.HasNext)
+	assert.Equal(t, "tubemq:8083", node.Address)
+	assert.Equal(t, "tubemq:8081,tubemq:8082,tubemq:8083,tubemq:8084", node.ServiceName)
+
+	node, err = selector.Select(serviceName)
+	assert.Equal(t, false, node.HasNext)
+	assert.Equal(t, "tubemq:8084", node.Address)
+	assert.Equal(t, "tubemq:8081,tubemq:8082,tubemq:8083,tubemq:8084", node.ServiceName)
+}
+
+
+func TestEmptyService(t *testing.T) {
+	serviceName := ""
+	selector := Get("ip")
+	_, err := selector.Select(serviceName)
+	assert.Error(t, err)
+}
\ No newline at end of file
diff --git a/tubemq-client-twins/tubemq-client-go/selector/selector.go b/tubemq-client-twins/tubemq-client-go/selector/selector.go
new file mode 100644
index 0000000..5b5e0c8
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/selector/selector.go
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// package selector defines the route selector which is responsible for service discovery.
+package selector
+
+// Selector is abstraction of route selector which can return an available address
+// from the service name.
+type Selector interface {
+	// Select will return a service node which contains an available address.
+	Select(serviceName string) (*Node, error)
+}
+
+var (
+	selectors = make(map[string]Selector)
+)
+
+// Register registers a selector.
+func Register(name string, s Selector) {
+	selectors[name] = s
+}
+
+// Get returns the corresponding selector.
+func Get(name string) Selector {
+	s := selectors[name]
+	return s
+}
+
+// Node represents the service node.
+type Node struct {
+	// ServiceName of the node.
+	ServiceName string
+	// Address of the node.
+	Address string
+	// HasNext indicates whether or not the service has next node.
+	HasNext bool
+}

[incubator-inlong] 03/03: Use errs

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit 6fd764444c824cae9595a5551c2d7a7443a3e4e6
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Fri May 28 16:03:15 2021 +0800

    Use errs
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 tubemq-client-twins/tubemq-client-go/errs/errs.go         | 2 ++
 tubemq-client-twins/tubemq-client-go/selector/selector.go | 5 +++--
 2 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/errs/errs.go b/tubemq-client-twins/tubemq-client-go/errs/errs.go
index 0ef803d..00b3c81 100644
--- a/tubemq-client-twins/tubemq-client-go/errs/errs.go
+++ b/tubemq-client-twins/tubemq-client-go/errs/errs.go
@@ -33,6 +33,8 @@ const (
 	RetAssertionFailure = 4
 	// RetRequestFailure represents the error code of request error.
 	RetRequestFailure = 5
+	// RetSelectorNotExist represents the error code returned when the requested selector does not exist.
+	RetSelectorNotExist = 6
 )
 
 // ErrAssertionFailure represents RetAssertionFailure error.
diff --git a/tubemq-client-twins/tubemq-client-go/selector/selector.go b/tubemq-client-twins/tubemq-client-go/selector/selector.go
index 77352fd..9d34be0 100644
--- a/tubemq-client-twins/tubemq-client-go/selector/selector.go
+++ b/tubemq-client-twins/tubemq-client-go/selector/selector.go
@@ -19,8 +19,9 @@
 package selector
 
 import (
-	"errors"
 	"fmt"
+
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
 )
 
 // Selector is abstraction of route selector which can return an available address
@@ -42,7 +43,7 @@ func Register(name string, s Selector) {
 // Get returns the corresponding selector.
 func Get(name string) (Selector, error) {
 	if _, ok := selectors[name]; !ok {
-		return nil, errors.New(fmt.Sprintf("selector %s is invalid", name))
+		return nil, errs.New(errs.RetSelectorNotExist, fmt.Sprintf("selector %s is invalid", name))
 	}
 	return selectors[name], nil
 }

[incubator-inlong] 02/03: Address review comments

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit 2c6f4bbca5f8fc4fa6c95f5208b87fabe1787034
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Thu May 27 19:25:00 2021 +0800

    Address review comments
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 .../tubemq-client-go/selector/ip_selector.go       | 24 +++++++++++++++------
 .../tubemq-client-go/selector/ip_selector_test.go  | 25 +++++++++++++++-------
 .../tubemq-client-go/selector/selector.go          | 13 ++++++++---
 3 files changed, 45 insertions(+), 17 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go b/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go
index a3547db..177f3ee 100644
--- a/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go
+++ b/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go
@@ -24,13 +24,18 @@ import (
 
 func init() {
 	s := &ipSelector{}
-	s.indexes = make(map[string]int)
+	s.services = make(map[string]*ipServices)
 	Register("ip", s)
 	Register("dns", s)
 }
 
 type ipSelector struct {
-	indexes map[string]int
+	services map[string]*ipServices
+}
+
+type ipServices struct {
+	nextIndex int
+	addresses []string
 }
 
 // Select implements Selector interface.
@@ -50,15 +55,22 @@ func (s *ipSelector) Select(serviceName string) (*Node, error) {
 		}, nil
 	}
 
+	var addresses []string
 	nextIndex := 0
-	if index, ok := s.indexes[serviceName]; ok {
-		nextIndex = index
+	if _, ok := s.services[serviceName]; !ok {
+		addresses = strings.Split(serviceName, ",")
+	} else {
+		services := s.services[serviceName]
+		addresses = services.addresses
+		nextIndex = services.nextIndex
 	}
 
-	addresses := strings.Split(serviceName, ",")
 	address := addresses[nextIndex]
 	nextIndex = (nextIndex + 1) % num
-	s.indexes[serviceName] = nextIndex
+	s.services[serviceName] = &ipServices{
+		addresses: addresses,
+		nextIndex: nextIndex,
+	}
 
 	node := &Node{
 		ServiceName: serviceName,
diff --git a/tubemq-client-twins/tubemq-client-go/selector/ip_selector_test.go b/tubemq-client-twins/tubemq-client-go/selector/ip_selector_test.go
index 6b5ba35..8233006 100644
--- a/tubemq-client-twins/tubemq-client-go/selector/ip_selector_test.go
+++ b/tubemq-client-twins/tubemq-client-go/selector/ip_selector_test.go
@@ -25,7 +25,8 @@ import (
 
 func TestSingleIP(t *testing.T) {
 	serviceName := "192.168.0.1:9092"
-	selector := Get("ip")
+	selector, err := Get("ip")
+	assert.Nil(t, err)
 	node, err := selector.Select(serviceName)
 	assert.Nil(t, err)
 	assert.Equal(t, node.HasNext, false)
@@ -35,7 +36,8 @@ func TestSingleIP(t *testing.T) {
 
 func TestSingleDNS(t *testing.T) {
 	serviceName := "tubemq:8081"
-	selector := Get("dns")
+	selector, err := Get("dns")
+	assert.Nil(t, err)
 	node, err := selector.Select(serviceName)
 	assert.Nil(t, err)
 	assert.Equal(t, node.HasNext, false)
@@ -45,7 +47,8 @@ func TestSingleDNS(t *testing.T) {
 
 func TestMultipleIP(t *testing.T) {
 	serviceName := "192.168.0.1:9091,192.168.0.1:9092,192.168.0.1:9093,192.168.0.1:9094"
-	selector := Get("dns")
+	selector, err := Get("dns")
+	assert.Nil(t, err)
 	node, err := selector.Select(serviceName)
 	assert.Nil(t, err)
 	assert.Equal(t, true, node.HasNext)
@@ -70,7 +73,8 @@ func TestMultipleIP(t *testing.T) {
 
 func TestMultipleDNS(t *testing.T) {
 	serviceName := "tubemq:8081,tubemq:8082,tubemq:8083,tubemq:8084"
-	selector := Get("dns")
+	selector, err := Get("dns")
+	assert.Nil(t, err)
 	node, err := selector.Select(serviceName)
 	assert.Nil(t, err)
 	assert.Equal(t, true, node.HasNext)
@@ -93,10 +97,15 @@ func TestMultipleDNS(t *testing.T) {
 	assert.Equal(t, "tubemq:8081,tubemq:8082,tubemq:8083,tubemq:8084", node.ServiceName)
 }
 
-
 func TestEmptyService(t *testing.T) {
 	serviceName := ""
-	selector := Get("ip")
-	_, err := selector.Select(serviceName)
+	selector, err := Get("ip")
+	assert.Nil(t, err)
+	_, err = selector.Select(serviceName)
 	assert.Error(t, err)
-}
\ No newline at end of file
+}
+
+func TestInvalidSelector(t *testing.T) {
+	_, err := Get("selector")
+	assert.Error(t, err)
+}
diff --git a/tubemq-client-twins/tubemq-client-go/selector/selector.go b/tubemq-client-twins/tubemq-client-go/selector/selector.go
index 5b5e0c8..77352fd 100644
--- a/tubemq-client-twins/tubemq-client-go/selector/selector.go
+++ b/tubemq-client-twins/tubemq-client-go/selector/selector.go
@@ -18,6 +18,11 @@
 // package selector defines the route selector which is responsible for service discovery.
 package selector
 
+import (
+	"errors"
+	"fmt"
+)
+
 // Selector is abstraction of route selector which can return an available address
 // from the service name.
 type Selector interface {
@@ -35,9 +40,11 @@ func Register(name string, s Selector) {
 }
 
 // Get returns the corresponding selector.
-func Get(name string) Selector {
-	s := selectors[name]
-	return s
+func Get(name string) (Selector, error) {
+	if _, ok := selectors[name]; !ok {
+		return nil, errors.New(fmt.Sprintf("selector %s is invalid", name))
+	}
+	return selectors[name], nil
 }
 
 // Node represents the service node.