You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/05/28 09:22:58 UTC
[incubator-inlong] 01/03: [INLONG-620]Selector for Go SDK
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit 395072c337a438339bf1209228bd23e5892d394f
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Thu May 27 15:22:15 2021 +0800
[INLONG-620]Selector for Go SDK
Signed-off-by: Zijie Lu <ws...@gmail.com>
---
.../tubemq-client-go/selector/ip_selector.go | 71 ++++++++++++++
.../tubemq-client-go/selector/ip_selector_test.go | 102 +++++++++++++++++++++
.../tubemq-client-go/selector/selector.go | 51 +++++++++++
3 files changed, 224 insertions(+)
diff --git a/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go b/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go
new file mode 100644
index 0000000..a3547db
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package selector
+
+import (
+ "errors"
+ "strings"
+)
+
+func init() {
+ s := &ipSelector{}
+ s.indexes = make(map[string]int)
+ Register("ip", s)
+ Register("dns", s)
+}
+
+type ipSelector struct {
+ indexes map[string]int
+}
+
+// Select implements Selector interface.
+// Select will return the address in the serviceName sequentially.
+// The first address will be returned after reaching the end of the addresses.
+func (s *ipSelector) Select(serviceName string) (*Node, error) {
+ if len(serviceName) == 0 {
+ return nil, errors.New("serviceName empty")
+ }
+
+ num := strings.Count(serviceName, ",") + 1
+ if num == 1 {
+ return &Node{
+ ServiceName: serviceName,
+ Address: serviceName,
+ HasNext: false,
+ }, nil
+ }
+
+ nextIndex := 0
+ if index, ok := s.indexes[serviceName]; ok {
+ nextIndex = index
+ }
+
+ addresses := strings.Split(serviceName, ",")
+ address := addresses[nextIndex]
+ nextIndex = (nextIndex + 1) % num
+ s.indexes[serviceName] = nextIndex
+
+ node := &Node{
+ ServiceName: serviceName,
+ Address: address,
+ }
+ if nextIndex > 0 {
+ node.HasNext = true
+ }
+ return node, nil
+}
diff --git a/tubemq-client-twins/tubemq-client-go/selector/ip_selector_test.go b/tubemq-client-twins/tubemq-client-go/selector/ip_selector_test.go
new file mode 100644
index 0000000..6b5ba35
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/selector/ip_selector_test.go
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package selector
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestSingleIP(t *testing.T) {
+ serviceName := "192.168.0.1:9092"
+ selector := Get("ip")
+ node, err := selector.Select(serviceName)
+ assert.Nil(t, err)
+ assert.Equal(t, node.HasNext, false)
+ assert.Equal(t, node.Address, "192.168.0.1:9092")
+ assert.Equal(t, node.ServiceName, "192.168.0.1:9092")
+}
+
+func TestSingleDNS(t *testing.T) {
+ serviceName := "tubemq:8081"
+ selector := Get("dns")
+ node, err := selector.Select(serviceName)
+ assert.Nil(t, err)
+ assert.Equal(t, node.HasNext, false)
+ assert.Equal(t, node.Address, "tubemq:8081")
+ assert.Equal(t, node.ServiceName, "tubemq:8081")
+}
+
+func TestMultipleIP(t *testing.T) {
+ serviceName := "192.168.0.1:9091,192.168.0.1:9092,192.168.0.1:9093,192.168.0.1:9094"
+ selector := Get("dns")
+ node, err := selector.Select(serviceName)
+ assert.Nil(t, err)
+ assert.Equal(t, true, node.HasNext)
+ assert.Equal(t, "192.168.0.1:9091", node.Address)
+ assert.Equal(t, "192.168.0.1:9091,192.168.0.1:9092,192.168.0.1:9093,192.168.0.1:9094", node.ServiceName)
+
+ node, err = selector.Select(serviceName)
+ assert.Equal(t, true, node.HasNext)
+ assert.Equal(t, "192.168.0.1:9092", node.Address)
+ assert.Equal(t, "192.168.0.1:9091,192.168.0.1:9092,192.168.0.1:9093,192.168.0.1:9094", node.ServiceName)
+
+ node, err = selector.Select(serviceName)
+ assert.Equal(t, true, node.HasNext)
+ assert.Equal(t, "192.168.0.1:9093", node.Address)
+ assert.Equal(t, "192.168.0.1:9091,192.168.0.1:9092,192.168.0.1:9093,192.168.0.1:9094", node.ServiceName)
+
+ node, err = selector.Select(serviceName)
+ assert.Equal(t, false, node.HasNext)
+ assert.Equal(t, "192.168.0.1:9094", node.Address)
+ assert.Equal(t, "192.168.0.1:9091,192.168.0.1:9092,192.168.0.1:9093,192.168.0.1:9094", node.ServiceName)
+}
+
+func TestMultipleDNS(t *testing.T) {
+ serviceName := "tubemq:8081,tubemq:8082,tubemq:8083,tubemq:8084"
+ selector := Get("dns")
+ node, err := selector.Select(serviceName)
+ assert.Nil(t, err)
+ assert.Equal(t, true, node.HasNext)
+ assert.Equal(t, "tubemq:8081", node.Address)
+ assert.Equal(t, "tubemq:8081,tubemq:8082,tubemq:8083,tubemq:8084", node.ServiceName)
+
+ node, err = selector.Select(serviceName)
+ assert.Equal(t, true, node.HasNext)
+ assert.Equal(t, "tubemq:8082", node.Address)
+ assert.Equal(t, "tubemq:8081,tubemq:8082,tubemq:8083,tubemq:8084", node.ServiceName)
+
+ node, err = selector.Select(serviceName)
+ assert.Equal(t, true, node.HasNext)
+ assert.Equal(t, "tubemq:8083", node.Address)
+ assert.Equal(t, "tubemq:8081,tubemq:8082,tubemq:8083,tubemq:8084", node.ServiceName)
+
+ node, err = selector.Select(serviceName)
+ assert.Equal(t, false, node.HasNext)
+ assert.Equal(t, "tubemq:8084", node.Address)
+ assert.Equal(t, "tubemq:8081,tubemq:8082,tubemq:8083,tubemq:8084", node.ServiceName)
+}
+
+
+func TestEmptyService(t *testing.T) {
+ serviceName := ""
+ selector := Get("ip")
+ _, err := selector.Select(serviceName)
+ assert.Error(t, err)
+}
\ No newline at end of file
diff --git a/tubemq-client-twins/tubemq-client-go/selector/selector.go b/tubemq-client-twins/tubemq-client-go/selector/selector.go
new file mode 100644
index 0000000..5b5e0c8
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/selector/selector.go
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// package selector defines the route selector which is responsible for service discovery.
+package selector
+
+// Selector is abstraction of route selector which can return an available address
+// from the service name.
+type Selector interface {
+ // Select will return a service node which contains an available address.
+ Select(serviceName string) (*Node, error)
+}
+
+var (
+ selectors = make(map[string]Selector)
+)
+
+// Register registers a selector.
+func Register(name string, s Selector) {
+ selectors[name] = s
+}
+
+// Get returns the corresponding selector.
+func Get(name string) Selector {
+ s := selectors[name]
+ return s
+}
+
+// Node represents the service node.
+type Node struct {
+ // ServiceName of the node.
+ ServiceName string
+ // Address of the node.
+ Address string
+ // HasNext indicates whether or not the service has next node.
+ HasNext bool
+}