You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by wi...@apache.org on 2021/07/19 03:54:56 UTC
[dubbo-go-pixiu] 03/04: [#131] zk registry basic
This is an automated email from the ASF dual-hosted git repository.
williamfeng323 pushed a commit to branch feature/auto-api-config
in repository https://gitbox.apache.org/repos/asf/dubbo-go-pixiu.git
commit 2fa4338593d7c0b3ace16e8f06beead216193898
Author: william feng <wi...@hotmail.com>
AuthorDate: Mon Jun 28 17:01:16 2021 +0800
[#131] zk registry basic
---
.gitignore | 3 +-
go.mod | 1 +
go.sum | 8 +-
pkg/common/constant/key.go | 14 ++
pkg/common/constant/pixiu.go | 4 +
.../load.go => common/constant/remote.go} | 16 +--
pkg/common/constant/url.go | 13 --
pkg/common/extension/registry.go | 26 ++++
pkg/registry/base_registry.go | 11 ++
pkg/registry/{ => consul}/consul.go | 0
pkg/registry/{ => consul}/consul_test.go | 0
pkg/registry/listener.go | 67 ++++++++++
pkg/registry/registry.go | 64 +++++++++
pkg/registry/zookeeper/listener.go | 1 +
pkg/registry/zookeeper/loader_test.go | 2 +
pkg/registry/zookeeper/registry.go | 146 +++++++++++++++++++++
pkg/registry/zookeeper/registry_test.go | 62 +++++++++
.../contrib/fatjar/zookeeper-3.4.9-fatjar.jar | Bin 0 -> 8673355 bytes
pkg/registry/zookeeper/zookeeper.go | 97 ++++++++++++++
pkg/registry/zookeeper/zookeeper_test.go | 121 +++++++++++++++++
pkg/remoting/zookeeper/client.go | 76 ++++++++---
pkg/remoting/zookeeper/client_test.go | 73 ++++++++---
.../contrib/fatjar/zookeeper-3.4.9-fatjar.jar | Bin 0 -> 8673355 bytes
23 files changed, 746 insertions(+), 59 deletions(-)
diff --git a/.gitignore b/.gitignore
index 79c27da..af0f01e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,5 +9,4 @@ samples/dubbogo/simple/server/app/app
/.idea
/.vscode
pkg/registry/zookeeper-4unittest/contrib/fatjar
-
-
+.DS_Store
diff --git a/go.mod b/go.mod
index c9ed41e..73b6e54 100644
--- a/go.mod
+++ b/go.mod
@@ -11,6 +11,7 @@ require (
github.com/dubbogo/gost v1.11.8
github.com/emirpasic/gods v1.12.0
github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32
+ github.com/go-zookeeper/zk v1.0.2
github.com/goinggo/mapstructure v0.0.0-20140717182941-194205d9b4a9
github.com/golang/protobuf v1.5.2 // indirect
github.com/hashicorp/consul/api v1.5.0
diff --git a/go.sum b/go.sum
index d36cd95..0a9e47f 100644
--- a/go.sum
+++ b/go.sum
@@ -283,7 +283,13 @@ github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/me
github.com/go-test/deep v1.0.2-0.20181118220953-042da051cf31/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA=
github.com/go-test/deep v1.0.2 h1:onZX1rnHT3Wv6cqNgYyFOOlgVKJrksuCMCRvJStbMYw=
github.com/go-test/deep v1.0.2/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA=
-github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
+github.com/go-zookeeper/zk v1.0.2 h1:4mx0EYENAdX/B/rbunjlt5+4RTA/a9SMHBRuSKdGxPM=
+github.com/go-zookeeper/zk v1.0.2/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw=
+github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo=
+github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
+github.com/gobwas/ws v1.0.3/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM=
+github.com/godbus/dbus v0.0.0-20190422162347-ade71ed3457e/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4=
+github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/gogo/googleapis v1.1.0 h1:kFkMAZBNAn4j7K0GiZr8cRYzejq68VbheufiV3YuyFI=
github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
diff --git a/pkg/common/constant/key.go b/pkg/common/constant/key.go
index 95ae76c..73fcb3d 100644
--- a/pkg/common/constant/key.go
+++ b/pkg/common/constant/key.go
@@ -35,3 +35,17 @@ const (
const (
LocalMemoryApiDiscoveryService = "api.ds.local_memory"
)
+
+const (
+ ApplicationKey = "application"
+ AppVersionKey = "app.version"
+ ClusterKey = "cluster"
+ GroupKey = "group"
+ VersionKey = "version"
+ InterfaceKey = "interface"
+ MethodsKey = "methods"
+ // NameKey name of interface
+ NameKey = "name"
+ // RetriesKey retry times
+ RetriesKey = "retries"
+)
diff --git a/pkg/common/constant/pixiu.go b/pkg/common/constant/pixiu.go
index c9c6b10..3bf5aea 100644
--- a/pkg/common/constant/pixiu.go
+++ b/pkg/common/constant/pixiu.go
@@ -55,3 +55,7 @@ const (
//YML .yml
YML = ".yml"
)
+
+const (
+ StringSeparator = ","
+)
\ No newline at end of file
diff --git a/pkg/registry/load.go b/pkg/common/constant/remote.go
similarity index 68%
rename from pkg/registry/load.go
rename to pkg/common/constant/remote.go
index 8548f99..67d4468 100644
--- a/pkg/registry/load.go
+++ b/pkg/common/constant/remote.go
@@ -15,14 +15,10 @@
* limitations under the License.
*/
-package registry
+package constant
-import "github.com/apache/dubbo-go/common"
-
-// Loader this interface defined for load services from different kinds registry, such as nacos,consul,zookeeper.
-type Loader interface {
- // LoadAllServices load all services registered in registry
- LoadAllServices() ([]*common.URL, error)
- // GetCluster get the registry name
- GetCluster() (string, error)
-}
+// env key
+const (
+ Zookeeper string = "zookeeper"
+ Consul string = "consul"
+)
diff --git a/pkg/common/constant/url.go b/pkg/common/constant/url.go
index c1ee29f..2ae3741 100644
--- a/pkg/common/constant/url.go
+++ b/pkg/common/constant/url.go
@@ -17,19 +17,6 @@
package constant
const (
- // NameKey name of interface
- NameKey = "name"
- // GroupKey group of interface
- GroupKey = "group"
- // VersionKey Version of interface
- VersionKey = "version"
- // InterfaceKey interface
- InterfaceKey = "interface"
- // RetriesKey retry times
- RetriesKey = "retries"
-)
-
-const (
// RequestBody name of api config mapping from/to
RequestBody = "requestBody"
// QueryStrings name of api config mapping from/to
diff --git a/pkg/common/extension/registry.go b/pkg/common/extension/registry.go
new file mode 100644
index 0000000..6a09a5f
--- /dev/null
+++ b/pkg/common/extension/registry.go
@@ -0,0 +1,26 @@
+package extension
+
+import (
+ "github.com/apache/dubbo-go-pixiu/pkg/model"
+ "github.com/apache/dubbo-go-pixiu/pkg/registry"
+)
+
+var registryMap = make(map[string]func(model.Registry) (registry.Registry, error), 8)
+
+// SetRegistry stores the registry constructor @newRegFunc under @name
+func SetRegistry(name string, newRegFunc func(model.Registry) (registry.Registry, error)) {
+ registryMap[name] = newRegFunc
+}
+
+// GetRegistry builds the registry registered under @name using @regConfig;
+// if @name is not registered or the constructor fails, it will panic
+func GetRegistry(name string, regConfig model.Registry) registry.Registry {
+ if registry, ok := registryMap[name]; ok {
+ reg, err := registry(regConfig)
+ if err != nil {
+ panic("Initialize Registry" + name + "failed due to: " + err.Error())
+ }
+ return reg
+ }
+ panic("Registry " + name + " does not support yet")
+}
diff --git a/pkg/registry/base_registry.go b/pkg/registry/base_registry.go
new file mode 100644
index 0000000..420aac8
--- /dev/null
+++ b/pkg/registry/base_registry.go
@@ -0,0 +1,11 @@
+package registry
+
+type BaseRegistry struct {
+ listeners []Listener
+}
+
+func NewBaseRegistry() *BaseRegistry {
+ return &BaseRegistry{
+ listeners: []Listener{},
+ }
+}
diff --git a/pkg/registry/consul.go b/pkg/registry/consul/consul.go
similarity index 100%
rename from pkg/registry/consul.go
rename to pkg/registry/consul/consul.go
diff --git a/pkg/registry/consul_test.go b/pkg/registry/consul/consul_test.go
similarity index 100%
rename from pkg/registry/consul_test.go
rename to pkg/registry/consul/consul_test.go
diff --git a/pkg/registry/listener.go b/pkg/registry/listener.go
new file mode 100644
index 0000000..e90d7f5
--- /dev/null
+++ b/pkg/registry/listener.go
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package registry
+
+//////////////////////////////////////////
+// event type
+//////////////////////////////////////////
+
+// EventType means SourceObjectEventType
+type EventType int
+
+const (
+ // EventTypeAdd means add event
+ EventTypeAdd = iota
+ // EventTypeDel means del event
+ EventTypeDel
+ // EventTypeUpdate means update event
+ EventTypeUpdate
+)
+
+var serviceEventTypeStrings = [...]string{
+ "add",
+ "delete",
+ "update",
+}
+
+// nolint
+func (t EventType) String() string {
+ return serviceEventTypeStrings[t]
+}
+
+//////////////////////////////////////////
+// service event
+//////////////////////////////////////////
+
+// Event defines common elements for service event
+
+type Event struct {
+ Path string
+ Action EventType
+ Content string
+}
+
+// nolint
+func (e Event) String() string {
+ return "Event{Action{" + e.Action.String() + "}, Content{" + e.Content + "}}"
+}
+
+// Listener is the interface for handling change events sent from different kinds of registry, such as nacos, consul, zookeeper.
+type Listener interface {
+ // ChangeHandler processes the events sent from registry
+ ChangeHandler(Event)
+}
diff --git a/pkg/registry/registry.go b/pkg/registry/registry.go
new file mode 100644
index 0000000..769e111
--- /dev/null
+++ b/pkg/registry/registry.go
@@ -0,0 +1,64 @@
+package registry
+
+import (
+ "strings"
+ "time"
+
+ "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
+ "github.com/apache/dubbo-go/common"
+ "github.com/dubbogo/dubbo-go-pixiu-filter/pkg/api/config"
+ "github.com/dubbogo/dubbo-go-pixiu-filter/pkg/router"
+ "github.com/pkg/errors"
+)
+
+// Registry interface defines the basic features of a registry
+type Registry interface {
+ // LoadServices loads all the registered Dubbo services from registry
+ LoadServices()
+ // Subscribe monitors the target registry.
+ Subscribe(common.URL) error
+ // Unsubscribe stops monitoring the target registry.
+ Unsubscribe(common.URL) error
+}
+
+// CreateAPIConfig returns a router.API struct based on the input
+func CreateAPIConfig(urlPattern string, dboBackendConfig config.DubboBackendConfig, methodString string, mappingParams []config.MappingParam) router.API{
+ dboBackendConfig.Method = methodString
+ url := urlPattern + "/" + methodString
+ method := config.Method{
+ OnAir: true,
+ Timeout: 3 * time.Second,
+ Mock: false,
+ Filters: []string{},
+ HTTPVerb: config.MethodPost,
+ InboundRequest: config.InboundRequest{
+ RequestType: config.HTTPRequest,
+ },
+ IntegrationRequest: config.IntegrationRequest{
+ RequestType: config.DubboRequest,
+ DubboBackendConfig: dboBackendConfig,
+ MappingParams: mappingParams,
+ },
+ }
+ return router.API{
+ URLPattern: url,
+ Method: method,
+ }
+}
+// ParseDubboString parses a dubbo URL string such as the one below into a DubboBackendConfig and the list of its methods
+// dubbo://192.168.3.46:20002/org.apache.dubbo.UserProvider2?anyhost=true&app.version=0.0.1&application=UserInfoServer&bean.name=UserProvider&cluster=failover&environment=dev&export=true&interface=org.apache.dubbo.UserProvider2&ip=192.168.3.46&loadbalance=random&message_size=4&methods=GetUser&methods.GetUser.loadbalance=random&methods.GetUser.retries=1&methods.GetUser.weight=0&module=dubbo-go user-info server&name=UserInfoServer&organization=dubbo.io&pid=11037&registry.role=3&release=dub [...]
+func ParseDubboString(urlString string) (config.DubboBackendConfig, []string, error) {
+ url, err := common.NewURL(urlString)
+ if err != nil {
+ return config.DubboBackendConfig{}, nil, errors.WithStack(err)
+ }
+ return config.DubboBackendConfig{
+ ClusterName: url.GetParam(constant.ClusterKey, ""),
+ ApplicationName: url.GetParam(constant.ApplicationKey, ""),
+ Version: url.GetParam(constant.AppVersionKey, ""),
+ Protocol: string(config.DubboRequest),
+ Group: url.GetParam(constant.GroupKey, ""),
+ Interface: url.GetParam(constant.InterfaceKey, ""),
+ Retries: url.GetParam(constant.RetriesKey, ""),
+ }, strings.Split(url.GetParam(constant.MethodsKey, ""), constant.StringSeparator), nil
+}
\ No newline at end of file
diff --git a/pkg/registry/zookeeper/listener.go b/pkg/registry/zookeeper/listener.go
new file mode 100644
index 0000000..72f5133
--- /dev/null
+++ b/pkg/registry/zookeeper/listener.go
@@ -0,0 +1 @@
+package zookeeper
diff --git a/pkg/registry/zookeeper/loader_test.go b/pkg/registry/zookeeper/loader_test.go
new file mode 100644
index 0000000..16c6648
--- /dev/null
+++ b/pkg/registry/zookeeper/loader_test.go
@@ -0,0 +1,2 @@
+package zookeeper
+
diff --git a/pkg/registry/zookeeper/registry.go b/pkg/registry/zookeeper/registry.go
new file mode 100644
index 0000000..db3d72b
--- /dev/null
+++ b/pkg/registry/zookeeper/registry.go
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package zookeeper
+
+import (
+ "strings"
+ "time"
+)
+import (
+ "github.com/apache/dubbo-go/common"
+ "github.com/dubbogo/dubbo-go-pixiu-filter/pkg/api/config"
+ "github.com/dubbogo/dubbo-go-pixiu-filter/pkg/router"
+ "github.com/pkg/errors"
+)
+
+import (
+ "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
+ "github.com/apache/dubbo-go-pixiu/pkg/common/extension"
+ "github.com/apache/dubbo-go-pixiu/pkg/logger"
+ "github.com/apache/dubbo-go-pixiu/pkg/model"
+ "github.com/apache/dubbo-go-pixiu/pkg/registry"
+ zk "github.com/apache/dubbo-go-pixiu/pkg/remoting/zookeeper"
+)
+
+const (
+ // RegistryZkClient zk client name
+ RegistryZkClient = "zk registry"
+ rootPath = "/dubbo"
+)
+
+func init() {
+ extension.SetRegistry(constant.Zookeeper, newZKRegistry)
+}
+
+type ZKRegistry struct {
+ *registry.BaseRegistry
+ client *zk.ZookeeperClient
+}
+
+func newZKRegistry(regConfig model.Registry) (registry.Registry, error) {
+ baseReg := registry.NewBaseRegistry()
+ timeout, err := time.ParseDuration(regConfig.Timeout)
+ if err != nil {
+ return nil, errors.Errorf("Incorrect timeout configuration: %s", regConfig.Timeout)
+ }
+ client, err := zk.NewZookeeperClient(RegistryZkClient, strings.Split(regConfig.Address, ","), timeout)
+ if err != nil {
+ return nil, errors.Errorf("Initialize zookeeper client failed: %s", err.Error())
+ }
+ return &ZKRegistry{
+ BaseRegistry: baseReg,
+ client: client,
+ }, nil
+}
+
+// LoadServices loads all the registered Dubbo services from registry
+func (r *ZKRegistry)LoadServices() {
+ r.LoadInterfaces()
+ r.LoadApplications()
+}
+
+// LoadInterfaces loads services registered before dubbo 2.7
+func (r *ZKRegistry)LoadInterfaces() []error {
+ subPaths, err := r.client.GetChildren(rootPath)
+ if err != nil {
+ return []error{err}
+ }
+ if len(subPaths) == 0 {
+ return nil
+ }
+ errorStack := []error{}
+ for i := range subPaths {
+ if subPaths[i] == "metadata" {
+ continue
+ }
+ providerPath := subPaths[i] + "/providers"
+ providerString, err := r.client.GetChildren(providerPath)
+ if err != nil {
+ logger.Warnf("Get provider %s failed due to %s", providerPath, err.Error())
+ errorStack = append(errorStack, errors.WithStack(err));
+ }
+ interfaceDetailString := providerString[0]
+ bkConfig, methods, err := registry.ParseDubboString(interfaceDetailString)
+ if err != nil {
+ logger.Warnf("Parse dubbo interface provider %s failed; due to \n %s", interfaceDetailString, err.Error())
+ errorStack = append(errorStack, errors.WithStack(err));
+ }
+ localAPIDiscSrv := extension.GetMustAPIDiscoveryService(constant.LocalMemoryApiDiscoveryService)
+ apiPattern := bkConfig.ApplicationName + "/" + bkConfig.Interface + "/" + bkConfig.Version
+ mappingParams := []config.MappingParam{
+ {
+ Name: "requestBody.values",
+ MapTo: "opt.values",
+ },
+ {
+ Name: "requestBody.types",
+ MapTo: "opt.types",
+ },
+ }
+ for i := range methods {
+ err := localAPIDiscSrv.AddAPI(registry.CreateAPIConfig(apiPattern, bkConfig, methods[i], mappingParams))
+ if err != nil {
+ logger.Warnf("Add URL %s/method failed; due to \n %s", interfaceDetailString, err.Error())
+ errorStack = append(errorStack, errors.WithStack(err));
+ }
+ }
+ }
+ return errorStack
+}
+
+// LoadApplications loads services registered after dubbo 2.7 (not implemented yet)
+func (r *ZKRegistry)LoadApplications() {}
+
+// Subscribe monitors the target registry.
+func (r *ZKRegistry) Subscribe(common.URL) error {
+ return nil
+}
+// Unsubscribe stops monitoring the target registry.
+func (r *ZKRegistry)Unsubscribe(common.URL) error {
+ return nil
+}
+
+// CreateAPIFromRegistry saves the API created from the registry into the local memory API discovery service
+func CreateAPIFromRegistry(api router.API) error {
+ localAPIDiscSrv := extension.GetMustAPIDiscoveryService(constant.LocalMemoryApiDiscoveryService)
+ err := localAPIDiscSrv.AddAPI(api)
+ if err != nil {
+ return err
+ }
+ return nil
+}
diff --git a/pkg/registry/zookeeper/registry_test.go b/pkg/registry/zookeeper/registry_test.go
new file mode 100644
index 0000000..ca16124
--- /dev/null
+++ b/pkg/registry/zookeeper/registry_test.go
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package zookeeper
+
+import (
+ "fmt"
+ "net/url"
+ "strings"
+ "testing"
+ "time"
+)
+
+func TestNewZKRegistry(t *testing.T) {
+ regConfig := model.Registry{
+ Protocol: "zookeeper",
+ Timeout: "2s",
+ Address: "127.0.0.1:9100",
+ }
+ reg, err := newZKRegistry(regConfig)
+ assert.Nil(t, err)
+ assert.NotNil(t, reg)
+
+ regConfig = model.Registry{
+ Protocol: "zookeeper",
+ Timeout: "2xxxxxx",
+ Address: "127.0.0.1:9100",
+ }
+ reg, err = newZKRegistry(regConfig)
+ assert.Nil(t, reg)
+ assert.NotNil(t, err)
+
+ regConfig = model.Registry{
+ Protocol: "zookeeper",
+ Timeout: "2s",
+ Address: "",
+ }
+ reg, err = newZKRegistry(regConfig)
+ assert.Nil(t, reg)
+ assert.NotNil(t, err)
+}
+
+
+const providerUrlstr = "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?methods.GetUser.retries=1"
+
+func TestLoadInterfaces(t *testing.T) {
+ reg
+}
\ No newline at end of file
diff --git a/pkg/registry/zookeeper/zookeeper-4unittest/contrib/fatjar/zookeeper-3.4.9-fatjar.jar b/pkg/registry/zookeeper/zookeeper-4unittest/contrib/fatjar/zookeeper-3.4.9-fatjar.jar
new file mode 100644
index 0000000..839531b
Binary files /dev/null and b/pkg/registry/zookeeper/zookeeper-4unittest/contrib/fatjar/zookeeper-3.4.9-fatjar.jar differ
diff --git a/pkg/registry/zookeeper/zookeeper.go b/pkg/registry/zookeeper/zookeeper.go
new file mode 100644
index 0000000..8452464
--- /dev/null
+++ b/pkg/registry/zookeeper/zookeeper.go
@@ -0,0 +1,97 @@
+// /*
+// * Licensed to the Apache Software Foundation (ASF) under one or more
+// * contributor license agreements. See the NOTICE file distributed with
+// * this work for additional information regarding copyright ownership.
+// * The ASF licenses this file to You under the Apache License, Version 2.0
+// * (the "License"); you may not use this file except in compliance with
+// * the License. You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+package zookeeper
+
+// import (
+// "path"
+// "strings"
+// "time"
+// )
+
+// import (
+// "github.com/apache/dubbo-go/common"
+// "github.com/apache/dubbo-go/remoting/zookeeper"
+// )
+
+// import (
+// "github.com/apache/dubbo-go-pixiu/pkg/logger"
+// "github.com/apache/dubbo-go-pixiu/pkg/registry"
+// )
+
+// const (
+// rootPath = "/dubbo"
+// )
+
+// func init() {
+// var _ registry.Loader = new(ZookeeperRegistryLoad)
+// }
+
+// // ZookeeperRegistryLoad load dubbo apis from zookeeper registry
+// type ZookeeperRegistryLoad struct {
+// zkName string
+// client *zookeeper.ZookeeperClient
+// Address string
+// cluster string
+// }
+
+// func newZookeeperRegistryLoad(address, cluster string) (registry.Loader, error) {
+// newClient, err := zookeeper.NewZookeeperClient("zkClient", strings.Split(address, ","), 15*time.Second)
+// if err != nil {
+// logger.Warnf("newZookeeperClient error:%v", err)
+// return nil, err
+// }
+
+// r := &ZookeeperRegistryLoad{
+// Address: address,
+// client: newClient,
+// cluster: cluster,
+// }
+
+// return r, nil
+// }
+
+// // nolint
+// func (crl *ZookeeperRegistryLoad) GetCluster() (string, error) {
+// return crl.cluster, nil
+// }
+
+// // LoadAllServices load all services from zookeeper registry
+// func (crl *ZookeeperRegistryLoad) LoadAllServices() ([]*common.URL, error) {
+// children, err := crl.client.GetChildren(rootPath)
+// if err != nil {
+// logger.Errorf("[zookeeper registry] get zk children error:%v", err)
+// return nil, err
+// }
+// var urls []*common.URL
+// for _, _interface := range children {
+// providerStr := path.Join(rootPath, "/", _interface, "/", "providers")
+// urlStrs, err := crl.client.GetChildren(providerStr)
+// if err != nil {
+// logger.Errorf("[zookeeper registry] get zk children \"%s\" error:%v", providerStr, err)
+// return nil, err
+// }
+// for _, url := range urlStrs {
+// dubboURL, err := common.NewURL(url)
+// if err != nil {
+// logger.Warnf("[zookeeper registry] transfer zk info to url error:%v", err)
+// continue
+// }
+// urls = append(urls, dubboURL)
+// }
+// }
+// return urls, nil
+// }
diff --git a/pkg/registry/zookeeper/zookeeper_test.go b/pkg/registry/zookeeper/zookeeper_test.go
new file mode 100644
index 0000000..782a573
--- /dev/null
+++ b/pkg/registry/zookeeper/zookeeper_test.go
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package zookeeper
+
+// import (
+// "fmt"
+// "net/url"
+// "strconv"
+// "strings"
+// "testing"
+// "time"
+// )
+
+// import (
+// "github.com/apache/dubbo-go/common"
+// "github.com/apache/dubbo-go/common/constant"
+// "github.com/apache/dubbo-go/remoting/zookeeper"
+// "github.com/dubbogo/go-zookeeper/zk"
+// "github.com/stretchr/testify/assert"
+// )
+
+// const providerUrlstr = "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?methods.GetUser.retries=1"
+
+// func newMockZkRegistry(t *testing.T, providerURL *common.URL, opts ...zookeeper.Option) (*zk.TestCluster, error) {
+// var (
+// err error
+// c *zk.TestCluster
+// client *zookeeper.ZookeeperClient
+// )
+// c, client, _, err = zookeeper.NewMockZookeeperClient("test", 15*time.Second, opts...)
+// if err != nil {
+// return nil, err
+// }
+// var (
+// params url.Values
+// rawURL string
+// encodedURL string
+// dubboPath string
+// )
+// params = url.Values{}
+
+// providerURL.RangeParams(func(key, value string) bool {
+// params.Add(key, value)
+// return true
+// })
+
+// dubboPath = fmt.Sprintf("/dubbo/%s/%s", url.QueryEscape(providerURL.Service()), common.DubboNodes[common.PROVIDER])
+// err = client.CreateWithValue(dubboPath, []byte(""))
+// assert.Nil(t, err)
+
+// if len(providerURL.Methods) > 0 {
+// params.Add(constant.METHODS_KEY, strings.Join(providerURL.Methods, ","))
+// }
+// var host string
+// if providerURL.Ip == "" {
+// host = ""
+// } else {
+// host = providerURL.Ip
+// }
+// host += ":" + providerURL.Port
+// rawURL = fmt.Sprintf("%s://%s%s?%s", providerURL.Protocol, host, providerURL.Path, params.Encode())
+
+// encodedURL = url.QueryEscape(rawURL)
+// dubboPath = strings.ReplaceAll(dubboPath, "$", "%24")
+
+// err = client.Create(dubboPath)
+// assert.Nil(t, err)
+
+// // to register the node
+// _, err = client.RegisterTemp(dubboPath, encodedURL)
+// assert.Nil(t, err)
+// return c, nil
+// }
+
+// func registerProvider(providerURL *common.URL, t *testing.T) (*zk.TestCluster, error) {
+// return newMockZkRegistry(t, providerURL)
+// }
+
+// func TestZookeeperRegistryLoad_GetCluster(t *testing.T) {
+// providerURL, _ := common.NewURL(providerUrlstr)
+// testCluster, err := registerProvider(providerURL, t)
+// assert.Nil(t, err)
+// defer testCluster.Stop()
+// registryLoad, err := newZookeeperRegistryLoad("127.0.0.1:1111", "test-cluster")
+// assert.Nil(t, err)
+// assert.NotNil(t, registryLoad)
+// clusterName, err := registryLoad.GetCluster()
+// assert.Nil(t, err)
+// assert.Equal(t, "test-cluster", clusterName)
+// }
+
+// func TestZookeeperRegistryLoad_LoadAllServices(t *testing.T) {
+// providerURL, _ := common.NewURL(providerUrlstr)
+// testCluster, err := registerProvider(providerURL, t)
+// assert.Nil(t, err)
+// defer testCluster.Stop()
+// registryLoad, err := newZookeeperRegistryLoad(fmt.Sprintf("%s:%s", "127.0.0.1", strconv.Itoa(testCluster.Servers[0].Port)), "test-cluster")
+// assert.Nil(t, err)
+// assert.NotNil(t, registryLoad)
+// services, err := registryLoad.LoadAllServices()
+// assert.Nil(t, err)
+// assert.GreaterOrEqual(t, len(services), 1)
+// assert.Equal(t, services[0].Protocol, "dubbo")
+// assert.Equal(t, services[0].Location, "127.0.0.1:20000")
+// assert.Equal(t, services[0].Path, "/com.ikurento.user.UserProvider")
+// assert.Equal(t, services[0].GetMethodParam("GetUser", "retries", ""), "1")
+// }
diff --git a/pkg/remoting/zookeeper/client.go b/pkg/remoting/zookeeper/client.go
index 6defccf..29854b4 100644
--- a/pkg/remoting/zookeeper/client.go
+++ b/pkg/remoting/zookeeper/client.go
@@ -18,20 +18,26 @@ import (
// Options defines the client option.
type Options struct {
zkName string
- client *ZookeeperClient
-
+ client *ZooKeeperClient
ts *zk.TestCluster
}
// Option defines the function to load the options
type Option func(*Options)
-// ZookeeperClient represents zookeeper client Configuration
-type ZookeeperClient struct {
+// WithZkName sets zk client name
+func WithZkName(name string) Option {
+ return func(opt *Options) {
+ opt.zkName = name
+ }
+}
+
+// ZooKeeperClient represents zookeeper client Configuration
+type ZooKeeperClient struct {
name string
ZkAddrs []string
sync.RWMutex // for conn
- Conn *zk.Conn
+ conn *zk.Conn
Timeout time.Duration
exit chan struct{}
Wait sync.WaitGroup
@@ -40,14 +46,14 @@ type ZookeeperClient struct {
eventRegistryLock sync.RWMutex
}
-func NewZookeeperClient(name string, zkAddrs []string, timeout time.Duration) (*ZookeeperClient, error) {
+func NewZooKeeperClient(name string, zkAddrs []string, timeout time.Duration) (*ZooKeeperClient, <-chan zk.Event, error) {
var (
err error
event <-chan zk.Event
- z *ZookeeperClient
+ z *ZooKeeperClient
)
- z = &ZookeeperClient{
+ z = &ZooKeeperClient{
name: name,
ZkAddrs: zkAddrs,
Timeout: timeout,
@@ -55,15 +61,12 @@ func NewZookeeperClient(name string, zkAddrs []string, timeout time.Duration) (*
eventRegistry: make(map[string][]*chan struct{}),
}
// connect to zookeeper
- z.Conn, event, err = zk.Connect(zkAddrs, timeout)
+ z.conn, event, err = zk.Connect(zkAddrs, timeout)
if err != nil {
- return nil, errors.WithMessagef(err, "zk.Connect(zkAddrs:%+v)", zkAddrs)
+ return nil, nil, errors.WithMessagef(err, "zk.Connect(zkAddrs:%+v)", zkAddrs)
}
- z.Wait.Add(1)
- go z.HandleZkEvent(event)
-
- return z, nil
+ return z, event, nil
}
// nolint
@@ -96,8 +99,13 @@ func StateToString(state zk.State) string {
}
}
+func (z *ZooKeeperClient) RegisterHandler(event <-chan zk.Event) {
+ z.Wait.Add(1)
+ go z.HandleZkEvent(event)
+}
+
// HandleZkEvent handles zookeeper events
-func (z *ZookeeperClient) HandleZkEvent(session <-chan zk.Event) {
+func (z *ZooKeeperClient) HandleZkEvent(session <-chan zk.Event) {
var (
state int
event zk.Event
@@ -120,8 +128,8 @@ func (z *ZookeeperClient) HandleZkEvent(session <-chan zk.Event) {
logger.Warnf("zk{addr:%s} state is StateDisconnected, so close the zk client{name:%s}.", z.ZkAddrs, z.name)
z.stop()
z.Lock()
- conn := z.Conn
- z.Conn = nil
+ conn := z.conn
+ z.conn = nil
z.Unlock()
if conn != nil {
conn.Close()
@@ -157,8 +165,14 @@ func (z *ZookeeperClient) HandleZkEvent(session <-chan zk.Event) {
}
}
+// getConn gets zookeeper connection safely
+func (z *ZooKeeperClient) getConn() *zk.Conn {
+ z.RLock()
+ defer z.RUnlock()
+ return z.conn
+}
-func (z *ZookeeperClient) stop() bool {
+func (z *ZooKeeperClient) stop() bool {
select {
case <-z.exit:
return true
@@ -168,3 +182,29 @@ func (z *ZookeeperClient) stop() bool {
return false
}
+
+
+// GetChildren gets children by @path
+func (z *ZooKeeperClient) GetChildren(path string) ([]string, error) {
+ var (
+ children []string
+ stat *zk.Stat
+ )
+ conn := z.getConn()
+ if conn == nil {
+ return nil, errors.New("ZooKeeper client has no connection")
+ }
+ children, stat, err := conn.Children(path)
+ if err != nil {
+ if err == zk.ErrNoNode {
+ return nil, errors.Errorf("path{%s} does not exist", path)
+ }
+ logger.Errorf("zk.Children(path{%s}) = error(%v)", path, errors.WithStack(err))
+ return nil, errors.WithMessagef(err, "zk.Children(path:%s)", path)
+ }
+ if stat.NumChildren == 0 {
+ return nil, errors.Errorf("path{%s} has none children", path)
+ }
+
+ return children, nil
+}
diff --git a/pkg/remoting/zookeeper/client_test.go b/pkg/remoting/zookeeper/client_test.go
index 94eeddd..38dd7b0 100644
--- a/pkg/remoting/zookeeper/client_test.go
+++ b/pkg/remoting/zookeeper/client_test.go
@@ -1,6 +1,7 @@
package zookeeper
import (
+ "fmt"
"github.com/apache/dubbo-go-pixiu/pkg/logger"
"github.com/stretchr/testify/assert"
"time"
@@ -26,16 +27,16 @@ func verifyEventStateOrder(t *testing.T, c <-chan zk.Event, expectedStates []zk.
}
}
-// NewMockZookeeperClient returns a mock client instance
-func NewMockZookeeperClient(name string, timeout time.Duration, opts ...Option) (*zk.TestCluster, *ZookeeperClient, <-chan zk.Event, error) {
+// NewMockZooKeeperClient returns a mock client instance
+func NewMockZooKeeperClient(name string, timeout time.Duration, opts ...Option) (*zk.TestCluster, *ZooKeeperClient, <-chan zk.Event, error) {
var (
err error
event <-chan zk.Event
- z *ZookeeperClient
+ z *ZooKeeperClient
ts *zk.TestCluster
)
- z = &ZookeeperClient{
+ z = &ZooKeeperClient{
name: name,
ZkAddrs: []string{},
Timeout: timeout,
@@ -58,7 +59,7 @@ func NewMockZookeeperClient(name string, timeout time.Duration, opts ...Option)
}
}
- z.Conn, event, err = ts.ConnectWithOptions(timeout)
+ z.conn, event, err = ts.ConnectWithOptions(timeout)
if err != nil {
return nil, nil, nil, errors.WithMessagef(err, "zk.Connect")
}
@@ -66,29 +67,71 @@ func NewMockZookeeperClient(name string, timeout time.Duration, opts ...Option)
return ts, z, event, nil
}
-func TestNewZookeeperClient(t *testing.T) {
+func TestNewZooKeeperClient(t *testing.T) { // connects via NewZooKeeperClient to a one-server test cluster and checks the order of session events
tc, err := zk.StartTestCluster(1, nil, nil)
if err != nil {
t.Fatal(err)
}
defer tc.Stop()
- callbackChan := make(chan zk.Event)
- f := func(event zk.Event) {
- callbackChan <- event
+ hosts := make([]string, len(tc.Servers))
+ for i, srv := range tc.Servers {
+ hosts[i] = fmt.Sprintf("127.0.0.1:%d", srv.Port) // one loopback host:port entry per test-cluster server
}
-
- zook, eventChan, err := tc.ConnectWithOptions(15*time.Second, zk.WithEventCallback(f))
+ zkClient, eventChan, err := NewZooKeeperClient("testZK", hosts, 30 * time.Second)
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession}
- verifyEventStateOrder(t, callbackChan, states, "callback")
verifyEventStateOrder(t, eventChan, states, "event channel")
- zook.Close()
- verifyEventStateOrder(t, callbackChan, []zk.State{zk.StateDisconnected}, "callback")
+ zkClient.getConn().Close() // closing the underlying connection is expected to surface StateDisconnected, checked on the next line
verifyEventStateOrder(t, eventChan, []zk.State{zk.StateDisconnected}, "event channel")
+}
+
+func TestGetChildren(t *testing.T) { // seeds /test with two children on a test cluster, then checks GetChildren for an existing, a missing and a childless path
+ tc, err := zk.StartTestCluster(1, nil, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer tc.Stop()
-}
\ No newline at end of file
+ conn, _, err := tc.ConnectAll()
+ assert.Nil(t, err)
+ path, err := conn.Create("/test", nil, 0, zk.WorldACL(zk.PermAll))
+ assert.Nil(t, err)
+ assert.NotNil(t, path)
+ path, err = conn.Create("/test/testchild1", nil, 0, zk.WorldACL(zk.PermAll))
+ assert.Nil(t, err)
+ assert.NotNil(t, path)
+ conn.Create("/test/testchild2", nil, 0, zk.WorldACL(zk.PermAll)) // NOTE(review): result and error are discarded here, unlike the two creates above
+
+ hosts := make([]string, len(tc.Servers))
+ for i, srv := range tc.Servers {
+ hosts[i] = fmt.Sprintf("127.0.0.1:%d", srv.Port) // one loopback host:port entry per test-cluster server
+ }
+ zkClient, eventChan, err := NewZooKeeperClient("testZK", hosts, 30 * time.Second)
+ assert.Nil(t, err)
+ wait:
+ for {
+ event := <- eventChan
+ switch event.State {
+ case zk.StateDisconnected:
+ break wait // leaves the event loop; presumably reached after the Close() at the end of the StateConnected case
+ case zk.StateConnected:
+ children, err := zkClient.GetChildren("/test")
+ assert.Nil(t, err)
+ assert.Equal(t, children[1], "testchild1") // NOTE(review): assumes a fixed child order and passes (actual, expected) — confirm ordering is guaranteed, else compare as a set
+ assert.Equal(t, children[0], "testchild2") // NOTE(review): indexing panics if GetChildren returned nil above — TODO confirm acceptable for this test
+
+ children, err = zkClient.GetChildren("/vacancy")
+ assert.EqualError(t, err, "path{/vacancy} does not exist")
+ assert.Nil(t, children)
+ children, err = zkClient.GetChildren("/test/testchild1")
+ assert.EqualError(t, err, "path{/test/testchild1} has none children")
+ assert.Empty(t, children)
+ zkClient.conn.Close() // reads conn directly instead of via getConn(); closing is expected to produce the StateDisconnected event that ends the loop
+ }
+ }
+}
diff --git a/pkg/remoting/zookeeper/zookeeper-4unittest/contrib/fatjar/zookeeper-3.4.9-fatjar.jar b/pkg/remoting/zookeeper/zookeeper-4unittest/contrib/fatjar/zookeeper-3.4.9-fatjar.jar
new file mode 100644
index 0000000..839531b
Binary files /dev/null and b/pkg/remoting/zookeeper/zookeeper-4unittest/contrib/fatjar/zookeeper-3.4.9-fatjar.jar differ