You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/05/28 09:22:58 UTC

[incubator-inlong] 01/03: [INLONG-620]Selector for Go SDK

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit 395072c337a438339bf1209228bd23e5892d394f
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Thu May 27 15:22:15 2021 +0800

    [INLONG-620]Selector for Go SDK
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 .../tubemq-client-go/selector/ip_selector.go       |  71 ++++++++++++++
 .../tubemq-client-go/selector/ip_selector_test.go  | 102 +++++++++++++++++++++
 .../tubemq-client-go/selector/selector.go          |  51 +++++++++++
 3 files changed, 224 insertions(+)

diff --git a/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go b/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go
new file mode 100644
index 0000000..a3547db
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/selector/ip_selector.go
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package selector
+
+import (
+	"errors"
+	"strings"
+)
+
+func init() {
+	s := &ipSelector{}
+	s.indexes = make(map[string]int)
+	Register("ip", s)
+	Register("dns", s)
+}
+
+type ipSelector struct {
+	indexes map[string]int
+}
+
+// Select implements Selector interface.
+// Select will return the address in the serviceName sequentially.
+// The first address will be returned after reaching the end of the addresses.
+func (s *ipSelector) Select(serviceName string) (*Node, error) {
+	if len(serviceName) == 0 {
+		return nil, errors.New("serviceName empty")
+	}
+
+	num := strings.Count(serviceName, ",") + 1
+	if num == 1 {
+		return &Node{
+			ServiceName: serviceName,
+			Address:     serviceName,
+			HasNext:     false,
+		}, nil
+	}
+
+	nextIndex := 0
+	if index, ok := s.indexes[serviceName]; ok {
+		nextIndex = index
+	}
+
+	addresses := strings.Split(serviceName, ",")
+	address := addresses[nextIndex]
+	nextIndex = (nextIndex + 1) % num
+	s.indexes[serviceName] = nextIndex
+
+	node := &Node{
+		ServiceName: serviceName,
+		Address:     address,
+	}
+	if nextIndex > 0 {
+		node.HasNext = true
+	}
+	return node, nil
+}
diff --git a/tubemq-client-twins/tubemq-client-go/selector/ip_selector_test.go b/tubemq-client-twins/tubemq-client-go/selector/ip_selector_test.go
new file mode 100644
index 0000000..6b5ba35
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/selector/ip_selector_test.go
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package selector
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestSingleIP(t *testing.T) {
+	serviceName := "192.168.0.1:9092"
+	selector := Get("ip")
+	node, err := selector.Select(serviceName)
+	assert.Nil(t, err)
+	assert.Equal(t, node.HasNext, false)
+	assert.Equal(t, node.Address, "192.168.0.1:9092")
+	assert.Equal(t, node.ServiceName, "192.168.0.1:9092")
+}
+
+func TestSingleDNS(t *testing.T) {
+	serviceName := "tubemq:8081"
+	selector := Get("dns")
+	node, err := selector.Select(serviceName)
+	assert.Nil(t, err)
+	assert.Equal(t, node.HasNext, false)
+	assert.Equal(t, node.Address, "tubemq:8081")
+	assert.Equal(t, node.ServiceName, "tubemq:8081")
+}
+
+func TestMultipleIP(t *testing.T) {
+	serviceName := "192.168.0.1:9091,192.168.0.1:9092,192.168.0.1:9093,192.168.0.1:9094"
+	selector := Get("dns")
+	node, err := selector.Select(serviceName)
+	assert.Nil(t, err)
+	assert.Equal(t, true, node.HasNext)
+	assert.Equal(t, "192.168.0.1:9091", node.Address)
+	assert.Equal(t, "192.168.0.1:9091,192.168.0.1:9092,192.168.0.1:9093,192.168.0.1:9094", node.ServiceName)
+
+	node, err = selector.Select(serviceName)
+	assert.Equal(t, true, node.HasNext)
+	assert.Equal(t, "192.168.0.1:9092", node.Address)
+	assert.Equal(t, "192.168.0.1:9091,192.168.0.1:9092,192.168.0.1:9093,192.168.0.1:9094", node.ServiceName)
+
+	node, err = selector.Select(serviceName)
+	assert.Equal(t, true, node.HasNext)
+	assert.Equal(t, "192.168.0.1:9093", node.Address)
+	assert.Equal(t, "192.168.0.1:9091,192.168.0.1:9092,192.168.0.1:9093,192.168.0.1:9094", node.ServiceName)
+
+	node, err = selector.Select(serviceName)
+	assert.Equal(t, false, node.HasNext)
+	assert.Equal(t, "192.168.0.1:9094", node.Address)
+	assert.Equal(t, "192.168.0.1:9091,192.168.0.1:9092,192.168.0.1:9093,192.168.0.1:9094", node.ServiceName)
+}
+
+func TestMultipleDNS(t *testing.T) {
+	serviceName := "tubemq:8081,tubemq:8082,tubemq:8083,tubemq:8084"
+	selector := Get("dns")
+	node, err := selector.Select(serviceName)
+	assert.Nil(t, err)
+	assert.Equal(t, true, node.HasNext)
+	assert.Equal(t, "tubemq:8081", node.Address)
+	assert.Equal(t, "tubemq:8081,tubemq:8082,tubemq:8083,tubemq:8084", node.ServiceName)
+
+	node, err = selector.Select(serviceName)
+	assert.Equal(t, true, node.HasNext)
+	assert.Equal(t, "tubemq:8082", node.Address)
+	assert.Equal(t, "tubemq:8081,tubemq:8082,tubemq:8083,tubemq:8084", node.ServiceName)
+
+	node, err = selector.Select(serviceName)
+	assert.Equal(t, true, node.HasNext)
+	assert.Equal(t, "tubemq:8083", node.Address)
+	assert.Equal(t, "tubemq:8081,tubemq:8082,tubemq:8083,tubemq:8084", node.ServiceName)
+
+	node, err = selector.Select(serviceName)
+	assert.Equal(t, false, node.HasNext)
+	assert.Equal(t, "tubemq:8084", node.Address)
+	assert.Equal(t, "tubemq:8081,tubemq:8082,tubemq:8083,tubemq:8084", node.ServiceName)
+}
+
+
+func TestEmptyService(t *testing.T) {
+	serviceName := ""
+	selector := Get("ip")
+	_, err := selector.Select(serviceName)
+	assert.Error(t, err)
+}
\ No newline at end of file
diff --git a/tubemq-client-twins/tubemq-client-go/selector/selector.go b/tubemq-client-twins/tubemq-client-go/selector/selector.go
new file mode 100644
index 0000000..5b5e0c8
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/selector/selector.go
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// package selector defines the route selector which is responsible for service discovery.
+package selector
+
+// Selector is abstraction of route selector which can return an available address
+// from the service name.
+type Selector interface {
+	// Select will return a service node which contains an available address.
+	Select(serviceName string) (*Node, error)
+}
+
+var (
+	selectors = make(map[string]Selector)
+)
+
+// Register registers a selector.
+func Register(name string, s Selector) {
+	selectors[name] = s
+}
+
+// Get returns the corresponding selector.
+func Get(name string) Selector {
+	s := selectors[name]
+	return s
+}
+
+// Node represents the service node.
+type Node struct {
+	// ServiceName of the node.
+	ServiceName string
+	// Address of the node.
+	Address string
+	// HasNext indicates whether or not the service has next node.
+	HasNext bool
+}