You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by la...@apache.org on 2021/11/12 08:44:52 UTC
[dubbo-go] branch 3.0 updated: Fix: some bugs and features for 3.0
(#1586)
This is an automated email from the ASF dual-hosted git repository.
laurence pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 1fdd429 Fix: some bugs and features for 3.0 (#1586)
1fdd429 is described below
commit 1fdd429b8c4a02c477ceca510cce779f782fe512
Author: Laurence <45...@users.noreply.github.com>
AuthorDate: Fri Nov 12 16:44:44 2021 +0800
Fix: some bugs and features for 3.0 (#1586)
* fix: some bugs
* Fix: remove file\k8s registry, k8s remote
---
common/proxy/proxy.go | 31 +-
config_center/nacos/listener.go | 2 +-
go.mod | 7 +-
go.sum | 18 +-
protocol/dubbo3/dubbo3_invoker.go | 19 +-
protocol/dubbo3/dubbo3_protocol.go | 8 +-
registry/base_registry.go | 6 +-
registry/file/listener.go | 29 --
registry/file/service_discovery.go | 267 ------------
registry/file/service_discovery_test.go | 91 -----
registry/kubernetes/listener.go | 127 ------
registry/kubernetes/listener_test.go | 53 ---
registry/kubernetes/registry.go | 241 -----------
registry/kubernetes/registry_test.go | 344 ----------------
remoting/kubernetes/client.go | 213 ----------
remoting/kubernetes/client_test.go | 455 ---------------------
remoting/kubernetes/facade.go | 28 --
remoting/kubernetes/facade_test.go | 78 ----
remoting/kubernetes/listener.go | 212 ----------
remoting/kubernetes/listener_test.go | 103 -----
remoting/kubernetes/registry_controller.go | 633 -----------------------------
remoting/kubernetes/watch.go | 320 ---------------
remoting/kubernetes/watch_test.go | 96 -----
remoting/zookeeper/listener.go | 2 +-
24 files changed, 53 insertions(+), 3330 deletions(-)
diff --git a/common/proxy/proxy.go b/common/proxy/proxy.go
index e1c6222..c32eedc 100644
--- a/common/proxy/proxy.go
+++ b/common/proxy/proxy.go
@@ -19,6 +19,7 @@ package proxy
import (
"context"
+ "errors"
"reflect"
"sync"
)
@@ -116,13 +117,6 @@ func DefaultProxyImplementFunc(p *Proxy, v common.RPCService) {
valueOf := reflect.ValueOf(v)
valueOfElem := valueOf.Elem()
- typeOf := valueOfElem.Type()
-
- // check incoming interface, incoming interface's elem must be a struct.
- if typeOf.Kind() != reflect.Struct {
- logger.Errorf("The type of RPCService(=\"%T\") must be a pointer of a struct.", v)
- return
- }
makeDubboCallProxy := func(methodName string, outs []reflect.Type) func(in []reflect.Value) []reflect.Value {
return func(in []reflect.Value) []reflect.Value {
@@ -227,6 +221,18 @@ func DefaultProxyImplementFunc(p *Proxy, v common.RPCService) {
}
}
+ if err := refectAndMakeObjectFunc(valueOfElem, makeDubboCallProxy); err != nil {
+ logger.Errorf("The type or combination type of RPCService %T must be a pointer of a struct. error is %s", v, err)
+ return
+ }
+}
+
+func refectAndMakeObjectFunc(valueOfElem reflect.Value, makeDubboCallProxy func(methodName string, outs []reflect.Type) func(in []reflect.Value) []reflect.Value) error {
+ typeOf := valueOfElem.Type()
+ // check incoming interface, incoming interface's elem must be a struct.
+ if typeOf.Kind() != reflect.Struct {
+ return errors.New("invalid type kind")
+ }
numField := valueOfElem.NumField()
for i := 0; i < numField; i++ {
t := typeOf.Field(i)
@@ -258,6 +264,17 @@ func DefaultProxyImplementFunc(p *Proxy, v common.RPCService) {
// do method proxy here:
f.Set(reflect.MakeFunc(f.Type(), makeDubboCallProxy(methodName, funcOuts)))
logger.Debugf("set method [%s]", methodName)
+ } else if f.IsValid() && f.CanSet() {
+ // for struct combination
+ valueOfSub := reflect.New(t.Type)
+ valueOfElemInterface := valueOfSub.Elem()
+ if valueOfElemInterface.Type().Kind() == reflect.Struct {
+ if err := refectAndMakeObjectFunc(valueOfElemInterface, makeDubboCallProxy); err != nil {
+ return err
+ }
+ f.Set(valueOfElemInterface)
+ }
}
}
+ return nil
}
diff --git a/config_center/nacos/listener.go b/config_center/nacos/listener.go
index 3d60d2a..a4cf589 100644
--- a/config_center/nacos/listener.go
+++ b/config_center/nacos/listener.go
@@ -22,6 +22,7 @@ import (
)
import (
+ constant2 "github.com/nacos-group/nacos-sdk-go/common/constant"
"github.com/nacos-group/nacos-sdk-go/vo"
)
@@ -30,7 +31,6 @@ import (
"dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/config_center"
"dubbo.apache.org/dubbo-go/v3/remoting"
- constant2 "github.com/nacos-group/nacos-sdk-go/common/constant"
)
func callback(listener config_center.ConfigurationListener, _, _, dataId, data string) {
diff --git a/go.mod b/go.mod
index 30d3bb8..3256db0 100644
--- a/go.mod
+++ b/go.mod
@@ -13,8 +13,8 @@ require (
github.com/creasty/defaults v1.5.2
github.com/dubbogo/go-zookeeper v1.0.3
github.com/dubbogo/gost v1.11.19
- github.com/dubbogo/grpc-go v1.42.4-triple
- github.com/dubbogo/triple v1.1.2
+ github.com/dubbogo/grpc-go v1.42.5-triple
+ github.com/dubbogo/triple v1.1.3
github.com/emicklei/go-restful/v3 v3.7.1
github.com/fsnotify/fsnotify v1.5.1
github.com/ghodss/yaml v1.0.0
@@ -32,8 +32,6 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd
github.com/nacos-group/nacos-sdk-go v1.0.9
github.com/natefinch/lumberjack v2.0.0+incompatible
- github.com/onsi/ginkgo v1.10.1 // indirect
- github.com/onsi/gomega v1.7.0 // indirect
github.com/opentracing/opentracing-go v1.2.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.11.0
@@ -49,7 +47,6 @@ require (
google.golang.org/protobuf v1.27.1
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v2 v2.4.0
- k8s.io/api v0.16.9
k8s.io/apimachinery v0.16.9
k8s.io/client-go v0.16.9
)
diff --git a/go.sum b/go.sum
index 2fbdc68..4ab084c 100644
--- a/go.sum
+++ b/go.sum
@@ -177,13 +177,13 @@ github.com/dubbogo/gost v1.11.12/go.mod h1:vIcP9rqz2KsXHPjsAwIUtfJIJjppQLQDcYaZT
github.com/dubbogo/gost v1.11.18/go.mod h1:vIcP9rqz2KsXHPjsAwIUtfJIJjppQLQDcYaZTy/61jI=
github.com/dubbogo/gost v1.11.19 h1:R1rZ3TNJKV9W5XHLMv+GDO2Wy6UDnwGQtVWbsWYvo0A=
github.com/dubbogo/gost v1.11.19/go.mod h1:vIcP9rqz2KsXHPjsAwIUtfJIJjppQLQDcYaZTy/61jI=
-github.com/dubbogo/grpc-go v1.42.4-triple h1:ysiabUrEGcaeXgnjSBT0bB1M7EexSJFiO0Mebg/Iqa4=
-github.com/dubbogo/grpc-go v1.42.4-triple/go.mod h1:F1T9hnUvYGW4JLK1QNriavpOkhusU677ovPzLkk6zHM=
+github.com/dubbogo/grpc-go v1.42.5-triple h1:Ed5z/ikkpdZHBMA4mTEthQFTQeKlHtkdAsQrZjTbFk8=
+github.com/dubbogo/grpc-go v1.42.5-triple/go.mod h1:F1T9hnUvYGW4JLK1QNriavpOkhusU677ovPzLkk6zHM=
github.com/dubbogo/jsonparser v1.0.1/go.mod h1:tYAtpctvSP/tWw4MeelsowSPgXQRVHHWbqL6ynps8jU=
github.com/dubbogo/net v0.0.4/go.mod h1:1CGOnM7X3he+qgGNqjeADuE5vKZQx/eMSeUkpU3ujIc=
github.com/dubbogo/triple v1.0.9/go.mod h1:1t9me4j4CTvNDcsMZy6/OGarbRyAUSY0tFXGXHCp7Iw=
-github.com/dubbogo/triple v1.1.2 h1:7lmQ0uNvcIYlMj5gNwPQadFx8w8UDEtcYl4DL6X+idM=
-github.com/dubbogo/triple v1.1.2/go.mod h1:x+H41M5yP1ULnJu4b+o8VrgsIKdTPslTum2yUqA9N1I=
+github.com/dubbogo/triple v1.1.3 h1:XKSh42lE2HLud++g4Fif7XY2hSMEsohFpegZPvsNXVQ=
+github.com/dubbogo/triple v1.1.3/go.mod h1:suMeAfZliq0p/lWIytgEdiuKcRlmeJC9pYeNHVE7FWU=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
@@ -208,7 +208,6 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.m
github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0=
github.com/envoyproxy/go-control-plane v0.10.0/go.mod h1:AY7fTTXNdv/aJ2O5jwpxAPOWUZ7hQAEvzN5Pf27BkQQ=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
-github.com/evanphx/json-patch v4.2.0+incompatible h1:fUDGZCv/7iAN7u0puUVhvKCcsR6vRfwrJatElLBEf0I=
github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/evanphx/json-patch/v5 v5.5.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4=
github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 h1:Ghm4eQYC0nEPnSJdVkTrXpu9KtoVCSo1hg7mtI7G9KU=
@@ -358,7 +357,6 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
-github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d h1:7XGaL1e6bYS1yIonGp9761ExpPPV1ui0SAC59Yube9k=
github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY=
github.com/gophercloud/gophercloud v0.1.0/go.mod h1:vxM41WHh5uqHVBMZHzuwNOHh8XEoIEcSTewFxm1c5g8=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
@@ -440,7 +438,6 @@ github.com/hashicorp/vault/sdk v0.3.0 h1:kR3dpxNkhh/wr6ycaJYqp6AFT/i2xaftbfnwZdu
github.com/hashicorp/vault/sdk v0.3.0/go.mod h1:aZ3fNuL5VNydQk8GcLJ2TV8YCRVvyaakYkhZRoVuhj0=
github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM=
github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM=
-github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/hudl/fargo v1.3.0/go.mod h1:y3CKSmjA+wD2gak7sUSXTAoopbhU08POFhmITJgmKTg=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
@@ -585,13 +582,9 @@ github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
-github.com/onsi/ginkgo v1.10.1 h1:q/mM8GF/n0shIN8SaAZ0V+jnLPzen6WIVZdiwrRlMlo=
-github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
-github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME=
-github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk=
github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go.mod h1:Ngi6UdF0k5OKD5t5wlmGhe/EDKPoUM3BXZSSfIuJbis=
github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74=
@@ -1200,7 +1193,6 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
-gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/gcfg.v1 v1.2.3/go.mod h1:yesOnuUOFQAhST5vPY4nbZsb/huCgGGXlipJsBn0b3o=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
@@ -1212,7 +1204,6 @@ gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXL
gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
gopkg.in/square/go-jose.v2 v2.3.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI=
-gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/warnings.v0 v0.1.2/go.mod h1:jksf8JmL6Qr/oQM2OXTHunEvvTAsrWBLb6OOjuVWRNI=
gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
@@ -1248,7 +1239,6 @@ k8s.io/klog v0.0.0-20181102134211-b9b56d5dfc92/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUc
k8s.io/klog v0.3.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8=
k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I=
-k8s.io/kube-openapi v0.0.0-20190816220812-743ec37842bf h1:EYm5AW/UUDbnmnI+gK0TJDVK9qPLhM+sRHYanNKw0EQ=
k8s.io/kube-openapi v0.0.0-20190816220812-743ec37842bf/go.mod h1:1TqjTSzOxsLGIKfj0lK8EeCP7K1iUG65v09OM0/WG5E=
k8s.io/utils v0.0.0-20190801114015-581e00157fb1 h1:+ySTxfHnfzZb9ys375PXNlLhkJPLKgHajBU0N62BDvE=
k8s.io/utils v0.0.0-20190801114015-581e00157fb1/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew=
diff --git a/protocol/dubbo3/dubbo3_invoker.go b/protocol/dubbo3/dubbo3_invoker.go
index c978f85..da42a32 100644
--- a/protocol/dubbo3/dubbo3_invoker.go
+++ b/protocol/dubbo3/dubbo3_invoker.go
@@ -27,6 +27,8 @@ import (
)
import (
+ "github.com/dubbogo/grpc-go/metadata"
+
tripleConstant "github.com/dubbogo/triple/pkg/common/constant"
triConfig "github.com/dubbogo/triple/pkg/config"
"github.com/dubbogo/triple/pkg/triple"
@@ -134,7 +136,19 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
}
// append interface id to ctx
- ctx = context.WithValue(ctx, tripleConstant.CtxAttachmentKey, invocation.Attachments())
+ gRPCMD := make(metadata.MD, 0)
+ for k, v := range invocation.Attachments() {
+ if str, ok := v.(string); ok {
+ gRPCMD.Set(k, str)
+ continue
+ }
+ if str, ok := v.([]string); ok {
+ gRPCMD.Set(k, str...)
+ continue
+ }
+ logger.Warnf("triple attachment value with key = %s is invalid, which should be string or []string", k)
+ }
+ ctx = metadata.NewOutgoingContext(ctx, gRPCMD)
ctx = context.WithValue(ctx, tripleConstant.InterfaceKey, di.BaseInvoker.GetURL().GetParam(constant.InterfaceKey, ""))
in := make([]reflect.Value, 0, 16)
in = append(in, reflect.ValueOf(ctx))
@@ -146,8 +160,9 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
methodName := invocation.MethodName()
triAttachmentWithErr := di.client.Invoke(methodName, in, invocation.Reply())
result.Err = triAttachmentWithErr.GetError()
+ result.Attrs = make(map[string]interface{})
for k, v := range triAttachmentWithErr.GetAttachments() {
- result.Attachment(k, v)
+ result.Attrs[k] = v
}
result.Rest = invocation.Reply()
return &result
diff --git a/protocol/dubbo3/dubbo3_protocol.go b/protocol/dubbo3/dubbo3_protocol.go
index 3e097c7..2ad3dff 100644
--- a/protocol/dubbo3/dubbo3_protocol.go
+++ b/protocol/dubbo3/dubbo3_protocol.go
@@ -190,13 +190,7 @@ func (d *UnaryService) GetReqParamsInterfaces(methodName string) ([]interface{},
}
func (d *UnaryService) InvokeWithArgs(ctx context.Context, methodName string, arguments []interface{}) (interface{}, error) {
- dubboAttachment := make(map[string]interface{})
- tripleAttachment, ok := ctx.Value(tripleConstant.TripleAttachement).(tripleCommon.TripleAttachment)
- if ok {
- for k, v := range tripleAttachment {
- dubboAttachment[k] = v
- }
- }
+ dubboAttachment, _ := ctx.Value(tripleConstant.TripleAttachement).(tripleCommon.DubboAttachment)
res := d.proxyImpl.Invoke(ctx, invocation.NewRPCInvocation(methodName, arguments, dubboAttachment))
return res, res.Error()
}
diff --git a/registry/base_registry.go b/registry/base_registry.go
index 7ac9f2b..c6151e8 100644
--- a/registry/base_registry.go
+++ b/registry/base_registry.go
@@ -358,7 +358,7 @@ func (r *BaseRegistry) consumerRegistry(c *common.URL, params url.Values, f crea
rawURL string
err error
)
- dubboPath = fmt.Sprintf("/%s/%s/%s", r.URL.GetParam(constant.RegistryGroupKey, "dubbo"), r.service(c), common.DubboNodes[common.PROVIDER])
+ dubboPath = fmt.Sprintf("/%s/%s/%s", r.URL.GetParam(constant.RegistryGroupKey, "dubbo"), r.service(c), common.DubboNodes[common.CONSUMER])
if f != nil {
err = f(dubboPath)
@@ -412,7 +412,7 @@ func (r *BaseRegistry) Subscribe(url *common.URL, notifyListener NotifyListener)
listener.Close()
break
} else {
- logger.Infof("[Zookeeper Registry] update begin, service event: %v", serviceEvent.String())
+ logger.Debugf("[Zookeeper Registry] update begin, service event: %v", serviceEvent.String())
notifyListener.Notify(serviceEvent)
}
}
@@ -443,7 +443,7 @@ func (r *BaseRegistry) UnSubscribe(url *common.URL, notifyListener NotifyListene
listener.Close()
break
} else {
- logger.Infof("[Zookeeper Registry] update begin, service event: %v", serviceEvent.String())
+ logger.Debugf("[Zookeeper Registry] update begin, service event: %v", serviceEvent.String())
notifyListener.Notify(serviceEvent)
}
}
diff --git a/registry/file/listener.go b/registry/file/listener.go
deleted file mode 100644
index 55f35ea..0000000
--- a/registry/file/listener.go
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package file
-
-import (
- "dubbo.apache.org/dubbo-go/v3/config_center"
-)
-
-// RegistryConfigurationListener represent the processor of flie watcher
-type RegistryConfigurationListener struct{}
-
-// Process submit the ConfigChangeEvent to the event chan to notify all observer
-func (l *RegistryConfigurationListener) Process(configType *config_center.ConfigChangeEvent) {
-}
diff --git a/registry/file/service_discovery.go b/registry/file/service_discovery.go
deleted file mode 100644
index 65f2c24..0000000
--- a/registry/file/service_discovery.go
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package file
-
-import (
- "encoding/json"
- "fmt"
- "io/ioutil"
- "os"
- "path"
- "strconv"
-)
-
-import (
- gxset "github.com/dubbogo/gost/container/set"
- gxpage "github.com/dubbogo/gost/hash/page"
-
- perrors "github.com/pkg/errors"
-)
-
-import (
- "dubbo.apache.org/dubbo-go/v3/common"
- "dubbo.apache.org/dubbo-go/v3/common/constant"
- "dubbo.apache.org/dubbo-go/v3/common/extension"
- "dubbo.apache.org/dubbo-go/v3/common/logger"
- "dubbo.apache.org/dubbo-go/v3/config_center"
- "dubbo.apache.org/dubbo-go/v3/config_center/file"
- "dubbo.apache.org/dubbo-go/v3/registry"
-)
-
-// init will put the service discovery into extension
-func init() {
- extension.SetServiceDiscovery(constant.FileKey, newFileSystemServiceDiscovery)
-}
-
-// fileServiceDiscovery is the implementation of service discovery based on file.
-type fileSystemServiceDiscovery struct {
- dynamicConfiguration file.FileSystemDynamicConfiguration
- rootPath string
- fileMap map[string]string
-}
-
-func newFileSystemServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) {
- if url.Protocol != constant.FileKey {
- return nil, perrors.New("could not init the instance because the config is invalid")
- }
-
- rp, err := file.Home()
- if err != nil {
- return nil, perrors.WithStack(err)
- }
- fdcf := extension.GetConfigCenterFactory(constant.FileKey)
- p := path.Join(rp, ".dubbo", constant.RegistryKey)
- url.AddParamAvoidNil(file.ConfigCenterDirParamName, p)
- c, err := fdcf.GetDynamicConfiguration(url)
- if err != nil {
- return nil, perrors.WithStack(err)
- }
-
- sd := &fileSystemServiceDiscovery{
- dynamicConfiguration: *c.(*file.FileSystemDynamicConfiguration),
- rootPath: p,
- fileMap: make(map[string]string),
- }
-
- extension.AddCustomShutdownCallback(func() {
- if err := sd.Destroy(); err != nil {
- logger.Warnf("sd.Destroy() = error:%v", err)
- }
- })
-
- for _, v := range sd.GetServices().Values() {
- for _, i := range sd.GetInstances(v.(string)) {
- // like java do nothing
- l := &RegistryConfigurationListener{}
- sd.dynamicConfiguration.AddListener(getServiceInstanceId(i), l, config_center.WithGroup(getServiceName(i)))
- }
- }
-
- return sd, nil
-}
-
-// nolint
-func (fssd *fileSystemServiceDiscovery) String() string {
- return fmt.Sprintf("file-system-service-discovery")
-}
-
-// Destroy will destroy the service discovery.
-// If the discovery cannot be destroy, it will return an error.
-func (fssd *fileSystemServiceDiscovery) Destroy() error {
- fssd.dynamicConfiguration.Close()
-
- for _, f := range fssd.fileMap {
- fssd.releaseAndRemoveRegistrationFiles(f)
- }
-
- return nil
-}
-
-// nolint
-func (fssd *fileSystemServiceDiscovery) releaseAndRemoveRegistrationFiles(file string) {
- os.RemoveAll(file)
-}
-
-// ----------------- registration ----------------
-
-// Register will register an instance of ServiceInstance to registry
-func (fssd *fileSystemServiceDiscovery) Register(instance registry.ServiceInstance) error {
- id := getServiceInstanceId(instance)
- sn := getServiceName(instance)
-
- c, err := toJsonString(instance)
- if err != nil {
- return perrors.WithStack(err)
- }
-
- err = fssd.dynamicConfiguration.PublishConfig(id, sn, c)
- if err != nil {
- return perrors.WithStack(err)
- }
-
- fssd.fileMap[id] = fssd.dynamicConfiguration.GetPath(id, sn)
-
- return nil
-}
-
-// nolint
-func getServiceInstanceId(si registry.ServiceInstance) string {
- if si.GetID() == "" {
- return si.GetHost() + "." + strconv.Itoa(si.GetPort())
- }
-
- return si.GetID()
-}
-
-// nolint
-func getServiceName(si registry.ServiceInstance) string {
- return si.GetServiceName()
-}
-
-// toJsonString to json string
-func toJsonString(si registry.ServiceInstance) (string, error) {
- bytes, err := json.Marshal(si)
- if err != nil {
- return "", perrors.WithStack(err)
- }
-
- return string(bytes), nil
-}
-
-// Update will update the data of the instance in registry
-func (fssd *fileSystemServiceDiscovery) Update(instance registry.ServiceInstance) error {
- return fssd.Register(instance)
-}
-
-// Unregister will unregister this instance from registry
-func (fssd *fileSystemServiceDiscovery) Unregister(instance registry.ServiceInstance) error {
- id := getServiceInstanceId(instance)
- sn := getServiceName(instance)
-
- err := fssd.dynamicConfiguration.RemoveConfig(id, sn)
- if err != nil {
- return perrors.WithStack(err)
- }
-
- delete(fssd.fileMap, instance.GetID())
- return nil
-}
-
-// ----------------- discovery -------------------
-// GetDefaultPageSize will return the default page size
-func (fssd *fileSystemServiceDiscovery) GetDefaultPageSize() int {
- return 100
-}
-
-// GetServices will return the all service names.
-func (fssd *fileSystemServiceDiscovery) GetServices() *gxset.HashSet {
- r := gxset.NewSet()
- // dynamicConfiguration root path is the actual root path
- fileInfo, _ := ioutil.ReadDir(fssd.dynamicConfiguration.RootPath())
-
- for _, file := range fileInfo {
- if file.IsDir() {
- r.Add(file.Name())
- }
- }
-
- return r
-}
-
-// GetInstances will return all service instances with serviceName
-func (fssd *fileSystemServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance {
- set, err := fssd.dynamicConfiguration.GetConfigKeysByGroup(serviceName)
- if err != nil {
- logger.Errorf("[FileServiceDiscovery] Could not query the instances for service{%s}, error = err{%v} ",
- serviceName, err)
- return make([]registry.ServiceInstance, 0)
- }
-
- res := make([]registry.ServiceInstance, 0, set.Size())
- for _, v := range set.Values() {
- id := v.(string)
- p, err := fssd.dynamicConfiguration.GetProperties(id, config_center.WithGroup(serviceName))
- if err != nil {
- logger.Errorf("[FileServiceDiscovery] Could not get the properties for id{%s}, service{%s}, "+
- "error = err{%v} ",
- id, serviceName, err)
- return make([]registry.ServiceInstance, 0)
- }
-
- dsi := ®istry.DefaultServiceInstance{}
- err = json.Unmarshal([]byte(p), dsi)
- if err != nil {
- logger.Errorf("[FileServiceDiscovery] Could not unmarshal the properties for id{%s}, service{%s}, "+
- "error = err{%v} ",
- id, serviceName, err)
- return make([]registry.ServiceInstance, 0)
- }
-
- res = append(res, dsi)
- }
-
- return res
-}
-
-// GetInstancesByPage will return a page containing instances of ServiceInstance with the serviceName
-// the page will start at offset
-func (fssd *fileSystemServiceDiscovery) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager {
- return nil
-}
-
-// GetHealthyInstancesByPage will return a page containing instances of ServiceInstance.
-// The param healthy indices that the instance should be healthy or not.
-// The page will start at offset
-func (fssd *fileSystemServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int,
- healthy bool) gxpage.Pager {
- return nil
-}
-
-// Batch get all instances by the specified service names
-func (fssd *fileSystemServiceDiscovery) GetRequestInstances(serviceNames []string, offset int,
- requestedSize int) map[string]gxpage.Pager {
- return nil
-}
-
-// ----------------- event ----------------------
-// AddListener adds a new ServiceInstancesChangedListenerImpl
-// client
-func (fssd *fileSystemServiceDiscovery) AddListener(listener registry.ServiceInstancesChangedListener) error {
- // fssd.dynamicConfiguration.AddListener(listener.ServiceName)
- return nil
-}
diff --git a/registry/file/service_discovery_test.go b/registry/file/service_discovery_test.go
deleted file mode 100644
index 0152fc6..0000000
--- a/registry/file/service_discovery_test.go
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package file
-
-//
-//import (
-// "math/rand"
-// "strconv"
-// "testing"
-// "time"
-//)
-//
-//import (
-// "github.com/stretchr/testify/assert"
-//)
-//
-//import (
-// "dubbo.apache.org/dubbo-go/v3/common/constant"
-// "dubbo.apache.org/dubbo-go/v3/common/extension"
-// "dubbo.apache.org/dubbo-go/v3/registry"
-//)
-//
-//func TestNewFileSystemServiceDiscoveryAndDestroy(t *testing.T) {
-// prepareData()
-// serviceDiscovery, err := newFileSystemServiceDiscovery()
-// assert.NoError(t, err)
-// assert.NotNil(t, serviceDiscovery)
-// defer func() {
-// err = serviceDiscovery.Destroy()
-// assert.Nil(t, err)
-// }()
-//}
-//
-//func TestCURDFileSystemServiceDiscovery(t *testing.T) {
-// prepareData()
-// serviceDiscovery, err := extension.GetServiceDiscovery(constant.FILE_KEY)
-// assert.NoError(t, err)
-// md := make(map[string]string)
-//
-// rand.Seed(time.Now().Unix())
-// serviceName := "service-name" + strconv.Itoa(rand.Intn(10000))
-// md["t1"] = "test1"
-// r1 := ®istry.DefaultServiceInstance{
-// ID: "123456789",
-// ServiceName: serviceName,
-// Host: "127.0.0.1",
-// Port: 2233,
-// Enable: true,
-// Healthy: true,
-// Metadata: md,
-// }
-// err = serviceDiscovery.Register(r1)
-// assert.NoError(t, err)
-//
-// instances := serviceDiscovery.GetInstances(r1.ServiceName)
-// assert.Equal(t, 1, len(instances))
-// assert.Equal(t, r1.ID, instances[0].GetID())
-// assert.Equal(t, r1.ServiceName, instances[0].GetServiceName())
-// assert.Equal(t, r1.Port, instances[0].GetPort())
-//
-// err = serviceDiscovery.Unregister(r1)
-// assert.NoError(t, err)
-//
-// err = serviceDiscovery.Register(r1)
-// assert.NoError(t, err)
-// defer func() {
-// err = serviceDiscovery.Destroy()
-// assert.NoError(t, err)
-// }()
-//}
-//
-//func prepareData() {
-// //config.GetRootConfig().ServiceDiscoveries[testName] = &config.ServiceDiscoveryConfig{
-// // Protocol: "file",
-// //}
-//}
diff --git a/registry/kubernetes/listener.go b/registry/kubernetes/listener.go
deleted file mode 100644
index 7605235..0000000
--- a/registry/kubernetes/listener.go
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kubernetes
-
-import (
- "strings"
-)
-
-import (
- gxchan "github.com/dubbogo/gost/container/chan"
-
- perrors "github.com/pkg/errors"
-)
-
-import (
- "dubbo.apache.org/dubbo-go/v3/common"
- "dubbo.apache.org/dubbo-go/v3/common/logger"
- "dubbo.apache.org/dubbo-go/v3/config_center"
- "dubbo.apache.org/dubbo-go/v3/registry"
- "dubbo.apache.org/dubbo-go/v3/remoting"
-)
-
-type dataListener struct {
- interestedURL []*common.URL
- listener config_center.ConfigurationListener
-}
-
-// NewRegistryDataListener creates a data listener for kubernetes
-func NewRegistryDataListener(listener config_center.ConfigurationListener) *dataListener {
- return &dataListener{listener: listener}
-}
-
-// AddInterestedURL adds the @url of registry center to the listener
-func (l *dataListener) AddInterestedURL(url *common.URL) {
- l.interestedURL = append(l.interestedURL, url)
-}
-
-// DataChange
-// notify listen, when interest event
-func (l *dataListener) DataChange(eventType remoting.Event) bool {
- index := strings.Index(eventType.Path, "/providers/")
- if index == -1 {
- logger.Warnf("Listen with no url, event.path={%v}", eventType.Path)
- return false
- }
- url := eventType.Path[index+len("/providers/"):]
- serviceURL, err := common.NewURL(url)
- if err != nil {
- logger.Warnf("Listen NewURL(r{%s}) = error{%v}", eventType.Path, err)
- return false
- }
-
- for _, v := range l.interestedURL {
- if serviceURL.URLEqual(v) {
- l.listener.Process(
- &config_center.ConfigChangeEvent{
- Key: eventType.Path,
- Value: serviceURL,
- ConfigType: eventType.Action,
- },
- )
- return true
- }
- }
- return false
-}
-
-type configurationListener struct {
- registry *kubernetesRegistry
- events *gxchan.UnboundedChan
-}
-
-// NewConfigurationListener for listening the event of kubernetes.
-func NewConfigurationListener(reg *kubernetesRegistry) *configurationListener {
- // add a new waiter
- reg.WaitGroup().Add(1)
- return &configurationListener{registry: reg, events: gxchan.NewUnboundedChan(32)}
-}
-
-// Process processes the data change event from config center of kubernetes
-func (l *configurationListener) Process(configType *config_center.ConfigChangeEvent) {
- l.events.In() <- configType
-}
-
-// Next returns next service event once received
-func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
- for {
- select {
- case <-l.registry.Done():
- logger.Warnf("listener's kubernetes client connection is broken, so kubernetes event listener exits now.")
- return nil, perrors.New("listener stopped")
-
- case val := <-l.events.Out():
- e, _ := val.(*config_center.ConfigChangeEvent)
- logger.Debugf("got kubernetes event %#v", e)
- if e.ConfigType == remoting.EventTypeDel && !l.registry.client.Valid() {
- select {
- case <-l.registry.Done():
- logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value)
- default:
- }
- continue
- }
- return ®istry.ServiceEvent{Action: e.ConfigType, Service: e.Value.(*common.URL)}, nil
- }
- }
-}
-
-// Close kubernetes registry center
-func (l *configurationListener) Close() {
- l.registry.WaitGroup().Done()
-}
diff --git a/registry/kubernetes/listener_test.go b/registry/kubernetes/listener_test.go
deleted file mode 100644
index a4913f5..0000000
--- a/registry/kubernetes/listener_test.go
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kubernetes
-
-import (
- "testing"
-)
-
-import (
- "github.com/stretchr/testify/assert"
-)
-
-import (
- "dubbo.apache.org/dubbo-go/v3/common"
- "dubbo.apache.org/dubbo-go/v3/config_center"
- "dubbo.apache.org/dubbo-go/v3/remoting"
-)
-
-func Test_DataChange(t *testing.T) {
- listener := NewRegistryDataListener(&MockDataListener{})
- url, _ := common.NewURL("jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bs [...]
- listener.AddInterestedURL(url)
- int := listener.DataChange(remoting.Event{Path: "/dubbo/com.ikurento.user.UserProvider/providers/jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.ret [...]
- assert.Equal(t, true, int)
-}
-
-type MockDataListener struct{}
-
-func (*MockDataListener) Process(configType *config_center.ConfigChangeEvent) {}
-
-func TestDataChange(t *testing.T) {
- listener := NewRegistryDataListener(&MockDataListener{})
- url, _ := common.NewURL("jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bs [...]
- listener.AddInterestedURL(url)
- if !listener.DataChange(remoting.Event{Path: "/dubbo/com.ikurento.user.UserProvider/providers/jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retrie [...]
- t.Fatal("data change not ok")
- }
-}
diff --git a/registry/kubernetes/registry.go b/registry/kubernetes/registry.go
deleted file mode 100644
index 63af25f..0000000
--- a/registry/kubernetes/registry.go
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kubernetes
-
-import (
- "fmt"
- "path"
- "sync"
- "time"
-)
-
-import (
- gxtime "github.com/dubbogo/gost/time"
-
- perrors "github.com/pkg/errors"
-
- v1 "k8s.io/api/core/v1"
-)
-
-import (
- "dubbo.apache.org/dubbo-go/v3/common"
- "dubbo.apache.org/dubbo-go/v3/common/constant"
- "dubbo.apache.org/dubbo-go/v3/common/extension"
- "dubbo.apache.org/dubbo-go/v3/common/logger"
- "dubbo.apache.org/dubbo-go/v3/registry"
- "dubbo.apache.org/dubbo-go/v3/remoting/kubernetes"
-)
-
-const (
- Name = "kubernetes"
- ConnDelay = 3
- MaxFailTimes = 15
-)
-
-func init() {
- // processID = fmt.Sprintf("%d", os.Getpid())
- // localIP = common.GetLocalIp()
- extension.SetRegistry(Name, newKubernetesRegistry)
-}
-
-type kubernetesRegistry struct {
- registry.BaseRegistry
- cltLock sync.RWMutex
- client *kubernetes.Client
- listenerLock sync.Mutex
- listener *kubernetes.EventListener
- dataListener *dataListener
- configListener *configurationListener
-}
-
-// Client gets the etcdv3 kubernetes
-func (r *kubernetesRegistry) Client() *kubernetes.Client {
- r.cltLock.RLock()
- client := r.client
- r.cltLock.RUnlock()
- return client
-}
-
-// SetClient sets the kubernetes client
-func (r *kubernetesRegistry) SetClient(client *kubernetes.Client) {
- r.cltLock.Lock()
- r.client = client
- r.cltLock.Unlock()
-}
-
-// CloseAndNilClient closes listeners and clear client
-func (r *kubernetesRegistry) CloseAndNilClient() {
- r.client.Close()
- r.client = nil
-}
-
-// CloseListener closes listeners
-func (r *kubernetesRegistry) CloseListener() {
- r.cltLock.Lock()
- l := r.configListener
- r.cltLock.Unlock()
- if l != nil {
- l.Close()
- }
- r.configListener = nil
-}
-
-// CreatePath create the path in the registry center of kubernetes
-func (r *kubernetesRegistry) CreatePath(k string) error {
- if err := r.client.Create(k, ""); err != nil {
- return perrors.WithMessagef(err, "create path %s in kubernetes", k)
- }
- return nil
-}
-
-// DoRegister actually do the register job in the registry center of kubernetes
-func (r *kubernetesRegistry) DoRegister(root string, node string) error {
- return r.client.Create(path.Join(root, node), "")
-}
-
-func (r *kubernetesRegistry) DoUnregister(root string, node string) error {
- return perrors.New("DoUnregister is not support in kubernetesRegistry")
-}
-
-// DoSubscribe actually subscribe the provider URL
-func (r *kubernetesRegistry) DoSubscribe(svc *common.URL) (registry.Listener, error) {
- var configListener *configurationListener
-
- r.listenerLock.Lock()
- configListener = r.configListener
- r.listenerLock.Unlock()
- if r.listener == nil {
- r.cltLock.Lock()
- client := r.client
- r.cltLock.Unlock()
- if client == nil {
- return nil, perrors.New("kubernetes client broken")
- }
-
- r.listenerLock.Lock()
- if r.listener == nil {
- // double check
- r.listener = kubernetes.NewEventListener(r.client)
- }
- r.listenerLock.Unlock()
- }
-
- // register the svc to dataListener
- r.dataListener.AddInterestedURL(svc)
- go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/"+constant.DefaultCategory, svc.Service()), r.dataListener)
-
- return configListener, nil
-}
-
-// nolint
-func (r *kubernetesRegistry) DoUnsubscribe(conf *common.URL) (registry.Listener, error) {
- return nil, perrors.New("DoUnsubscribe is not support in kubernetesRegistry")
-}
-
-// InitListeners init listeners of kubernetes registry center
-func (r *kubernetesRegistry) InitListeners() {
- r.listener = kubernetes.NewEventListener(r.client)
- r.configListener = NewConfigurationListener(r)
- r.dataListener = NewRegistryDataListener(r.configListener)
-}
-
-func newKubernetesRegistry(url *common.URL) (registry.Registry, error) {
- // actually, kubernetes use in-cluster config,
- r := &kubernetesRegistry{}
-
- r.InitBaseRegistry(url, r)
-
- if err := kubernetes.ValidateClient(r); err != nil {
- return nil, perrors.WithStack(err)
- }
-
- go r.HandleClientRestart()
- r.InitListeners()
-
- logger.Debugf("kubernetes registry started")
-
- return r, nil
-}
-
-func newMockKubernetesRegistry(
- url *common.URL,
- podsList *v1.PodList,
-) (registry.Registry, error) {
-
- var err error
-
- r := &kubernetesRegistry{}
-
- r.InitBaseRegistry(url, r)
- r.client, err = kubernetes.NewMockClient(podsList)
- if err != nil {
- return nil, perrors.WithMessage(err, "new mock client")
- }
- r.InitListeners()
- return r, nil
-}
-
-// HandleClientRestart will reconnect to kubernetes registry center
-func (r *kubernetesRegistry) HandleClientRestart() {
- r.WaitGroup().Add(1)
- defer r.WaitGroup().Done()
- var (
- err error
- failTimes int
- )
-LOOP:
- for {
- select {
- case <-r.Done():
- logger.Warnf("(KubernetesProviderRegistry)reconnectKubernetes goroutine exit now...")
- break LOOP
- // re-register all services
- case <-r.Client().Done():
- r.Client().Close()
- r.SetClient(nil)
-
- // try to connect to kubernetes,
- failTimes = 0
- for {
- after := gxtime.After(timeSecondDuration(failTimes * ConnDelay))
- select {
- case <-r.Done():
- logger.Warnf("(KubernetesProviderRegistry)reconnectKubernetes Registry goroutine exit now...")
- break LOOP
- case <-after: // avoid connect frequent
- }
- err = kubernetes.ValidateClient(r)
- logger.Infof("Kubernetes ProviderRegistry.validateKubernetesClient = error{%#v}", perrors.WithStack(err))
-
- if err == nil {
- if r.RestartCallBack() {
- break
- }
- }
- failTimes++
- if MaxFailTimes <= failTimes {
- failTimes = MaxFailTimes
- }
- }
- }
- }
-}
-
-func timeSecondDuration(sec int) time.Duration {
- return time.Duration(sec) * time.Second
-}
diff --git a/registry/kubernetes/registry_test.go b/registry/kubernetes/registry_test.go
deleted file mode 100644
index 491ff94..0000000
--- a/registry/kubernetes/registry_test.go
+++ /dev/null
@@ -1,344 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kubernetes
-
-import (
- "encoding/json"
- "os"
- "strconv"
- "testing"
- "time"
-)
-
-import (
- "github.com/stretchr/testify/assert"
-
- v1 "k8s.io/api/core/v1"
-)
-
-import (
- "dubbo.apache.org/dubbo-go/v3/common"
- "dubbo.apache.org/dubbo-go/v3/common/constant"
-)
-
-var clientPodListJsonData = `{
- "apiVersion": "v1",
- "items": [
- {
- "apiVersion": "v1",
- "kind": "Pod",
- "metadata": {
- "annotations": {
- "dubbo.io/annotation": "W3siayI6Ii9kdWJiby9jb20uaWt1cmVudG8udXNlci5Vc2VyUHJvdmlkZXIvcHJvdmlkZXJzIiwidiI6IiJ9LHsiayI6Ii9kdWJiby9jb20uaWt1cmVudG8udXNlci5Vc2VyUHJvdmlkZXIvcHJvdmlkZXJzL2R1YmJvJTNBJTJGJTJGMTcyLjE3LjAuNiUzQTIwMDAwJTJGVXNlclByb3ZpZGVyJTNGYWNjZXNzbG9nJTNEJTI2YW55aG9zdCUzRHRydWUlMjZhcHAudmVyc2lvbiUzRDAuMC4xJTI2YXBwbGljYXRpb24lM0RCRFRTZXJ2aWNlJTI2YXV0aCUzRCUyNmJlYW4ubmFtZSUzRFVzZXJQcm92aWRlciUyNmNsdXN0ZXIlM0RmYWlsb3ZlciUyNmVudmlyb25tZW50JTNEZGV2JTI2ZXhlY3V0ZS5s [...]
- },
- "creationTimestamp": "2020-06-03T03:49:14Z",
- "generateName": "server-84c864f5bc-",
- "labels": {
- "dubbo.io/label": "dubbo.io-value",
- "pod-template-hash": "84c864f5bc",
- "role": "server"
- },
- "name": "server-84c864f5bc-r8qvz",
- "namespace": "default",
- "ownerReferences": [
- {
- "apiVersion": "apps/v1",
- "blockOwnerDeletion": true,
- "controller": true,
- "kind": "ReplicaSet",
- "name": "server-84c864f5bc",
- "uid": "fa376dbb-4f37-4705-8e80-727f592c19b3"
- }
- ],
- "resourceVersion": "517460",
- "selfLink": "/api/v1/namespaces/default/pods/server-84c864f5bc-r8qvz",
- "uid": "f4fc811c-200c-4445-8d4f-532144957dcc"
- },
- "spec": {
- "containers": [
- {
- "env": [
- {
- "name": "DUBBO_NAMESPACE",
- "value": "default"
- },
- {
- "name": "NAMESPACE",
- "valueFrom": {
- "fieldRef": {
- "apiVersion": "v1",
- "fieldPath": "metadata.namespace"
- }
- }
- }
- ],
- "image": "192.168.240.101:5000/scott/go-server",
- "imagePullPolicy": "Always",
- "name": "server",
- "resources": {},
- "terminationMessagePath": "/dev/termination-log",
- "terminationMessagePolicy": "File",
- "volumeMounts": [
- {
- "mountPath": "/var/run/secrets/kubernetes.io/serviceaccount",
- "name": "dubbo-sa-token-5qbtb",
- "readOnly": true
- }
- ]
- }
- ],
- "dnsPolicy": "ClusterFirst",
- "enableServiceLinks": true,
- "nodeName": "minikube",
- "priority": 0,
- "restartPolicy": "Always",
- "schedulerName": "default-scheduler",
- "securityContext": {},
- "serviceAccount": "dubbo-sa",
- "serviceAccountName": "dubbo-sa",
- "terminationGracePeriodSeconds": 30,
- "tolerations": [
- {
- "effect": "NoExecute",
- "key": "node.kubernetes.io/not-ready",
- "operator": "Exists",
- "tolerationSeconds": 300
- },
- {
- "effect": "NoExecute",
- "key": "node.kubernetes.io/unreachable",
- "operator": "Exists",
- "tolerationSeconds": 300
- }
- ],
- "volumes": [
- {
- "name": "dubbo-sa-token-5qbtb",
- "secret": {
- "defaultMode": 420,
- "secretName": "dubbo-sa-token-5qbtb"
- }
- }
- ]
- },
- "status": {
- "conditions": [
- {
- "lastProbeTime": null,
- "lastTransitionTime": "2020-06-03T03:49:14Z",
- "status": "True",
- "type": "Initialized"
- },
- {
- "lastProbeTime": null,
- "lastTransitionTime": "2020-06-03T03:49:15Z",
- "status": "True",
- "type": "Ready"
- },
- {
- "lastProbeTime": null,
- "lastTransitionTime": "2020-06-03T03:49:15Z",
- "status": "True",
- "type": "ContainersReady"
- },
- {
- "lastProbeTime": null,
- "lastTransitionTime": "2020-06-03T03:49:14Z",
- "status": "True",
- "type": "PodScheduled"
- }
- ],
- "containerStatuses": [
- {
- "containerID": "docker://b6421e05ce44f8a1c4fa6b72274980777c7c0f945516209f7c0558cd0cd65406",
- "image": "192.168.240.101:5000/scott/go-server:latest",
- "imageID": "docker-pullable://192.168.240.101:5000/scott/go-server@sha256:4eecf895054f0ff93d80db64992a561d10504e55582def6dcb6093a6d6d92461",
- "lastState": {},
- "name": "server",
- "ready": true,
- "restartCount": 0,
- "started": true,
- "state": {
- "running": {
- "startedAt": "2020-06-03T03:49:15Z"
- }
- }
- }
- ],
- "hostIP": "10.0.2.15",
- "phase": "Running",
- "podIP": "172.17.0.6",
- "podIPs": [
- {
- "ip": "172.17.0.6"
- }
- ],
- "qosClass": "BestEffort",
- "startTime": "2020-06-03T03:49:14Z"
- }
- }
- ],
- "kind": "List",
- "metadata": {
- "resourceVersion": "",
- "selfLink": ""
- }
-}
-`
-
-func getTestRegistry(t *testing.T) *kubernetesRegistry {
- const (
- podNameKey = "HOSTNAME"
- nameSpaceKey = "NAMESPACE"
- needWatchedNameSpaceKey = "DUBBO_NAMESPACE"
- )
- pl := &v1.PodList{}
- // 1. install test data
- if err := json.Unmarshal([]byte(clientPodListJsonData), &pl); err != nil {
- t.Fatal(err)
- }
- currentPod := pl.Items[0]
-
- env := map[string]string{
- nameSpaceKey: currentPod.GetNamespace(),
- podNameKey: currentPod.GetName(),
- needWatchedNameSpaceKey: "default",
- }
-
- for k, v := range env {
- if err := os.Setenv(k, v); err != nil {
- t.Fatal(err)
- }
- }
-
- regurl, err := common.NewURL("registry://127.0.0.1:443", common.WithParamsValue(constant.RegistryRoleKey, strconv.Itoa(common.PROVIDER)))
- if err != nil {
- t.Fatal(err)
- }
- out, err := newMockKubernetesRegistry(regurl, pl)
- if err != nil {
- t.Fatal(err)
- }
-
- return out.(*kubernetesRegistry)
-}
-
-func TestRegister(t *testing.T) {
- r := getTestRegistry(t)
- defer r.Destroy()
-
- url, _ := common.NewURL(
- "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider",
- common.WithParamsValue(constant.ClusterKey, "mock"),
- common.WithMethods([]string{"GetUser", "AddUser"}),
- )
-
- err := r.Register(url)
- assert.NoError(t, err)
- _, _, err = r.client.GetChildren("/dubbo/com.ikurento.user.UserProvider/providers")
- if err != nil {
- t.Fatal(err)
- }
-}
-
-//
-//func TestSubscribe(t *testing.T) {
-// r := getTestRegistry(t)
-// defer r.Destroy()
-//
-// url, err := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.ClusterKey, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
-// if err != nil {
-// t.Fatal(err)
-// }
-//
-// listener, err := r.DoSubscribe(url)
-// if err != nil {
-// t.Fatal(err)
-// }
-//
-// wg := sync.WaitGroup{}
-// wg.Add(1)
-// go func() {
-// defer wg.Done()
-// registerErr := r.Register(url)
-// if registerErr != nil {
-// t.Error(registerErr)
-// }
-// }()
-//
-// wg.Wait()
-//
-// serviceEvent, err := listener.Next()
-// if err != nil {
-// t.Fatal(err)
-// }
-// t.Logf("get service event %s", serviceEvent)
-//}
-
-func TestConsumerDestroy(t *testing.T) {
- r := getTestRegistry(t)
-
- url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider",
- common.WithParamsValue(constant.ClusterKey, "mock"),
- common.WithMethods([]string{"GetUser", "AddUser"}))
-
- _, err := r.DoSubscribe(url)
- if err != nil {
- t.Fatal(err)
- }
-
- // listener.Close()
- time.Sleep(1e9)
- r.Destroy()
-
- assert.Equal(t, false, r.IsAvailable())
-}
-
-func TestProviderDestroy(t *testing.T) {
- r := getTestRegistry(t)
-
- url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider",
- common.WithParamsValue(constant.ClusterKey, "mock"),
- common.WithMethods([]string{"GetUser", "AddUser"}))
- err := r.Register(url)
- assert.NoError(t, err)
-
- time.Sleep(1e9)
- r.Destroy()
- assert.Equal(t, false, r.IsAvailable())
-}
-
-func TestNewRegistry(t *testing.T) {
- regUrl, err := common.NewURL("registry://127.0.0.1:443",
- common.WithParamsValue(constant.RegistryRoleKey, strconv.Itoa(common.PROVIDER)))
- if err != nil {
- t.Fatal(err)
- }
- _, err = newKubernetesRegistry(regUrl)
- if err == nil {
- t.Fatal("not in cluster, should be a err")
- }
-}
-
-func TestHandleClientRestart(t *testing.T) {
- r := getTestRegistry(t)
- r.WaitGroup().Add(1)
- go r.HandleClientRestart()
- time.Sleep(timeSecondDuration(1))
- r.client.Close()
-}
diff --git a/remoting/kubernetes/client.go b/remoting/kubernetes/client.go
deleted file mode 100644
index fe161bf..0000000
--- a/remoting/kubernetes/client.go
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kubernetes
-
-import (
- "context"
- "strconv"
- "sync"
-)
-
-import (
- perrors "github.com/pkg/errors"
-
- v1 "k8s.io/api/core/v1"
-
- "k8s.io/client-go/kubernetes"
- "k8s.io/client-go/kubernetes/fake"
-)
-
-import (
- "dubbo.apache.org/dubbo-go/v3/common"
- "dubbo.apache.org/dubbo-go/v3/common/constant"
- "dubbo.apache.org/dubbo-go/v3/common/logger"
-)
-
-type Client struct {
- lock sync.RWMutex
-
- // manage the client lifecycle
- ctx context.Context
- cancel context.CancelFunc
-
- controller *dubboRegistryController
-}
-
-// NewClient returns Client instance for registry
-func NewClient(url *common.URL) (*Client, error) {
- // read type
- r, err := strconv.Atoi(url.GetParams().Get(constant.RegistryRoleKey))
- if err != nil {
- return nil, perrors.WithMessage(err, "atoi role")
- }
- ctx, cancel := context.WithCancel(context.Background())
-
- controller, err := newDubboRegistryController(ctx, common.RoleType(r), GetInClusterKubernetesClient)
- if err != nil {
- cancel()
- return nil, perrors.WithMessage(err, "new dubbo-registry controller")
- }
-
- c := &Client{
- ctx: ctx,
- cancel: cancel,
- controller: controller,
- }
-
- if r == common.CONSUMER {
- // only consumer have to start informer factory
- c.controller.startALLInformers()
- }
- return c, nil
-}
-
-func (c *Client) SetLabel(k, v string) error {
- c.lock.Lock()
- defer c.lock.Unlock()
-
- if err := c.controller.assembleLabel(k, v); err != nil {
- return perrors.WithMessagef(err, "add annotation @key = %s @value = %s", k, v)
- }
-
- logger.Debugf("put the @key = %s @value = %s success", k, v)
- return nil
-}
-
-// Create creates k/v pair in watcher-set
-func (c *Client) Create(k, v string) error {
- // the read current pod must be lock, protect every
- // create operation can be atomic
- c.lock.Lock()
- defer c.lock.Unlock()
-
- if err := c.controller.addAnnotationForCurrentPod(k, v); err != nil {
- return perrors.WithMessagef(err, "add annotation @key = %s @value = %s", k, v)
- }
-
- logger.Debugf("put the @key = %s @value = %s success", k, v)
- return nil
-}
-
-// GetChildren gets k children list from kubernetes-watcherSet
-func (c *Client) GetChildren(k string) ([]string, []string, error) {
- objectList, err := c.controller.watcherSet.Get(k, true)
- if err != nil {
- return nil, nil, perrors.WithMessagef(err, "get children from watcherSet on (%s)", k)
- }
-
- var kList []string
- var vList []string
-
- for _, o := range objectList {
- kList = append(kList, o.Key)
- vList = append(vList, o.Value)
- }
-
- return kList, vList, nil
-}
-
-// Watch watches on spec key
-func (c *Client) Watch(k string) (<-chan *WatcherEvent, <-chan struct{}, error) {
- w, err := c.controller.watcherSet.Watch(k, false)
- if err != nil {
- return nil, nil, perrors.WithMessagef(err, "watch on (%s)", k)
- }
-
- return w.ResultChan(), w.done(), nil
-}
-
-// WatchWithPrefix watches on spec prefix
-func (c *Client) WatchWithPrefix(prefix string) (<-chan *WatcherEvent, <-chan struct{}, error) {
- w, err := c.controller.watcherSet.Watch(prefix, true)
- if err != nil {
- return nil, nil, perrors.WithMessagef(err, "watch on prefix (%s)", prefix)
- }
-
- return w.ResultChan(), w.done(), nil
-}
-
-// if returns false, the client is die
-func (c *Client) Valid() bool {
- select {
- case <-c.Done():
- return false
- default:
- }
- c.lock.RLock()
- defer c.lock.RUnlock()
- return c.controller != nil
-}
-
-// nolint
-func (c *Client) Done() <-chan struct{} {
- return c.ctx.Done()
-}
-
-// nolint
-func (c *Client) Close() {
- select {
- case <-c.ctx.Done():
- // already stopped
- return
- default:
- }
- c.cancel()
-
- // the client ctx be canceled
- // will trigger the watcherSet watchers all stopped
- // so, just wait
-}
-
-// ValidateClient validates the kubernetes client
-func ValidateClient(container clientFacade) error {
- client := container.Client()
-
- // new Client
- if client == nil || client.Valid() {
-
- newClient, err := NewClient(container.GetURL())
- if err != nil {
- logger.Warnf("new kubernetes client: %v)", err)
- return perrors.WithMessage(err, "new kubernetes client")
- }
- container.SetClient(newClient)
- }
-
- return nil
-}
-
-// NewMockClient exports for registry package test
-func NewMockClient(podList *v1.PodList) (*Client, error) {
- ctx, cancel := context.WithCancel(context.Background())
- controller, err := newDubboRegistryController(ctx, common.CONSUMER, func() (kubernetes.Interface, error) {
- return fake.NewSimpleClientset(podList), nil
- })
- if err != nil {
- cancel()
- return nil, perrors.WithMessage(err, "new dubbo-registry controller")
- }
-
- c := &Client{
- ctx: ctx,
- cancel: cancel,
- controller: controller,
- }
-
- c.controller.startALLInformers()
- return c, nil
-}
diff --git a/remoting/kubernetes/client_test.go b/remoting/kubernetes/client_test.go
deleted file mode 100644
index be72561..0000000
--- a/remoting/kubernetes/client_test.go
+++ /dev/null
@@ -1,455 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kubernetes
-
-import (
- "encoding/json"
- "fmt"
- _ "net/http/pprof"
- "os"
- "strings"
- "sync"
- "testing"
-)
-
-import (
- v1 "k8s.io/api/core/v1"
-)
-
-// tests dataset
-var tests = []struct {
- input struct {
- k string
- v string
- }
-}{
- {input: struct {
- k string
- v string
- }{k: "name", v: "scott.wang"}},
- {input: struct {
- k string
- v string
- }{k: "namePrefix", v: "prefix.scott.wang"}},
- {input: struct {
- k string
- v string
- }{k: "namePrefix1", v: "prefix1.scott.wang"}},
- {input: struct {
- k string
- v string
- }{k: "age", v: "27"}},
-}
-
-// test dataset prefix
-const prefix = "name"
-
-var watcherStopLog = "the watcherSet watcher was stopped"
-
-var clientPodListJsonData = `{
- "apiVersion": "v1",
- "items": [
- {
- "apiVersion": "v1",
- "kind": "Pod",
- "metadata": {
- "annotations": {
- "dubbo.io/annotation": "W3siayI6Ii9kdWJiby9jb20uaWt1cmVudG8udXNlci5Vc2VyUHJvdmlkZXIvcHJvdmlkZXJzIiwidiI6IiJ9LHsiayI6Ii9kdWJiby9jb20uaWt1cmVudG8udXNlci5Vc2VyUHJvdmlkZXIvcHJvdmlkZXJzL2R1YmJvJTNBJTJGJTJGMTcyLjE3LjAuNiUzQTIwMDAwJTJGVXNlclByb3ZpZGVyJTNGYWNjZXNzbG9nJTNEJTI2YW55aG9zdCUzRHRydWUlMjZhcHAudmVyc2lvbiUzRDAuMC4xJTI2YXBwbGljYXRpb24lM0RCRFRTZXJ2aWNlJTI2YXV0aCUzRCUyNmJlYW4ubmFtZSUzRFVzZXJQcm92aWRlciUyNmNsdXN0ZXIlM0RmYWlsb3ZlciUyNmVudmlyb25tZW50JTNEZGV2JTI2ZXhlY3V0ZS5s [...]
- },
- "creationTimestamp": "2020-06-03T03:49:14Z",
- "generateName": "server-84c864f5bc-",
- "labels": {
- "dubbo.io/label": "dubbo.io-value",
- "pod-template-hash": "84c864f5bc",
- "role": "server"
- },
- "name": "server-84c864f5bc-r8qvz",
- "namespace": "default",
- "ownerReferences": [
- {
- "apiVersion": "apps/v1",
- "blockOwnerDeletion": true,
- "controller": true,
- "kind": "ReplicaSet",
- "name": "server-84c864f5bc",
- "uid": "fa376dbb-4f37-4705-8e80-727f592c19b3"
- }
- ],
- "resourceVersion": "517460",
- "selfLink": "/api/v1/namespaces/default/pods/server-84c864f5bc-r8qvz",
- "uid": "f4fc811c-200c-4445-8d4f-532144957dcc"
- },
- "spec": {
- "containers": [
- {
- "env": [
- {
- "name": "DUBBO_NAMESPACE",
- "value": "default"
- },
- {
- "name": "NAMESPACE",
- "valueFrom": {
- "fieldRef": {
- "apiVersion": "v1",
- "fieldPath": "metadata.namespace"
- }
- }
- }
- ],
- "image": "192.168.240.101:5000/scott/go-server",
- "imagePullPolicy": "Always",
- "name": "server",
- "resources": {},
- "terminationMessagePath": "/dev/termination-log",
- "terminationMessagePolicy": "File",
- "volumeMounts": [
- {
- "mountPath": "/var/run/secrets/kubernetes.io/serviceaccount",
- "name": "dubbo-sa-token-5qbtb",
- "readOnly": true
- }
- ]
- }
- ],
- "dnsPolicy": "ClusterFirst",
- "enableServiceLinks": true,
- "nodeName": "minikube",
- "priority": 0,
- "restartPolicy": "Always",
- "schedulerName": "default-scheduler",
- "securityContext": {},
- "serviceAccount": "dubbo-sa",
- "serviceAccountName": "dubbo-sa",
- "terminationGracePeriodSeconds": 30,
- "tolerations": [
- {
- "effect": "NoExecute",
- "key": "node.kubernetes.io/not-ready",
- "operator": "Exists",
- "tolerationSeconds": 300
- },
- {
- "effect": "NoExecute",
- "key": "node.kubernetes.io/unreachable",
- "operator": "Exists",
- "tolerationSeconds": 300
- }
- ],
- "volumes": [
- {
- "name": "dubbo-sa-token-5qbtb",
- "secret": {
- "defaultMode": 420,
- "secretName": "dubbo-sa-token-5qbtb"
- }
- }
- ]
- },
- "status": {
- "conditions": [
- {
- "lastProbeTime": null,
- "lastTransitionTime": "2020-06-03T03:49:14Z",
- "status": "True",
- "type": "Initialized"
- },
- {
- "lastProbeTime": null,
- "lastTransitionTime": "2020-06-03T03:49:15Z",
- "status": "True",
- "type": "Ready"
- },
- {
- "lastProbeTime": null,
- "lastTransitionTime": "2020-06-03T03:49:15Z",
- "status": "True",
- "type": "ContainersReady"
- },
- {
- "lastProbeTime": null,
- "lastTransitionTime": "2020-06-03T03:49:14Z",
- "status": "True",
- "type": "PodScheduled"
- }
- ],
- "containerStatuses": [
- {
- "containerID": "docker://b6421e05ce44f8a1c4fa6b72274980777c7c0f945516209f7c0558cd0cd65406",
- "image": "192.168.240.101:5000/scott/go-server:latest",
- "imageID": "docker-pullable://192.168.240.101:5000/scott/go-server@sha256:4eecf895054f0ff93d80db64992a561d10504e55582def6dcb6093a6d6d92461",
- "lastState": {},
- "name": "server",
- "ready": true,
- "restartCount": 0,
- "started": true,
- "state": {
- "running": {
- "startedAt": "2020-06-03T03:49:15Z"
- }
- }
- }
- ],
- "hostIP": "10.0.2.15",
- "phase": "Running",
- "podIP": "172.17.0.6",
- "podIPs": [
- {
- "ip": "172.17.0.6"
- }
- ],
- "qosClass": "BestEffort",
- "startTime": "2020-06-03T03:49:14Z"
- }
- }
- ],
- "kind": "List",
- "metadata": {
- "resourceVersion": "",
- "selfLink": ""
- }
-}
-`
-
-func getTestClient(t *testing.T) *Client {
- pl := &v1.PodList{}
- // 1. install test data
- if err := json.Unmarshal([]byte(clientPodListJsonData), &pl); err != nil {
- t.Fatal(err)
- }
- currentPod := pl.Items[0]
-
- env := map[string]string{
- nameSpaceKey: currentPod.GetNamespace(),
- podNameKey: currentPod.GetName(),
- needWatchedNameSpaceKey: "default",
- }
-
- for k, v := range env {
- if err := os.Setenv(k, v); err != nil {
- t.Fatal(err)
- }
- }
-
- client, err := NewMockClient(pl)
- if err != nil {
- t.Fatal(err)
- }
-
- return client
-}
-
-func TestClientValid(t *testing.T) {
- client := getTestClient(t)
- defer client.Close()
-
- if !client.Valid() {
- t.Fatal("client is not valid")
- }
-
- client.Close()
- if client.Valid() {
- t.Fatal("client is valid")
- }
-}
-
-func TestClientDone(t *testing.T) {
- client := getTestClient(t)
-
- go func() {
- client.Close()
- }()
-
- <-client.Done()
-
- if client.Valid() {
- t.Fatal("client should be invalid")
- }
-}
-
-func TestClientCreateKV(t *testing.T) {
- client := getTestClient(t)
- defer client.Close()
-
- for _, tc := range tests {
-
- k := tc.input.k
- v := tc.input.v
-
- if err := client.Create(k, v); err != nil {
- t.Fatal(err)
- }
-
- }
-}
-
-func TestClientGetChildrenKVList(t *testing.T) {
- client := getTestClient(t)
- defer client.Close()
-
- wg := sync.WaitGroup{}
- wg.Add(1)
-
- syncDataComplete := make(chan struct{})
-
- go func() {
- wc, done, err := client.WatchWithPrefix(prefix)
- if err != nil {
- t.Error(err)
- return
- }
-
- wg.Done()
- i := 0
-
- for {
- select {
- case e := <-wc:
- i++
- fmt.Printf("got event %v k %s v %s\n", e.EventType, e.Key, e.Value)
- if i == 3 {
- // already sync all event
- syncDataComplete <- struct{}{}
- return
- }
- case <-done:
- t.Log(watcherStopLog)
- return
- }
- }
- }()
-
- // wait the watch goroutine start
- wg.Wait()
-
- expect := make(map[string]string)
- got := make(map[string]string)
-
- for _, tc := range tests {
-
- k := tc.input.k
- v := tc.input.v
-
- if strings.Contains(k, prefix) {
- expect[k] = v
- }
-
- if err := client.Create(k, v); err != nil {
- t.Fatal(err)
- }
- }
-
- <-syncDataComplete
-
- // start get all children
- kList, vList, err := client.GetChildren(prefix)
- if err != nil {
- t.Error(err)
- }
-
- for i := 0; i < len(kList); i++ {
- got[kList[i]] = vList[i]
- }
-
- for expectK, expectV := range expect {
- if got[expectK] != expectV {
- t.Fatalf("expect {%s: %s} but got {%s: %v}", expectK, expectV, expectK, got[expectK])
- }
- }
-}
-
-func TestClientWatchPrefix(t *testing.T) {
- client := getTestClient(t)
-
- wg := sync.WaitGroup{}
- wg.Add(1)
-
- go func() {
- wc, done, err := client.WatchWithPrefix(prefix)
- if err != nil {
- t.Error(err)
- }
-
- wg.Done()
-
- for {
- select {
- case e := <-wc:
- t.Logf("got event %v k %s v %s", e.EventType, e.Key, e.Value)
- case <-done:
- t.Log(watcherStopLog)
- return
- }
- }
- }()
-
- // must wait the watch goroutine work
- wg.Wait()
-
- for _, tc := range tests {
-
- k := tc.input.k
- v := tc.input.v
-
- if err := client.Create(k, v); err != nil {
- t.Fatal(err)
- }
- }
-
- client.Close()
-}
-
-func TestClientWatch(t *testing.T) {
- client := getTestClient(t)
-
- wg := sync.WaitGroup{}
- wg.Add(1)
-
- go func() {
- wc, done, err := client.Watch(prefix)
- if err != nil {
- t.Error(err)
- }
- wg.Done()
-
- for {
- select {
- case e := <-wc:
- t.Logf("got event %v k %s v %s", e.EventType, e.Key, e.Value)
- case <-done:
- t.Log(watcherStopLog)
- return
- }
- }
- }()
-
- // must wait the watch goroutine already start the watch goroutine
- wg.Wait()
-
- for _, tc := range tests {
-
- k := tc.input.k
- v := tc.input.v
-
- if err := client.Create(k, v); err != nil {
- t.Fatal(err)
- }
- }
-
- client.Close()
-}
diff --git a/remoting/kubernetes/facade.go b/remoting/kubernetes/facade.go
deleted file mode 100644
index 63115b0..0000000
--- a/remoting/kubernetes/facade.go
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kubernetes
-
-import (
- "dubbo.apache.org/dubbo-go/v3/common"
-)
-
-type clientFacade interface {
- Client() *Client
- SetClient(*Client)
- common.Node
-}
diff --git a/remoting/kubernetes/facade_test.go b/remoting/kubernetes/facade_test.go
deleted file mode 100644
index 72ddb06..0000000
--- a/remoting/kubernetes/facade_test.go
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kubernetes
-
-import (
- "strconv"
- "testing"
-)
-
-import (
- "dubbo.apache.org/dubbo-go/v3/common"
- "dubbo.apache.org/dubbo-go/v3/common/constant"
-)
-
-type mockFacade struct {
- *common.URL
- client *Client
- // cltLock sync.Mutex
- // done chan struct{}
-}
-
-func (r *mockFacade) Client() *Client {
- return r.client
-}
-
-func (r *mockFacade) SetClient(client *Client) {
- r.client = client
-}
-
-func (r *mockFacade) GetURL() *common.URL {
- return r.URL
-}
-
-func (r *mockFacade) Destroy() {
- // TODO implementation me
-}
-
-func (r *mockFacade) RestartCallBack() bool {
- return true
-}
-
-func (r *mockFacade) IsAvailable() bool {
- return true
-}
-
-func Test_Facade(t *testing.T) {
- regUrl, err := common.NewURL("registry://127.0.0.1:443",
- common.WithParamsValue(constant.RegistryRoleKey, strconv.Itoa(common.CONSUMER)))
- if err != nil {
- t.Fatal(err)
- }
-
- mockClient := getTestClient(t)
- m := &mockFacade{
- URL: regUrl,
- client: mockClient,
- }
-
- if err := ValidateClient(m); err == nil {
- t.Fatal("out of cluster should err")
- }
- mockClient.Close()
-}
diff --git a/remoting/kubernetes/listener.go b/remoting/kubernetes/listener.go
deleted file mode 100644
index 9a8e0dc..0000000
--- a/remoting/kubernetes/listener.go
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kubernetes
-
-import (
- "sync"
-)
-
-import (
- perrors "github.com/pkg/errors"
-)
-
-import (
- "dubbo.apache.org/dubbo-go/v3/common/logger"
- "dubbo.apache.org/dubbo-go/v3/remoting"
-)
-
-type EventListener struct {
- client *Client
- keyMapLock sync.RWMutex
- keyMap map[string]struct{}
- wg sync.WaitGroup
-}
-
-func NewEventListener(client *Client) *EventListener {
- return &EventListener{
- client: client,
- keyMap: make(map[string]struct{}, 8),
- }
-}
-
-// Listen on a spec key
-// this method returns true when spec key deleted,
-// this method returns false when deep layer connection lose
-func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.DataListener) bool {
- defer l.wg.Done()
- for {
- wc, done, err := l.client.Watch(key)
- if err != nil {
- logger.Warnf("watch exist{key:%s} = error{%v}", key, err)
- return false
- }
-
- select {
-
- // client stopped
- case <-l.client.Done():
- logger.Warnf("kubernetes client stopped")
- return false
-
- // watcherSet watcher stopped
- case <-done:
- logger.Warnf("kubernetes watcherSet watcher stopped")
- return false
-
- // handle kubernetes-watcherSet events
- case e, ok := <-wc:
- if !ok {
- logger.Warnf("kubernetes-watcherSet watch-chan closed")
- return false
- }
-
- if l.handleEvents(e, listener...) {
- // if event is delete
- return true
- }
- }
- }
-}
-
-// return true means the event type is DELETE
-// return false means the event type is CREATE || UPDATE
-func (l *EventListener) handleEvents(event *WatcherEvent, listeners ...remoting.DataListener) bool {
- logger.Infof("got a kubernetes-watcherSet event {type: %d, key: %s}", event.EventType, event.Key)
-
- switch event.EventType {
- case Create:
- for _, listener := range listeners {
- logger.Infof("kubernetes-watcherSet get event (key{%s}) = event{EventNodeDataCreated}", event.Key)
- listener.DataChange(remoting.Event{
- Path: string(event.Key),
- Action: remoting.EventTypeAdd,
- Content: string(event.Value),
- })
- }
- return false
- case Update:
- for _, listener := range listeners {
- logger.Infof("kubernetes-watcherSet get event (key{%s}) = event{EventNodeDataChanged}", event.Key)
- listener.DataChange(remoting.Event{
- Path: string(event.Key),
- Action: remoting.EventTypeUpdate,
- Content: string(event.Value),
- })
- }
- return false
- case Delete:
- logger.Warnf("kubernetes-watcherSet get event (key{%s}) = event{EventNodeDeleted}", event.Key)
- return true
- default:
- return false
- }
-}
-
-// Listen on a set of key with spec prefix
-func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener ...remoting.DataListener) {
- defer l.wg.Done()
- for {
- wc, done, err := l.client.WatchWithPrefix(prefix)
- if err != nil {
- logger.Warnf("listenDirEvent(key{%s}) = error{%v}", prefix, err)
- }
-
- select {
- // client stopped
- case <-l.client.Done():
- logger.Warnf("kubernetes client stopped")
- return
-
- // watcher stopped
- case <-done:
- logger.Warnf("kubernetes watcherSet watcher stopped")
- return
-
- // kuberentes-watcherSet event stream
- case e, ok := <-wc:
-
- if !ok {
- logger.Warnf("kubernetes-watcherSet watch-chan closed")
- return
- }
-
- l.handleEvents(e, listener...)
- }
- }
-}
-
-// this func is invoked by kubernetes ConsumerRegistry::Registry/ kubernetes ConsumerRegistry::get/kubernetes ConsumerRegistry::getListener
-// registry.go:Listen -> listenServiceEvent -> listenDirEvent -> ListenServiceNodeEvent
-// |
-// --------> ListenServiceNodeEvent
-func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataListener) {
- l.keyMapLock.RLock()
- _, ok := l.keyMap[key]
- l.keyMapLock.RUnlock()
- if ok {
- logger.Warnf("kubernetes-watcherSet key %s has already been listened.", key)
- return
- }
-
- l.keyMapLock.Lock()
- // double check
- if _, ok := l.keyMap[key]; ok {
- // another goroutine already set it
- l.keyMapLock.Unlock()
- return
- }
- l.keyMap[key] = struct{}{}
- l.keyMapLock.Unlock()
-
- keyList, valueList, err := l.client.GetChildren(key)
- if err != nil {
- logger.Warnf("Get new node path {%v} 's content error,message is {%v}", key, perrors.WithMessage(err, "get children"))
- }
-
- logger.Infof("get key children list %s, keys %v values %v", key, keyList, valueList)
-
- for i, k := range keyList {
- logger.Infof("got children list key -> %s", k)
- listener.DataChange(remoting.Event{
- Path: k,
- Action: remoting.EventTypeAdd,
- Content: valueList[i],
- })
- }
-
- logger.Infof("listen dubbo provider key{%s} event and wait to get all provider from kubernetes-watcherSet", key)
-
- l.wg.Add(1)
- go func(key string, listener remoting.DataListener) {
- l.ListenServiceNodeEventWithPrefix(key, listener)
- logger.Warnf("listenDirEvent(key{%s}) goroutine exit now", key)
- }(key, listener)
-
- logger.Infof("listen dubbo service key{%s}", key)
- l.wg.Add(1)
- go func(key string) {
- if l.ListenServiceNodeEvent(key) {
- listener.DataChange(remoting.Event{Path: key, Action: remoting.EventTypeDel})
- }
- logger.Warnf("listenSelf(kubernetes key{%s}) goroutine exit now", key)
- }(key)
-}
-
-func (l *EventListener) Close() {
- l.wg.Wait()
-}
diff --git a/remoting/kubernetes/listener_test.go b/remoting/kubernetes/listener_test.go
deleted file mode 100644
index ffe58cb..0000000
--- a/remoting/kubernetes/listener_test.go
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kubernetes
-
-import (
- "testing"
- "time"
-)
-
-import (
- "github.com/stretchr/testify/assert"
-)
-
-import (
- "dubbo.apache.org/dubbo-go/v3/remoting"
-)
-
-var changedData = `
- dubbo.consumer.request_timeout=3s
- dubbo.consumer.connect_timeout=5s
- dubbo.application.organization=ikurento.com
- dubbo.application.name=BDTService
- dubbo.application.module=dubbogo user-info server
- dubbo.application.version=0.0.1
- dubbo.application.owner=ZX
- dubbo.application.environment=dev
- dubbo.registries.hangzhouzk.protocol=zookeeper
- dubbo.registries.hangzhouzk.timeout=3s
- dubbo.registries.hangzhouzk.address=127.0.0.1:2181
- dubbo.registries.shanghaizk.protocol=zookeeper
- dubbo.registries.shanghaizk.timeout=3s
- dubbo.registries.shanghaizk.address=127.0.0.1:2182
- dubbo.service.com.ikurento.user.UserProvider.protocol=dubbo
- dubbo.service.com.ikurento.user.UserProvider.interface=com.ikurento.user.UserProvider
- dubbo.service.com.ikurento.user.UserProvider.loadbalance=random
- dubbo.service.com.ikurento.user.UserProvider.warmup=100
- dubbo.service.com.ikurento.user.UserProvider.cluster=failover
-`
-
-type mockDataListener struct {
- eventList []remoting.Event
- client *Client
- changedData string
-
- rc chan remoting.Event
-}
-
-func (m *mockDataListener) DataChange(eventType remoting.Event) bool {
- m.eventList = append(m.eventList, eventType)
- if eventType.Content == m.changedData {
- m.rc <- eventType
- }
- return true
-}
-
-func TestListener(t *testing.T) {
- tests := []struct {
- input struct {
- k string
- v string
- }
- }{
- {input: struct {
- k string
- v string
- }{k: "/dubbo", v: changedData}},
- }
-
- c := getTestClient(t)
- defer c.Close()
-
- listener := NewEventListener(c)
- dataListener := &mockDataListener{client: c, changedData: changedData, rc: make(chan remoting.Event)}
- listener.ListenServiceEvent("/dubbo", dataListener)
- time.Sleep(1e9)
-
- for _, tc := range tests {
-
- k := tc.input.k
- v := tc.input.v
- if err := c.Create(k, v); err != nil {
- t.Fatal(err)
- }
-
- }
- msg := <-dataListener.rc
- assert.Equal(t, changedData, msg.Content)
-}
diff --git a/remoting/kubernetes/registry_controller.go b/remoting/kubernetes/registry_controller.go
deleted file mode 100644
index f6a804a..0000000
--- a/remoting/kubernetes/registry_controller.go
+++ /dev/null
@@ -1,633 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kubernetes
-
-import (
- "context"
- "encoding/base64"
- "encoding/json"
- "fmt"
- "io/ioutil"
- "os"
- "strconv"
- "strings"
- "sync"
- "time"
-)
-
-import (
- perrors "github.com/pkg/errors"
-
- v1 "k8s.io/api/core/v1"
-
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/labels"
- "k8s.io/apimachinery/pkg/selection"
- "k8s.io/apimachinery/pkg/types"
- "k8s.io/apimachinery/pkg/util/strategicpatch"
- "k8s.io/apimachinery/pkg/watch"
-
- "k8s.io/client-go/informers"
- informerscorev1 "k8s.io/client-go/informers/core/v1"
-
- "k8s.io/client-go/kubernetes"
-
- "k8s.io/client-go/rest"
-
- "k8s.io/client-go/tools/cache"
-
- "k8s.io/client-go/util/workqueue"
-)
-
-import (
- "dubbo.apache.org/dubbo-go/v3/common"
- "dubbo.apache.org/dubbo-go/v3/common/logger"
-)
-
-const (
- // kubernetes inject env var
- podNameKey = "HOSTNAME"
- nameSpaceKey = "NAMESPACE"
- nameSpaceFilePath = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
- needWatchedNameSpaceKey = "DUBBO_NAMESPACE"
-
- // all pod annotation key
- DubboIOAnnotationKey = "dubbo.io/annotation"
- // all pod label key and value pair
- DubboIOLabelKey = "dubbo.io/label"
- DubboIOConsumerLabelValue = "dubbo.io.consumer"
- DubboIOProviderLabelValue = "dubbo.io.provider"
-
- // kubernetes suggest resync
- defaultResync = 5 * time.Minute
-)
-
-var ErrDubboLabelAlreadyExist = perrors.New("dubbo label already exist")
-
-// dubboRegistryController works like a kubernetes controller
-type dubboRegistryController struct {
-
- // clone from client
- // manage lifecycle
- ctx context.Context
-
- role common.RoleType
-
- // protect patch current pod operation
- lock sync.Mutex
-
- // current pod config
- needWatchedNamespace map[string]struct{}
- namespace string
- name string
-
- watcherSet WatcherSet
-
- // kubernetes
- kc kubernetes.Interface
- listAndWatchStartResourceVersion uint64
- namespacedInformerFactory map[string]informers.SharedInformerFactory
- namespacedPodInformers map[string]informerscorev1.PodInformer
- queue workqueue.Interface // shared by namespaced informers
-}
-
-func newDubboRegistryController(
- ctx context.Context,
- // different provider and consumer have behavior
- roleType common.RoleType,
- // used to inject mock kubernetes client
- kcGetter func() (kubernetes.Interface, error),
-) (*dubboRegistryController, error) {
-
- kc, err := kcGetter()
- if err != nil {
- return nil, perrors.WithMessage(err, "get kubernetes client")
- }
-
- c := &dubboRegistryController{
- ctx: ctx,
- role: roleType,
- watcherSet: newWatcherSet(ctx),
- needWatchedNamespace: make(map[string]struct{}),
- namespacedInformerFactory: make(map[string]informers.SharedInformerFactory),
- namespacedPodInformers: make(map[string]informerscorev1.PodInformer),
- kc: kc,
- }
-
- if err := c.readConfig(); err != nil {
- return nil, perrors.WithMessage(err, "read config")
- }
-
- if err := c.initCurrentPod(); err != nil {
- return nil, perrors.WithMessage(err, "init current pod")
- }
-
- if err := c.initWatchSet(); err != nil {
- return nil, perrors.WithMessage(err, "init watch set")
- }
-
- if err := c.initPodInformer(); err != nil {
- return nil, perrors.WithMessage(err, "init pod informer")
- }
-
- go c.run()
-
- return c, nil
-}
-
-// GetInClusterKubernetesClient
-// current pod running in kubernetes-cluster
-func GetInClusterKubernetesClient() (kubernetes.Interface, error) {
- // read in-cluster config
- cfg, err := rest.InClusterConfig()
- if err != nil {
- return nil, perrors.WithMessage(err, "get in-cluster config")
- }
-
- return kubernetes.NewForConfig(cfg)
-}
-
-// initWatchSet
-// 1. get all with dubbo label pods
-// 2. put every element to watcherSet
-// 3. refresh watch book-mark
-func (c *dubboRegistryController) initWatchSet() error {
- req, err := labels.NewRequirement(DubboIOLabelKey, selection.In, []string{DubboIOConsumerLabelValue, DubboIOProviderLabelValue})
- if err != nil {
- return perrors.WithMessage(err, "new requirement")
- }
-
- for ns := range c.needWatchedNamespace {
- pods, err := c.kc.CoreV1().Pods(ns).List(metav1.ListOptions{
- LabelSelector: req.String(),
- })
- if err != nil {
- return perrors.WithMessagef(err, "list pods in namespace (%s)", ns)
- }
- for _, p := range pods.Items {
- // set resource version
- rv, err := strconv.ParseUint(p.GetResourceVersion(), 10, 0)
- if err != nil {
- return perrors.WithMessagef(err, "parse resource version %s", p.GetResourceVersion())
- }
- if c.listAndWatchStartResourceVersion < rv {
- c.listAndWatchStartResourceVersion = rv
- }
- c.handleWatchedPodEvent(&p, watch.Added)
- }
- }
- return nil
-}
-
-// read dubbo-registry controller config
-// 1. current pod name
-// 2. current pod working namespace
-func (c *dubboRegistryController) readConfig() error {
- // read current pod name && namespace
- c.name = os.Getenv(podNameKey)
- if len(c.name) == 0 {
- return perrors.Errorf("read pod name from env %s failed", podNameKey)
- }
- namespace, err := ioutil.ReadFile(nameSpaceFilePath)
- if err == nil && len(namespace) != 0 {
- c.namespace = string(namespace)
- return nil
- }
- c.namespace = os.Getenv(nameSpaceKey)
- if len(c.namespace) != 0 {
- return nil
- }
- return perrors.Errorf("get empty namesapce, please check if namespace file at %s exist, or environment %s"+
- " is set", nameSpaceFilePath, nameSpaceKey)
-
-}
-
-func (c *dubboRegistryController) initNamespacedPodInformer(ns string) error {
- req, err := labels.NewRequirement(DubboIOLabelKey, selection.In, []string{DubboIOConsumerLabelValue, DubboIOProviderLabelValue})
- if err != nil {
- return perrors.WithMessage(err, "new requirement")
- }
-
- informersFactory := informers.NewSharedInformerFactoryWithOptions(
- c.kc,
- defaultResync,
- informers.WithNamespace(ns),
- informers.WithTweakListOptions(func(options *metav1.ListOptions) {
- options.LabelSelector = req.String()
- options.ResourceVersion = strconv.FormatUint(c.listAndWatchStartResourceVersion, 10)
- }),
- )
- podInformer := informersFactory.Core().V1().Pods()
-
- podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
- AddFunc: c.addPod,
- UpdateFunc: c.updatePod,
- DeleteFunc: c.deletePod,
- })
-
- c.namespacedInformerFactory[ns] = informersFactory
- c.namespacedPodInformers[ns] = podInformer
-
- return nil
-}
-
-func (c *dubboRegistryController) initPodInformer() error {
- if c.role == common.PROVIDER {
- return nil
- }
-
- // read need watched namespaces list
- needWatchedNameSpaceList := os.Getenv(needWatchedNameSpaceKey)
- if len(needWatchedNameSpaceList) == 0 {
- return perrors.New("read value from env by key (DUBBO_NAMESPACE)")
- }
- for _, ns := range strings.Split(needWatchedNameSpaceList, ",") {
- c.needWatchedNamespace[ns] = struct{}{}
- }
- // current work namespace should be watched
- c.needWatchedNamespace[c.namespace] = struct{}{}
-
- c.queue = workqueue.New()
-
- // init all watch needed pod-informer
- for watchedNS := range c.needWatchedNamespace {
- if err := c.initNamespacedPodInformer(watchedNS); err != nil {
- return err
- }
- }
- return nil
-}
-
-type kubernetesEvent struct {
- p *v1.Pod
- t watch.EventType
-}
-
-func (c *dubboRegistryController) addPod(obj interface{}) {
- p, ok := obj.(*v1.Pod)
- if !ok {
- logger.Warnf("pod-informer got object %T not *v1.Pod", obj)
- return
- }
- c.queue.Add(&kubernetesEvent{
- t: watch.Added,
- p: p,
- })
-}
-
-func (c *dubboRegistryController) updatePod(oldObj, newObj interface{}) {
- op, ok := oldObj.(*v1.Pod)
- if !ok {
- logger.Warnf("pod-informer got object %T not *v1.Pod", oldObj)
- return
- }
- np, ok := newObj.(*v1.Pod)
- if !ok {
- logger.Warnf("pod-informer got object %T not *v1.Pod", newObj)
- return
- }
- if op.GetResourceVersion() == np.GetResourceVersion() {
- return
- }
- c.queue.Add(&kubernetesEvent{
- p: np,
- t: watch.Modified,
- })
-}
-
-func (c *dubboRegistryController) deletePod(obj interface{}) {
- p, ok := obj.(*v1.Pod)
- if !ok {
- logger.Warnf("pod-informer got object %T not *v1.Pod", obj)
- return
- }
- c.queue.Add(&kubernetesEvent{
- p: p,
- t: watch.Deleted,
- })
-}
-
-func (c *dubboRegistryController) startALLInformers() {
- logger.Debugf("starting namespaced informer-factory")
- for _, factory := range c.namespacedInformerFactory {
- go factory.Start(c.ctx.Done())
- }
-}
-
-// run
-// controller process every event in work-queue
-func (c *dubboRegistryController) run() {
- if c.role == common.PROVIDER {
- return
- }
-
- defer logger.Warn("dubbo registry controller work stopped")
- defer c.queue.ShutDown()
-
- for ns, podInformer := range c.namespacedPodInformers {
- if !cache.WaitForCacheSync(c.ctx.Done(), podInformer.Informer().HasSynced) {
- logger.Errorf("wait for cache sync finish @namespace %s fail", ns)
- return
- }
- }
-
- logger.Infof("kubernetes registry-controller running @Namespace = %q @PodName = %q", c.namespace, c.name)
-
- // start work
- go c.work()
- // block wait context cancel
- <-c.ctx.Done()
-}
-
-func (c *dubboRegistryController) work() {
- for c.processNextWorkItem() {
- }
-}
-
-// processNextWorkItem process work-queue elements
-func (c *dubboRegistryController) processNextWorkItem() bool {
- item, shutdown := c.queue.Get()
- if shutdown {
- return false
- }
- defer c.queue.Done(item)
- o := item.(*kubernetesEvent)
- c.handleWatchedPodEvent(o.p, o.t)
- return true
-}
-
-// handleWatchedPodEvent handles watched pod event
-func (c *dubboRegistryController) handleWatchedPodEvent(p *v1.Pod, eventType watch.EventType) {
- logger.Debugf("get @type = %s event from @pod = %s", eventType, p.GetName())
-
- for ak, av := range p.GetAnnotations() {
- // not dubbo interest annotation
- if ak != DubboIOAnnotationKey {
- continue
- }
- ol, err := c.unmarshalRecord(av)
- if err != nil {
- logger.Errorf("there a pod with dubbo annotation, but unmarshal dubbo value %v", err)
- return
- }
- for _, o := range ol {
- switch eventType {
- case watch.Added:
- // if pod is added, the record always be create
- o.EventType = Create
- case watch.Modified:
- o.EventType = Update
- case watch.Deleted:
- o.EventType = Delete
- default:
- logger.Errorf("no valid kubernetes event-type (%s) ", eventType)
- return
- }
-
- logger.Debugf("putting @key=%s @value=%s to watcherSet", o.Key, o.Value)
- if err := c.watcherSet.Put(o); err != nil {
- logger.Errorf("put (%#v) to cache watcherSet: %v ", o, err)
- return
- }
- }
- }
-}
-
-// unmarshalRecord unmarshals the kubernetes dubbo annotation value
-func (c *dubboRegistryController) unmarshalRecord(record string) ([]*WatcherEvent, error) {
- if len(record) == 0 {
- // []*WatcherEvent is nil.
- return nil, nil
- }
-
- rawMsg, err := base64.URLEncoding.DecodeString(record)
- if err != nil {
- return nil, perrors.WithMessagef(err, "decode record (%s)", record)
- }
-
- var out []*WatcherEvent
- if err := json.Unmarshal(rawMsg, &out); err != nil {
- return nil, perrors.WithMessage(err, "decode json")
- }
- return out, nil
-}
-
-// initCurrentPod
-// 1. get current pod
-// 2. give the dubbo-label for this pod
-func (c *dubboRegistryController) initCurrentPod() error {
- currentPod, err := c.readCurrentPod()
- if err != nil {
- return perrors.WithMessagef(err, "get current (%s) pod in namespace (%s)", c.name, c.namespace)
- }
-
- oldPod, newPod, err := c.assembleDUBBOLabel(currentPod)
- if err != nil {
- if err == ErrDubboLabelAlreadyExist {
- return nil
- }
- return perrors.WithMessage(err, "assemble dubbo label")
- }
- // current pod don't have label
- p, err := c.getPatch(oldPod, newPod)
- if err != nil {
- return perrors.WithMessage(err, "get patch")
- }
-
- _, err = c.patchCurrentPod(p)
- if err != nil {
- return perrors.WithMessage(err, "patch to current pod")
- }
-
- return nil
-}
-
-// patchCurrentPod writes new meta for current pod
-func (c *dubboRegistryController) patchCurrentPod(patch []byte) (*v1.Pod, error) {
- updatedPod, err := c.kc.CoreV1().Pods(c.namespace).Patch(c.name, types.StrategicMergePatchType, patch)
- if err != nil {
- return nil, perrors.WithMessage(err, "patch in kubernetes pod ")
- }
- return updatedPod, nil
-}
-
-func (c *dubboRegistryController) assembleLabel(k, v string) error {
- var (
- oldPod = &v1.Pod{}
- newPod = &v1.Pod{}
- )
- oldPod.Labels = make(map[string]string, 8)
- newPod.Labels = make(map[string]string, 8)
- currentPod, err := c.readCurrentPod()
- if err != nil {
- return err
- }
- // copy current pod labels to oldPod && newPod
- for k, v := range currentPod.GetLabels() {
- oldPod.Labels[k] = v
- newPod.Labels[k] = v
- }
- newPod.Labels[k] = v
-
- p, err := c.getPatch(oldPod, newPod)
- if err != nil {
- return perrors.WithMessage(err, "get patch")
- }
-
- _, err = c.patchCurrentPod(p)
- if err != nil {
- return perrors.WithMessage(err, "patch to current pod")
- }
- return nil
-}
-
-// assembleDUBBOLabel assembles the dubbo kubernetes label
-// every dubbo instance should be labeled spec {"dubbo.io/label":"dubbo.io/label-value"} label
-func (c *dubboRegistryController) assembleDUBBOLabel(p *v1.Pod) (*v1.Pod, *v1.Pod, error) {
- var (
- oldPod = &v1.Pod{}
- newPod = &v1.Pod{}
- )
- oldPod.Labels = make(map[string]string, 8)
- newPod.Labels = make(map[string]string, 8)
-
- if p.GetLabels() != nil {
- if _, ok := p.GetLabels()[DubboIOLabelKey]; ok {
- // already have label
- return nil, nil, ErrDubboLabelAlreadyExist
- }
- }
-
- // copy current pod labels to oldPod && newPod
- for k, v := range p.GetLabels() {
- oldPod.Labels[k] = v
- newPod.Labels[k] = v
- }
-
- // assign new label for current pod
- switch c.role {
- case common.CONSUMER:
- newPod.Labels[DubboIOLabelKey] = DubboIOConsumerLabelValue
- case common.PROVIDER:
- newPod.Labels[DubboIOLabelKey] = DubboIOProviderLabelValue
- default:
- return nil, nil, perrors.New(fmt.Sprintf("unknown role %s", c.role))
- }
- return oldPod, newPod, nil
-}
-
-// assembleDUBBOAnnotations assembles the dubbo kubernetes annotations
-// accord the current pod && (k,v) assemble the old-pod, new-pod
-func (c *dubboRegistryController) assembleDUBBOAnnotations(k, v string, currentPod *v1.Pod) (oldPod *v1.Pod, newPod *v1.Pod, err error) {
- oldPod = &v1.Pod{}
- newPod = &v1.Pod{}
- oldPod.Annotations = make(map[string]string, 8)
- newPod.Annotations = make(map[string]string, 8)
-
- for k, v := range currentPod.GetAnnotations() {
- oldPod.Annotations[k] = v
- newPod.Annotations[k] = v
- }
-
- al, err := c.unmarshalRecord(oldPod.GetAnnotations()[DubboIOAnnotationKey])
- if err != nil {
- err = perrors.WithMessage(err, "unmarshal record")
- return
- }
-
- newAnnotations, err := c.marshalRecord(append(al, &WatcherEvent{Key: k, Value: v}))
- if err != nil {
- err = perrors.WithMessage(err, "marshal record")
- return
- }
-
- newPod.Annotations[DubboIOAnnotationKey] = newAnnotations
- return
-}
-
-// getPatch gets the kubernetes pod patch bytes
-func (c *dubboRegistryController) getPatch(oldPod, newPod *v1.Pod) ([]byte, error) {
- oldData, err := json.Marshal(oldPod)
- if err != nil {
- return nil, perrors.WithMessage(err, "marshal old pod")
- }
-
- newData, err := json.Marshal(newPod)
- if err != nil {
- return nil, perrors.WithMessage(err, "marshal newPod pod")
- }
-
- patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Pod{})
- if err != nil {
- return nil, perrors.WithMessage(err, "create two-way-merge-patch")
- }
- return patchBytes, nil
-}
-
-// marshalRecord marshals the kubernetes dubbo annotation value
-func (c *dubboRegistryController) marshalRecord(ol []*WatcherEvent) (string, error) {
- msg, err := json.Marshal(ol)
- if err != nil {
- return "", perrors.WithMessage(err, "json encode object list")
- }
- return base64.URLEncoding.EncodeToString(msg), nil
-}
-
-// readCurrentPod reads from kubernetes-env current pod status
-func (c *dubboRegistryController) readCurrentPod() (*v1.Pod, error) {
- currentPod, err := c.kc.CoreV1().Pods(c.namespace).Get(c.name, metav1.GetOptions{})
- if err != nil {
- return nil, perrors.WithMessagef(err, "get current (%s) pod in namespace (%s)", c.name, c.namespace)
- }
- return currentPod, nil
-}
-
-// addAnnotationForCurrentPod adds annotation for current pod
-func (c *dubboRegistryController) addAnnotationForCurrentPod(k string, v string) error {
- c.lock.Lock()
- defer c.lock.Unlock()
-
- // 1. accord old pod && (k, v) assemble new pod dubbo annotation v
- // 2. get patch data
- // 3. PATCH the pod
- currentPod, err := c.readCurrentPod()
- if err != nil {
- return perrors.WithMessage(err, "read current pod")
- }
-
- oldPod, newPod, err := c.assembleDUBBOAnnotations(k, v, currentPod)
- if err != nil {
- return perrors.WithMessage(err, "assemble")
- }
-
- patchBytes, err := c.getPatch(oldPod, newPod)
- if err != nil {
- return perrors.WithMessage(err, "get patch")
- }
-
- _, err = c.patchCurrentPod(patchBytes)
- if err != nil {
- return perrors.WithMessage(err, "patch current pod")
- }
-
- return c.watcherSet.Put(&WatcherEvent{
- Key: k,
- Value: v,
- EventType: Create,
- })
-}
diff --git a/remoting/kubernetes/watch.go b/remoting/kubernetes/watch.go
deleted file mode 100644
index 34f5a34..0000000
--- a/remoting/kubernetes/watch.go
+++ /dev/null
@@ -1,320 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kubernetes
-
-import (
- "context"
- "strconv"
- "strings"
- "sync"
-)
-
-import (
- perrors "github.com/pkg/errors"
-)
-
-var (
- ErrWatcherSetAlreadyStopped = perrors.New("the watcher-set already be stopped")
- ErrKVPairNotFound = perrors.New("k/v pair not found")
-)
-
-const (
- defaultWatcherChanSize = 100
-)
-
-type eventType int
-
-const (
- Create eventType = iota
- Update
- Delete
-)
-
-func (e eventType) String() string {
- switch e {
- case Create:
- return "CREATE"
- case Update:
- return "UPDATE"
- case Delete:
- return "DELETE"
- default:
- return "UNKNOWN"
- }
-}
-
-// WatcherEvent
-// watch event is element in watcherSet
-type WatcherEvent struct {
- // event-type
- EventType eventType `json:"-"`
- // the dubbo-go should consume the key
- Key string `json:"k"`
- // the dubbo-go should consume the value
- Value string `json:"v"`
-}
-
-// Watchable WatcherSet
-// thread-safe
-type WatcherSet interface {
-
- // put the watch event to the watch set
- Put(object *WatcherEvent) error
- // if prefix is false,
- // the len([]*WatcherEvent) == 1
- Get(key string, prefix bool) ([]*WatcherEvent, error)
- // watch the spec key or key prefix
- Watch(key string, prefix bool) (Watcher, error)
- // check the watcher set status
- Done() <-chan struct{}
-}
-
-// Watcher
-type Watcher interface {
- // the watcher's id
- ID() string
- // result stream
- ResultChan() <-chan *WatcherEvent
- // Stop the watcher
- stop()
- // check the watcher status
- done() <-chan struct{}
-}
-
-// the watch set implement
-type watcherSetImpl struct {
-
- // Client's ctx, client die, the watch set will die too
- ctx context.Context
-
- // protect watcher-set and watchers
- lock sync.RWMutex
-
- // the key is dubbo-go interest meta
- cache map[string]*WatcherEvent
-
- currentWatcherId uint64
- watchers map[uint64]*watcher
-}
-
-// closeWatchers
-// when the watcher-set was closed
-func (s *watcherSetImpl) closeWatchers() {
- <-s.ctx.Done()
- // parent ctx be canceled, close the watch-set's watchers
- s.lock.Lock()
- watchers := s.watchers
- s.lock.Unlock()
-
- for _, w := range watchers {
- // stop data stream
- // close(w.ch)
- // stop watcher
- w.stop()
- }
-}
-
-// Watch
-// watch on spec key, with or without prefix
-func (s *watcherSetImpl) Watch(key string, prefix bool) (Watcher, error) {
- return s.addWatcher(key, prefix)
-}
-
-// Done gets the watcher-set status
-func (s *watcherSetImpl) Done() <-chan struct{} {
- return s.ctx.Done()
-}
-
-// Put puts the watch event to watcher-set
-func (s *watcherSetImpl) Put(watcherEvent *WatcherEvent) error {
- blockSendMsg := func(object *WatcherEvent, w *watcher) {
- select {
- case <-w.done():
- // the watcher already stop
- case w.ch <- object:
- // block send the msg
- }
- }
-
- s.lock.Lock()
- defer s.lock.Unlock()
-
- if err := s.valid(); err != nil {
- return err
- }
-
- // put to watcher-set
- switch watcherEvent.EventType {
- case Delete:
- // delete from store
- delete(s.cache, watcherEvent.Key)
- case Update, Create:
- o, ok := s.cache[watcherEvent.Key]
- if !ok {
- // pod update, but create new k/v pair
- watcherEvent.EventType = Create
- s.cache[watcherEvent.Key] = watcherEvent
- break
- }
- // k/v pair already latest
- if o.Value == watcherEvent.Value {
- return nil
- }
- // update to latest status
- s.cache[watcherEvent.Key] = watcherEvent
- }
-
- // notify watcher
- for _, w := range s.watchers {
- if !strings.Contains(watcherEvent.Key, w.interested.key) {
- // this watcher no interest in this element
- continue
- }
- if !w.interested.prefix {
- if watcherEvent.Key == w.interested.key {
- blockSendMsg(watcherEvent, w)
- }
- // not interest
- continue
- }
- blockSendMsg(watcherEvent, w)
- }
- return nil
-}
-
-// valid
-func (s *watcherSetImpl) valid() error {
- select {
- case <-s.ctx.Done():
- return ErrWatcherSetAlreadyStopped
- default:
- return nil
- }
-}
-
-// addWatcher
-func (s *watcherSetImpl) addWatcher(key string, prefix bool) (Watcher, error) {
- if err := s.valid(); err != nil {
- return nil, err
- }
-
- s.lock.Lock()
- defer s.lock.Unlock()
-
- // increase the watcher-id
- s.currentWatcherId++
-
- w := &watcher{
- id: s.currentWatcherId,
- watcherSet: s,
- interested: struct {
- key string
- prefix bool
- }{key: key, prefix: prefix},
- ch: make(chan *WatcherEvent, defaultWatcherChanSize),
- exit: make(chan struct{}),
- }
- s.watchers[s.currentWatcherId] = w
- return w, nil
-}
-
-// Get gets elements from watcher-set
-func (s *watcherSetImpl) Get(key string, prefix bool) ([]*WatcherEvent, error) {
- s.lock.RLock()
- defer s.lock.RUnlock()
-
- if err := s.valid(); err != nil {
- return nil, err
- }
-
- if !prefix {
- for k, v := range s.cache {
- if k == key {
- return []*WatcherEvent{v}, nil
- }
- }
- // object
- return nil, ErrKVPairNotFound
- }
-
- var out []*WatcherEvent
-
- for k, v := range s.cache {
- if strings.Contains(k, key) {
- out = append(out, v)
- }
- }
-
- if len(out) == 0 {
- return nil, ErrKVPairNotFound
- }
-
- return out, nil
-}
-
-// the watcher-set watcher
-type watcher struct {
- id uint64
-
- // the underlay watcherSet
- watcherSet *watcherSetImpl
-
- // the interest topic
- interested struct {
- key string
- prefix bool
- }
- ch chan *WatcherEvent
-
- closeOnce sync.Once
- exit chan struct{}
-}
-
-// nolint
-func (w *watcher) ResultChan() <-chan *WatcherEvent {
- return w.ch
-}
-
-// nolint
-func (w *watcher) ID() string {
- return strconv.FormatUint(w.id, 10)
-}
-
-// nolint
-func (w *watcher) stop() {
- // double close will panic
- w.closeOnce.Do(func() {
- close(w.exit)
- })
-}
-
-// done checks watcher status
-func (w *watcher) done() <-chan struct{} {
- return w.exit
-}
-
-// newWatcherSet returns new watcher set from parent context
-func newWatcherSet(ctx context.Context) WatcherSet {
- s := &watcherSetImpl{
- ctx: ctx,
- cache: map[string]*WatcherEvent{},
- watchers: map[uint64]*watcher{},
- }
- go s.closeWatchers()
- return s
-}
diff --git a/remoting/kubernetes/watch_test.go b/remoting/kubernetes/watch_test.go
deleted file mode 100644
index 9a0139d..0000000
--- a/remoting/kubernetes/watch_test.go
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kubernetes
-
-import (
- "context"
- "strconv"
- "sync"
- "testing"
- "time"
-)
-
-func TestWatchSet(t *testing.T) {
- ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
- defer cancel()
-
- s := newWatcherSet(ctx)
-
- wg := sync.WaitGroup{}
-
- for i := 0; i < 2; i++ {
-
- wg.Add(1)
-
- go func() {
- defer wg.Done()
- w, err := s.Watch("key-1", false)
- if err != nil {
- t.Error(err)
- return
- }
- for {
- select {
- case e := <-w.ResultChan():
- t.Logf("consumer %s got %s\n", w.ID(), e.Key)
-
- case <-w.done():
- t.Logf("consumer %s stopped", w.ID())
- return
- }
- }
- }()
- }
- for i := 2; i < 3; i++ {
-
- wg.Add(1)
- go func() {
- defer wg.Done()
- w, err := s.Watch("key", true)
- if err != nil {
- t.Error(err)
- return
- }
-
- for {
- select {
- case e := <-w.ResultChan():
- t.Logf("prefix consumer %s got %s\n", w.ID(), e.Key)
-
- case <-w.done():
- t.Logf("prefix consumer %s stopped", w.ID())
- return
- }
- }
- }()
- }
-
- for i := 0; i < 5; i++ {
- go func(i int) {
- if err := s.Put(&WatcherEvent{
- Key: "key-" + strconv.Itoa(i),
- Value: strconv.Itoa(i),
- }); err != nil {
- t.Error(err)
- return
- }
- }(i)
- }
-
- wg.Wait()
-}
diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go
index b4a2503..fbe4749 100644
--- a/remoting/zookeeper/listener.go
+++ b/remoting/zookeeper/listener.go
@@ -175,7 +175,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
for _, n := range newChildren {
newNode = path.Join(zkPath, n)
- logger.Infof("[Zookeeper Listener] add zkNode{%s}", newNode)
+ logger.Debugf("[Zookeeper Listener] add zkNode{%s}", newNode)
content, _, connErr := l.client.Conn.Get(newNode)
if connErr != nil {
logger.Errorf("Get new node path {%v} 's content error,message is {%v}",