You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by wi...@apache.org on 2021/07/19 03:54:55 UTC

[dubbo-go-pixiu] 02/04: [#131] add zookeeper remote client

This is an automated email from the ASF dual-hosted git repository.

williamfeng323 pushed a commit to branch feature/auto-api-config
in repository https://gitbox.apache.org/repos/asf/dubbo-go-pixiu.git

commit d6d8b3e811927ac42d9aeb306bd4c4108d86b9e6
Author: william feng <wi...@hotmail.com>
AuthorDate: Sun Jun 27 19:28:47 2021 +0800

    [#131] add zookeeper remote client
---
 pkg/remoting/zookeeper/client.go      | 170 ++++++++++++++++++++++++++++++++++
 pkg/remoting/zookeeper/client_test.go |  94 +++++++++++++++++++
 2 files changed, 264 insertions(+)

diff --git a/pkg/remoting/zookeeper/client.go b/pkg/remoting/zookeeper/client.go
new file mode 100644
index 0000000..6defccf
--- /dev/null
+++ b/pkg/remoting/zookeeper/client.go
@@ -0,0 +1,170 @@
+package zookeeper
+
+import (
+	"strings"
+	"time"
+	"sync"
+)
+
+import (
+	"github.com/pkg/errors"
+	"github.com/dubbogo/go-zookeeper/zk"
+)
+
+import (
+	"github.com/apache/dubbo-go-pixiu/pkg/logger"
+)
+
+// Options defines the client option.
+type Options struct {
+	zkName string
+	client *ZookeeperClient
+
+	ts *zk.TestCluster
+}
+
+// Option defines the function to load the options
+type Option func(*Options)
+
+// ZookeeperClient represents zookeeper client Configuration
+type ZookeeperClient struct {
+	name         string
+	ZkAddrs      []string
+	sync.RWMutex // for conn
+	Conn         *zk.Conn
+	Timeout      time.Duration
+	exit         chan struct{}
+	Wait         sync.WaitGroup
+
+	eventRegistry     map[string][]*chan struct{}
+	eventRegistryLock sync.RWMutex
+}
+
+func NewZookeeperClient(name string, zkAddrs []string, timeout time.Duration) (*ZookeeperClient, error) {
+	var (
+		err   error
+		event <-chan zk.Event
+		z     *ZookeeperClient
+	)
+
+	z = &ZookeeperClient{
+		name:          name,
+		ZkAddrs:       zkAddrs,
+		Timeout:       timeout,
+		exit:          make(chan struct{}),
+		eventRegistry: make(map[string][]*chan struct{}),
+	}
+	// connect to zookeeper
+	z.Conn, event, err = zk.Connect(zkAddrs, timeout)
+	if err != nil {
+		return nil, errors.WithMessagef(err, "zk.Connect(zkAddrs:%+v)", zkAddrs)
+	}
+
+	z.Wait.Add(1)
+	go z.HandleZkEvent(event)
+
+	return z, nil
+}
+
+// nolint
+func StateToString(state zk.State) string {
+	switch state {
+	case zk.StateDisconnected:
+		return "zookeeper disconnected"
+	case zk.StateConnecting:
+		return "zookeeper connecting"
+	case zk.StateAuthFailed:
+		return "zookeeper auth failed"
+	case zk.StateConnectedReadOnly:
+		return "zookeeper connect readonly"
+	case zk.StateSaslAuthenticated:
+		return "zookeeper sasl authenticated"
+	case zk.StateExpired:
+		return "zookeeper connection expired"
+	case zk.StateConnected:
+		return "zookeeper connected"
+	case zk.StateHasSession:
+		return "zookeeper has session"
+	case zk.StateUnknown:
+		return "zookeeper unknown state"
+	case zk.State(zk.EventNodeDeleted):
+		return "zookeeper node deleted"
+	case zk.State(zk.EventNodeDataChanged):
+		return "zookeeper node data changed"
+	default:
+		return state.String()
+	}
+}
+
+// HandleZkEvent handles zookeeper events
+func (z *ZookeeperClient) HandleZkEvent(session <-chan zk.Event) {
+	var (
+		state int
+		event zk.Event
+	)
+
+	defer func() {
+		z.Wait.Done()
+		logger.Infof("zk{path:%v, name:%s} connection goroutine game over.", z.ZkAddrs, z.name)
+	}()
+
+	for {
+		select {
+		case <-z.exit:
+			return
+		case event = <-session:
+			logger.Infof("client{%s} get a zookeeper event{type:%s, server:%s, path:%s, state:%d-%s, err:%v}",
+				z.name, event.Type, event.Server, event.Path, event.State, StateToString(event.State), event.Err)
+			switch (int)(event.State) {
+			case (int)(zk.StateDisconnected):
+				logger.Warnf("zk{addr:%s} state is StateDisconnected, so close the zk client{name:%s}.", z.ZkAddrs, z.name)
+				z.stop()
+				z.Lock()
+				conn := z.Conn
+				z.Conn = nil
+				z.Unlock()
+				if conn != nil {
+					conn.Close()
+				}
+				return
+			case (int)(zk.EventNodeDataChanged), (int)(zk.EventNodeChildrenChanged):
+				logger.Infof("zkClient{%s} get zk node changed event{path:%s}", z.name, event.Path)
+				z.eventRegistryLock.RLock()
+				for p, a := range z.eventRegistry {
+					if strings.HasPrefix(p, event.Path) {
+						logger.Infof("send event{state:zk.EventNodeDataChange, Path:%s} notify event to path{%s} related listener",
+							event.Path, p)
+						for _, e := range a {
+							*e <- struct{}{}
+						}
+					}
+				}
+				z.eventRegistryLock.RUnlock()
+			case (int)(zk.StateConnecting), (int)(zk.StateConnected), (int)(zk.StateHasSession):
+				if state == (int)(zk.StateHasSession) {
+					continue
+				}
+				z.eventRegistryLock.RLock()
+				if a, ok := z.eventRegistry[event.Path]; ok && 0 < len(a) {
+					for _, e := range a {
+						*e <- struct{}{}
+					}
+				}
+				z.eventRegistryLock.RUnlock()
+			}
+			state = (int)(event.State)
+		}
+	}
+}
+
+
+func (z *ZookeeperClient) stop() bool {
+	select {
+	case <-z.exit:
+		return true
+	default:
+		close(z.exit)
+	}
+
+	return false
+}
diff --git a/pkg/remoting/zookeeper/client_test.go b/pkg/remoting/zookeeper/client_test.go
new file mode 100644
index 0000000..94eeddd
--- /dev/null
+++ b/pkg/remoting/zookeeper/client_test.go
@@ -0,0 +1,94 @@
+package zookeeper
+
+import (
+	"github.com/apache/dubbo-go-pixiu/pkg/logger"
+	"github.com/stretchr/testify/assert"
+	"time"
+	"github.com/pkg/errors"
+	"github.com/dubbogo/go-zookeeper/zk"
+	"testing"
+)
+
+func verifyEventStateOrder(t *testing.T, c <-chan zk.Event, expectedStates []zk.State, source string) {
+	for _, state := range expectedStates {
+		for {
+			event, ok := <-c
+			if !ok {
+				t.Fatalf("unexpected channel close for %s", source)
+			}
+			logger.Debug(event)
+			if event.Type != zk.EventSession {
+				continue
+			}
+			assert.Equal(t, event.State, state)
+			break
+		}
+	}
+}
+
+// NewMockZookeeperClient returns a mock client instance
+func NewMockZookeeperClient(name string, timeout time.Duration, opts ...Option) (*zk.TestCluster, *ZookeeperClient, <-chan zk.Event, error) {
+	var (
+		err   error
+		event <-chan zk.Event
+		z     *ZookeeperClient
+		ts    *zk.TestCluster
+	)
+
+	z = &ZookeeperClient{
+		name:          name,
+		ZkAddrs:       []string{},
+		Timeout:       timeout,
+		exit:          make(chan struct{}),
+		eventRegistry: make(map[string][]*chan struct{}),
+	}
+
+	options := &Options{}
+	for _, opt := range opts {
+		opt(options)
+	}
+
+	// connect to zookeeper
+	if options.ts != nil {
+		ts = options.ts
+	} else {
+		ts, err = zk.StartTestCluster(1, nil, nil)
+		if err != nil {
+			return nil, nil, nil, errors.WithMessagef(err, "zk.Connect")
+		}
+	}
+
+	z.Conn, event, err = ts.ConnectWithOptions(timeout)
+	if err != nil {
+		return nil, nil, nil, errors.WithMessagef(err, "zk.Connect")
+	}
+
+	return ts, z, event, nil
+}
+
+func TestNewZookeeperClient(t *testing.T) {
+	tc, err := zk.StartTestCluster(1, nil, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer tc.Stop()
+
+	callbackChan := make(chan zk.Event)
+	f := func(event zk.Event) {
+		callbackChan <- event
+	}
+
+	zook, eventChan, err := tc.ConnectWithOptions(15*time.Second, zk.WithEventCallback(f))
+	if err != nil {
+		t.Fatalf("Connect returned error: %+v", err)
+	}
+
+	states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession}
+	verifyEventStateOrder(t, callbackChan, states, "callback")
+	verifyEventStateOrder(t, eventChan, states, "event channel")
+
+	zook.Close()
+	verifyEventStateOrder(t, callbackChan, []zk.State{zk.StateDisconnected}, "callback")
+	verifyEventStateOrder(t, eventChan, []zk.State{zk.StateDisconnected}, "event channel")
+
+}
\ No newline at end of file