You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by la...@apache.org on 2021/11/12 08:44:52 UTC

[dubbo-go] branch 3.0 updated: Fix: some bugs and features for 3.0 (#1586)

This is an automated email from the ASF dual-hosted git repository.

laurence pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 1fdd429  Fix: some bugs and features for 3.0 (#1586)
1fdd429 is described below

commit 1fdd429b8c4a02c477ceca510cce779f782fe512
Author: Laurence <45...@users.noreply.github.com>
AuthorDate: Fri Nov 12 16:44:44 2021 +0800

    Fix: some bugs and features for 3.0 (#1586)
    
    * fix: some bugs
    
    * Fix: remove file\k8s registry, k8s remote
---
 common/proxy/proxy.go                      |  31 +-
 config_center/nacos/listener.go            |   2 +-
 go.mod                                     |   7 +-
 go.sum                                     |  18 +-
 protocol/dubbo3/dubbo3_invoker.go          |  19 +-
 protocol/dubbo3/dubbo3_protocol.go         |   8 +-
 registry/base_registry.go                  |   6 +-
 registry/file/listener.go                  |  29 --
 registry/file/service_discovery.go         | 267 ------------
 registry/file/service_discovery_test.go    |  91 -----
 registry/kubernetes/listener.go            | 127 ------
 registry/kubernetes/listener_test.go       |  53 ---
 registry/kubernetes/registry.go            | 241 -----------
 registry/kubernetes/registry_test.go       | 344 ----------------
 remoting/kubernetes/client.go              | 213 ----------
 remoting/kubernetes/client_test.go         | 455 ---------------------
 remoting/kubernetes/facade.go              |  28 --
 remoting/kubernetes/facade_test.go         |  78 ----
 remoting/kubernetes/listener.go            | 212 ----------
 remoting/kubernetes/listener_test.go       | 103 -----
 remoting/kubernetes/registry_controller.go | 633 -----------------------------
 remoting/kubernetes/watch.go               | 320 ---------------
 remoting/kubernetes/watch_test.go          |  96 -----
 remoting/zookeeper/listener.go             |   2 +-
 24 files changed, 53 insertions(+), 3330 deletions(-)

diff --git a/common/proxy/proxy.go b/common/proxy/proxy.go
index e1c6222..c32eedc 100644
--- a/common/proxy/proxy.go
+++ b/common/proxy/proxy.go
@@ -19,6 +19,7 @@ package proxy
 
 import (
 	"context"
+	"errors"
 	"reflect"
 	"sync"
 )
@@ -116,13 +117,6 @@ func DefaultProxyImplementFunc(p *Proxy, v common.RPCService) {
 	valueOf := reflect.ValueOf(v)
 
 	valueOfElem := valueOf.Elem()
-	typeOf := valueOfElem.Type()
-
-	// check incoming interface, incoming interface's elem must be a struct.
-	if typeOf.Kind() != reflect.Struct {
-		logger.Errorf("The type of RPCService(=\"%T\") must be a pointer of a struct.", v)
-		return
-	}
 
 	makeDubboCallProxy := func(methodName string, outs []reflect.Type) func(in []reflect.Value) []reflect.Value {
 		return func(in []reflect.Value) []reflect.Value {
@@ -227,6 +221,18 @@ func DefaultProxyImplementFunc(p *Proxy, v common.RPCService) {
 		}
 	}
 
+	if err := refectAndMakeObjectFunc(valueOfElem, makeDubboCallProxy); err != nil {
+		logger.Errorf("The type or combination type of RPCService %T must be a pointer of a struct. error is %s", v, err)
+		return
+	}
+}
+
+func refectAndMakeObjectFunc(valueOfElem reflect.Value, makeDubboCallProxy func(methodName string, outs []reflect.Type) func(in []reflect.Value) []reflect.Value) error {
+	typeOf := valueOfElem.Type()
+	// check incoming interface, incoming interface's elem must be a struct.
+	if typeOf.Kind() != reflect.Struct {
+		return errors.New("invalid type kind")
+	}
 	numField := valueOfElem.NumField()
 	for i := 0; i < numField; i++ {
 		t := typeOf.Field(i)
@@ -258,6 +264,17 @@ func DefaultProxyImplementFunc(p *Proxy, v common.RPCService) {
 			// do method proxy here:
 			f.Set(reflect.MakeFunc(f.Type(), makeDubboCallProxy(methodName, funcOuts)))
 			logger.Debugf("set method [%s]", methodName)
+		} else if f.IsValid() && f.CanSet() {
+			// for struct combination
+			valueOfSub := reflect.New(t.Type)
+			valueOfElemInterface := valueOfSub.Elem()
+			if valueOfElemInterface.Type().Kind() == reflect.Struct {
+				if err := refectAndMakeObjectFunc(valueOfElemInterface, makeDubboCallProxy); err != nil {
+					return err
+				}
+				f.Set(valueOfElemInterface)
+			}
 		}
 	}
+	return nil
 }
diff --git a/config_center/nacos/listener.go b/config_center/nacos/listener.go
index 3d60d2a..a4cf589 100644
--- a/config_center/nacos/listener.go
+++ b/config_center/nacos/listener.go
@@ -22,6 +22,7 @@ import (
 )
 
 import (
+	constant2 "github.com/nacos-group/nacos-sdk-go/common/constant"
 	"github.com/nacos-group/nacos-sdk-go/vo"
 )
 
@@ -30,7 +31,6 @@ import (
 	"dubbo.apache.org/dubbo-go/v3/common/logger"
 	"dubbo.apache.org/dubbo-go/v3/config_center"
 	"dubbo.apache.org/dubbo-go/v3/remoting"
-	constant2 "github.com/nacos-group/nacos-sdk-go/common/constant"
 )
 
 func callback(listener config_center.ConfigurationListener, _, _, dataId, data string) {
diff --git a/go.mod b/go.mod
index 30d3bb8..3256db0 100644
--- a/go.mod
+++ b/go.mod
@@ -13,8 +13,8 @@ require (
 	github.com/creasty/defaults v1.5.2
 	github.com/dubbogo/go-zookeeper v1.0.3
 	github.com/dubbogo/gost v1.11.19
-	github.com/dubbogo/grpc-go v1.42.4-triple
-	github.com/dubbogo/triple v1.1.2
+	github.com/dubbogo/grpc-go v1.42.5-triple
+	github.com/dubbogo/triple v1.1.3
 	github.com/emicklei/go-restful/v3 v3.7.1
 	github.com/fsnotify/fsnotify v1.5.1
 	github.com/ghodss/yaml v1.0.0
@@ -32,8 +32,6 @@ require (
 	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd
 	github.com/nacos-group/nacos-sdk-go v1.0.9
 	github.com/natefinch/lumberjack v2.0.0+incompatible
-	github.com/onsi/ginkgo v1.10.1 // indirect
-	github.com/onsi/gomega v1.7.0 // indirect
 	github.com/opentracing/opentracing-go v1.2.0
 	github.com/pkg/errors v0.9.1
 	github.com/prometheus/client_golang v1.11.0
@@ -49,7 +47,6 @@ require (
 	google.golang.org/protobuf v1.27.1
 	gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
 	gopkg.in/yaml.v2 v2.4.0
-	k8s.io/api v0.16.9
 	k8s.io/apimachinery v0.16.9
 	k8s.io/client-go v0.16.9
 )
diff --git a/go.sum b/go.sum
index 2fbdc68..4ab084c 100644
--- a/go.sum
+++ b/go.sum
@@ -177,13 +177,13 @@ github.com/dubbogo/gost v1.11.12/go.mod h1:vIcP9rqz2KsXHPjsAwIUtfJIJjppQLQDcYaZT
 github.com/dubbogo/gost v1.11.18/go.mod h1:vIcP9rqz2KsXHPjsAwIUtfJIJjppQLQDcYaZTy/61jI=
 github.com/dubbogo/gost v1.11.19 h1:R1rZ3TNJKV9W5XHLMv+GDO2Wy6UDnwGQtVWbsWYvo0A=
 github.com/dubbogo/gost v1.11.19/go.mod h1:vIcP9rqz2KsXHPjsAwIUtfJIJjppQLQDcYaZTy/61jI=
-github.com/dubbogo/grpc-go v1.42.4-triple h1:ysiabUrEGcaeXgnjSBT0bB1M7EexSJFiO0Mebg/Iqa4=
-github.com/dubbogo/grpc-go v1.42.4-triple/go.mod h1:F1T9hnUvYGW4JLK1QNriavpOkhusU677ovPzLkk6zHM=
+github.com/dubbogo/grpc-go v1.42.5-triple h1:Ed5z/ikkpdZHBMA4mTEthQFTQeKlHtkdAsQrZjTbFk8=
+github.com/dubbogo/grpc-go v1.42.5-triple/go.mod h1:F1T9hnUvYGW4JLK1QNriavpOkhusU677ovPzLkk6zHM=
 github.com/dubbogo/jsonparser v1.0.1/go.mod h1:tYAtpctvSP/tWw4MeelsowSPgXQRVHHWbqL6ynps8jU=
 github.com/dubbogo/net v0.0.4/go.mod h1:1CGOnM7X3he+qgGNqjeADuE5vKZQx/eMSeUkpU3ujIc=
 github.com/dubbogo/triple v1.0.9/go.mod h1:1t9me4j4CTvNDcsMZy6/OGarbRyAUSY0tFXGXHCp7Iw=
-github.com/dubbogo/triple v1.1.2 h1:7lmQ0uNvcIYlMj5gNwPQadFx8w8UDEtcYl4DL6X+idM=
-github.com/dubbogo/triple v1.1.2/go.mod h1:x+H41M5yP1ULnJu4b+o8VrgsIKdTPslTum2yUqA9N1I=
+github.com/dubbogo/triple v1.1.3 h1:XKSh42lE2HLud++g4Fif7XY2hSMEsohFpegZPvsNXVQ=
+github.com/dubbogo/triple v1.1.3/go.mod h1:suMeAfZliq0p/lWIytgEdiuKcRlmeJC9pYeNHVE7FWU=
 github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
 github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
 github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
@@ -208,7 +208,6 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.m
 github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0=
 github.com/envoyproxy/go-control-plane v0.10.0/go.mod h1:AY7fTTXNdv/aJ2O5jwpxAPOWUZ7hQAEvzN5Pf27BkQQ=
 github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
-github.com/evanphx/json-patch v4.2.0+incompatible h1:fUDGZCv/7iAN7u0puUVhvKCcsR6vRfwrJatElLBEf0I=
 github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
 github.com/evanphx/json-patch/v5 v5.5.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4=
 github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 h1:Ghm4eQYC0nEPnSJdVkTrXpu9KtoVCSo1hg7mtI7G9KU=
@@ -358,7 +357,6 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
 github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
 github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
-github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d h1:7XGaL1e6bYS1yIonGp9761ExpPPV1ui0SAC59Yube9k=
 github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY=
 github.com/gophercloud/gophercloud v0.1.0/go.mod h1:vxM41WHh5uqHVBMZHzuwNOHh8XEoIEcSTewFxm1c5g8=
 github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
@@ -440,7 +438,6 @@ github.com/hashicorp/vault/sdk v0.3.0 h1:kR3dpxNkhh/wr6ycaJYqp6AFT/i2xaftbfnwZdu
 github.com/hashicorp/vault/sdk v0.3.0/go.mod h1:aZ3fNuL5VNydQk8GcLJ2TV8YCRVvyaakYkhZRoVuhj0=
 github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM=
 github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM=
-github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
 github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
 github.com/hudl/fargo v1.3.0/go.mod h1:y3CKSmjA+wD2gak7sUSXTAoopbhU08POFhmITJgmKTg=
 github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
@@ -585,13 +582,9 @@ github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB
 github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
 github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
 github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
-github.com/onsi/ginkgo v1.10.1 h1:q/mM8GF/n0shIN8SaAZ0V+jnLPzen6WIVZdiwrRlMlo=
-github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
 github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
 github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
 github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
-github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME=
-github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
 github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk=
 github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go.mod h1:Ngi6UdF0k5OKD5t5wlmGhe/EDKPoUM3BXZSSfIuJbis=
 github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74=
@@ -1200,7 +1193,6 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN
 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
 gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw=
 gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
-gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
 gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
 gopkg.in/gcfg.v1 v1.2.3/go.mod h1:yesOnuUOFQAhST5vPY4nbZsb/huCgGGXlipJsBn0b3o=
 gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
@@ -1212,7 +1204,6 @@ gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXL
 gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
 gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
 gopkg.in/square/go-jose.v2 v2.3.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI=
-gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
 gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
 gopkg.in/warnings.v0 v0.1.2/go.mod h1:jksf8JmL6Qr/oQM2OXTHunEvvTAsrWBLb6OOjuVWRNI=
 gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
@@ -1248,7 +1239,6 @@ k8s.io/klog v0.0.0-20181102134211-b9b56d5dfc92/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUc
 k8s.io/klog v0.3.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
 k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8=
 k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I=
-k8s.io/kube-openapi v0.0.0-20190816220812-743ec37842bf h1:EYm5AW/UUDbnmnI+gK0TJDVK9qPLhM+sRHYanNKw0EQ=
 k8s.io/kube-openapi v0.0.0-20190816220812-743ec37842bf/go.mod h1:1TqjTSzOxsLGIKfj0lK8EeCP7K1iUG65v09OM0/WG5E=
 k8s.io/utils v0.0.0-20190801114015-581e00157fb1 h1:+ySTxfHnfzZb9ys375PXNlLhkJPLKgHajBU0N62BDvE=
 k8s.io/utils v0.0.0-20190801114015-581e00157fb1/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew=
diff --git a/protocol/dubbo3/dubbo3_invoker.go b/protocol/dubbo3/dubbo3_invoker.go
index c978f85..da42a32 100644
--- a/protocol/dubbo3/dubbo3_invoker.go
+++ b/protocol/dubbo3/dubbo3_invoker.go
@@ -27,6 +27,8 @@ import (
 )
 
 import (
+	"github.com/dubbogo/grpc-go/metadata"
+
 	tripleConstant "github.com/dubbogo/triple/pkg/common/constant"
 	triConfig "github.com/dubbogo/triple/pkg/config"
 	"github.com/dubbogo/triple/pkg/triple"
@@ -134,7 +136,19 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
 	}
 
 	// append interface id to ctx
-	ctx = context.WithValue(ctx, tripleConstant.CtxAttachmentKey, invocation.Attachments())
+	gRPCMD := make(metadata.MD, 0)
+	for k, v := range invocation.Attachments() {
+		if str, ok := v.(string); ok {
+			gRPCMD.Set(k, str)
+			continue
+		}
+		if str, ok := v.([]string); ok {
+			gRPCMD.Set(k, str...)
+			continue
+		}
+		logger.Warnf("triple attachment value with key = %s is invalid, which should be string or []string", k)
+	}
+	ctx = metadata.NewOutgoingContext(ctx, gRPCMD)
 	ctx = context.WithValue(ctx, tripleConstant.InterfaceKey, di.BaseInvoker.GetURL().GetParam(constant.InterfaceKey, ""))
 	in := make([]reflect.Value, 0, 16)
 	in = append(in, reflect.ValueOf(ctx))
@@ -146,8 +160,9 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
 	methodName := invocation.MethodName()
 	triAttachmentWithErr := di.client.Invoke(methodName, in, invocation.Reply())
 	result.Err = triAttachmentWithErr.GetError()
+	result.Attrs = make(map[string]interface{})
 	for k, v := range triAttachmentWithErr.GetAttachments() {
-		result.Attachment(k, v)
+		result.Attrs[k] = v
 	}
 	result.Rest = invocation.Reply()
 	return &result
diff --git a/protocol/dubbo3/dubbo3_protocol.go b/protocol/dubbo3/dubbo3_protocol.go
index 3e097c7..2ad3dff 100644
--- a/protocol/dubbo3/dubbo3_protocol.go
+++ b/protocol/dubbo3/dubbo3_protocol.go
@@ -190,13 +190,7 @@ func (d *UnaryService) GetReqParamsInterfaces(methodName string) ([]interface{},
 }
 
 func (d *UnaryService) InvokeWithArgs(ctx context.Context, methodName string, arguments []interface{}) (interface{}, error) {
-	dubboAttachment := make(map[string]interface{})
-	tripleAttachment, ok := ctx.Value(tripleConstant.TripleAttachement).(tripleCommon.TripleAttachment)
-	if ok {
-		for k, v := range tripleAttachment {
-			dubboAttachment[k] = v
-		}
-	}
+	dubboAttachment, _ := ctx.Value(tripleConstant.TripleAttachement).(tripleCommon.DubboAttachment)
 	res := d.proxyImpl.Invoke(ctx, invocation.NewRPCInvocation(methodName, arguments, dubboAttachment))
 	return res, res.Error()
 }
diff --git a/registry/base_registry.go b/registry/base_registry.go
index 7ac9f2b..c6151e8 100644
--- a/registry/base_registry.go
+++ b/registry/base_registry.go
@@ -358,7 +358,7 @@ func (r *BaseRegistry) consumerRegistry(c *common.URL, params url.Values, f crea
 		rawURL    string
 		err       error
 	)
-	dubboPath = fmt.Sprintf("/%s/%s/%s", r.URL.GetParam(constant.RegistryGroupKey, "dubbo"), r.service(c), common.DubboNodes[common.PROVIDER])
+	dubboPath = fmt.Sprintf("/%s/%s/%s", r.URL.GetParam(constant.RegistryGroupKey, "dubbo"), r.service(c), common.DubboNodes[common.CONSUMER])
 
 	if f != nil {
 		err = f(dubboPath)
@@ -412,7 +412,7 @@ func (r *BaseRegistry) Subscribe(url *common.URL, notifyListener NotifyListener)
 				listener.Close()
 				break
 			} else {
-				logger.Infof("[Zookeeper Registry] update begin, service event: %v", serviceEvent.String())
+				logger.Debugf("[Zookeeper Registry] update begin, service event: %v", serviceEvent.String())
 				notifyListener.Notify(serviceEvent)
 			}
 		}
@@ -443,7 +443,7 @@ func (r *BaseRegistry) UnSubscribe(url *common.URL, notifyListener NotifyListene
 			listener.Close()
 			break
 		} else {
-			logger.Infof("[Zookeeper Registry] update begin, service event: %v", serviceEvent.String())
+			logger.Debugf("[Zookeeper Registry] update begin, service event: %v", serviceEvent.String())
 			notifyListener.Notify(serviceEvent)
 		}
 	}
diff --git a/registry/file/listener.go b/registry/file/listener.go
deleted file mode 100644
index 55f35ea..0000000
--- a/registry/file/listener.go
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package file
-
-import (
-	"dubbo.apache.org/dubbo-go/v3/config_center"
-)
-
-// RegistryConfigurationListener represent the processor of flie watcher
-type RegistryConfigurationListener struct{}
-
-// Process submit the ConfigChangeEvent to the event chan to notify all observer
-func (l *RegistryConfigurationListener) Process(configType *config_center.ConfigChangeEvent) {
-}
diff --git a/registry/file/service_discovery.go b/registry/file/service_discovery.go
deleted file mode 100644
index 65f2c24..0000000
--- a/registry/file/service_discovery.go
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package file
-
-import (
-	"encoding/json"
-	"fmt"
-	"io/ioutil"
-	"os"
-	"path"
-	"strconv"
-)
-
-import (
-	gxset "github.com/dubbogo/gost/container/set"
-	gxpage "github.com/dubbogo/gost/hash/page"
-
-	perrors "github.com/pkg/errors"
-)
-
-import (
-	"dubbo.apache.org/dubbo-go/v3/common"
-	"dubbo.apache.org/dubbo-go/v3/common/constant"
-	"dubbo.apache.org/dubbo-go/v3/common/extension"
-	"dubbo.apache.org/dubbo-go/v3/common/logger"
-	"dubbo.apache.org/dubbo-go/v3/config_center"
-	"dubbo.apache.org/dubbo-go/v3/config_center/file"
-	"dubbo.apache.org/dubbo-go/v3/registry"
-)
-
-// init will put the service discovery into extension
-func init() {
-	extension.SetServiceDiscovery(constant.FileKey, newFileSystemServiceDiscovery)
-}
-
-// fileServiceDiscovery is the implementation of service discovery based on file.
-type fileSystemServiceDiscovery struct {
-	dynamicConfiguration file.FileSystemDynamicConfiguration
-	rootPath             string
-	fileMap              map[string]string
-}
-
-func newFileSystemServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) {
-	if url.Protocol != constant.FileKey {
-		return nil, perrors.New("could not init the instance because the config is invalid")
-	}
-
-	rp, err := file.Home()
-	if err != nil {
-		return nil, perrors.WithStack(err)
-	}
-	fdcf := extension.GetConfigCenterFactory(constant.FileKey)
-	p := path.Join(rp, ".dubbo", constant.RegistryKey)
-	url.AddParamAvoidNil(file.ConfigCenterDirParamName, p)
-	c, err := fdcf.GetDynamicConfiguration(url)
-	if err != nil {
-		return nil, perrors.WithStack(err)
-	}
-
-	sd := &fileSystemServiceDiscovery{
-		dynamicConfiguration: *c.(*file.FileSystemDynamicConfiguration),
-		rootPath:             p,
-		fileMap:              make(map[string]string),
-	}
-
-	extension.AddCustomShutdownCallback(func() {
-		if err := sd.Destroy(); err != nil {
-			logger.Warnf("sd.Destroy() = error:%v", err)
-		}
-	})
-
-	for _, v := range sd.GetServices().Values() {
-		for _, i := range sd.GetInstances(v.(string)) {
-			// like java do nothing
-			l := &RegistryConfigurationListener{}
-			sd.dynamicConfiguration.AddListener(getServiceInstanceId(i), l, config_center.WithGroup(getServiceName(i)))
-		}
-	}
-
-	return sd, nil
-}
-
-// nolint
-func (fssd *fileSystemServiceDiscovery) String() string {
-	return fmt.Sprintf("file-system-service-discovery")
-}
-
-// Destroy will destroy the service discovery.
-// If the discovery cannot be destroy, it will return an error.
-func (fssd *fileSystemServiceDiscovery) Destroy() error {
-	fssd.dynamicConfiguration.Close()
-
-	for _, f := range fssd.fileMap {
-		fssd.releaseAndRemoveRegistrationFiles(f)
-	}
-
-	return nil
-}
-
-// nolint
-func (fssd *fileSystemServiceDiscovery) releaseAndRemoveRegistrationFiles(file string) {
-	os.RemoveAll(file)
-}
-
-// ----------------- registration ----------------
-
-// Register will register an instance of ServiceInstance to registry
-func (fssd *fileSystemServiceDiscovery) Register(instance registry.ServiceInstance) error {
-	id := getServiceInstanceId(instance)
-	sn := getServiceName(instance)
-
-	c, err := toJsonString(instance)
-	if err != nil {
-		return perrors.WithStack(err)
-	}
-
-	err = fssd.dynamicConfiguration.PublishConfig(id, sn, c)
-	if err != nil {
-		return perrors.WithStack(err)
-	}
-
-	fssd.fileMap[id] = fssd.dynamicConfiguration.GetPath(id, sn)
-
-	return nil
-}
-
-// nolint
-func getServiceInstanceId(si registry.ServiceInstance) string {
-	if si.GetID() == "" {
-		return si.GetHost() + "." + strconv.Itoa(si.GetPort())
-	}
-
-	return si.GetID()
-}
-
-// nolint
-func getServiceName(si registry.ServiceInstance) string {
-	return si.GetServiceName()
-}
-
-// toJsonString to json string
-func toJsonString(si registry.ServiceInstance) (string, error) {
-	bytes, err := json.Marshal(si)
-	if err != nil {
-		return "", perrors.WithStack(err)
-	}
-
-	return string(bytes), nil
-}
-
-// Update will update the data of the instance in registry
-func (fssd *fileSystemServiceDiscovery) Update(instance registry.ServiceInstance) error {
-	return fssd.Register(instance)
-}
-
-// Unregister will unregister this instance from registry
-func (fssd *fileSystemServiceDiscovery) Unregister(instance registry.ServiceInstance) error {
-	id := getServiceInstanceId(instance)
-	sn := getServiceName(instance)
-
-	err := fssd.dynamicConfiguration.RemoveConfig(id, sn)
-	if err != nil {
-		return perrors.WithStack(err)
-	}
-
-	delete(fssd.fileMap, instance.GetID())
-	return nil
-}
-
-// ----------------- discovery -------------------
-// GetDefaultPageSize will return the default page size
-func (fssd *fileSystemServiceDiscovery) GetDefaultPageSize() int {
-	return 100
-}
-
-// GetServices will return the all service names.
-func (fssd *fileSystemServiceDiscovery) GetServices() *gxset.HashSet {
-	r := gxset.NewSet()
-	// dynamicConfiguration root path is the actual root path
-	fileInfo, _ := ioutil.ReadDir(fssd.dynamicConfiguration.RootPath())
-
-	for _, file := range fileInfo {
-		if file.IsDir() {
-			r.Add(file.Name())
-		}
-	}
-
-	return r
-}
-
-// GetInstances will return all service instances with serviceName
-func (fssd *fileSystemServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance {
-	set, err := fssd.dynamicConfiguration.GetConfigKeysByGroup(serviceName)
-	if err != nil {
-		logger.Errorf("[FileServiceDiscovery] Could not query the instances for service{%s}, error = err{%v} ",
-			serviceName, err)
-		return make([]registry.ServiceInstance, 0)
-	}
-
-	res := make([]registry.ServiceInstance, 0, set.Size())
-	for _, v := range set.Values() {
-		id := v.(string)
-		p, err := fssd.dynamicConfiguration.GetProperties(id, config_center.WithGroup(serviceName))
-		if err != nil {
-			logger.Errorf("[FileServiceDiscovery] Could not get the properties for id{%s}, service{%s}, "+
-				"error = err{%v} ",
-				id, serviceName, err)
-			return make([]registry.ServiceInstance, 0)
-		}
-
-		dsi := &registry.DefaultServiceInstance{}
-		err = json.Unmarshal([]byte(p), dsi)
-		if err != nil {
-			logger.Errorf("[FileServiceDiscovery] Could not unmarshal the properties for id{%s}, service{%s}, "+
-				"error = err{%v} ",
-				id, serviceName, err)
-			return make([]registry.ServiceInstance, 0)
-		}
-
-		res = append(res, dsi)
-	}
-
-	return res
-}
-
-// GetInstancesByPage will return a page containing instances of ServiceInstance with the serviceName
-// the page will start at offset
-func (fssd *fileSystemServiceDiscovery) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager {
-	return nil
-}
-
-// GetHealthyInstancesByPage will return a page containing instances of ServiceInstance.
-// The param healthy indices that the instance should be healthy or not.
-// The page will start at offset
-func (fssd *fileSystemServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int,
-	healthy bool) gxpage.Pager {
-	return nil
-}
-
-// Batch get all instances by the specified service names
-func (fssd *fileSystemServiceDiscovery) GetRequestInstances(serviceNames []string, offset int,
-	requestedSize int) map[string]gxpage.Pager {
-	return nil
-}
-
-// ----------------- event ----------------------
-// AddListener adds a new ServiceInstancesChangedListenerImpl
-// client
-func (fssd *fileSystemServiceDiscovery) AddListener(listener registry.ServiceInstancesChangedListener) error {
-	// fssd.dynamicConfiguration.AddListener(listener.ServiceName)
-	return nil
-}
diff --git a/registry/file/service_discovery_test.go b/registry/file/service_discovery_test.go
deleted file mode 100644
index 0152fc6..0000000
--- a/registry/file/service_discovery_test.go
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package file
-
-//
-//import (
-//	"math/rand"
-//	"strconv"
-//	"testing"
-//	"time"
-//)
-//
-//import (
-//	"github.com/stretchr/testify/assert"
-//)
-//
-//import (
-//	"dubbo.apache.org/dubbo-go/v3/common/constant"
-//	"dubbo.apache.org/dubbo-go/v3/common/extension"
-//	"dubbo.apache.org/dubbo-go/v3/registry"
-//)
-//
-//func TestNewFileSystemServiceDiscoveryAndDestroy(t *testing.T) {
-//	prepareData()
-//	serviceDiscovery, err := newFileSystemServiceDiscovery()
-//	assert.NoError(t, err)
-//	assert.NotNil(t, serviceDiscovery)
-//	defer func() {
-//		err = serviceDiscovery.Destroy()
-//		assert.Nil(t, err)
-//	}()
-//}
-//
-//func TestCURDFileSystemServiceDiscovery(t *testing.T) {
-//	prepareData()
-//	serviceDiscovery, err := extension.GetServiceDiscovery(constant.FILE_KEY)
-//	assert.NoError(t, err)
-//	md := make(map[string]string)
-//
-//	rand.Seed(time.Now().Unix())
-//	serviceName := "service-name" + strconv.Itoa(rand.Intn(10000))
-//	md["t1"] = "test1"
-//	r1 := &registry.DefaultServiceInstance{
-//		ID:          "123456789",
-//		ServiceName: serviceName,
-//		Host:        "127.0.0.1",
-//		Port:        2233,
-//		Enable:      true,
-//		Healthy:     true,
-//		Metadata:    md,
-//	}
-//	err = serviceDiscovery.Register(r1)
-//	assert.NoError(t, err)
-//
-//	instances := serviceDiscovery.GetInstances(r1.ServiceName)
-//	assert.Equal(t, 1, len(instances))
-//	assert.Equal(t, r1.ID, instances[0].GetID())
-//	assert.Equal(t, r1.ServiceName, instances[0].GetServiceName())
-//	assert.Equal(t, r1.Port, instances[0].GetPort())
-//
-//	err = serviceDiscovery.Unregister(r1)
-//	assert.NoError(t, err)
-//
-//	err = serviceDiscovery.Register(r1)
-//	assert.NoError(t, err)
-//	defer func() {
-//		err = serviceDiscovery.Destroy()
-//		assert.NoError(t, err)
-//	}()
-//}
-//
-//func prepareData() {
-//	//config.GetRootConfig().ServiceDiscoveries[testName] = &config.ServiceDiscoveryConfig{
-//	//	Protocol: "file",
-//	//}
-//}
diff --git a/registry/kubernetes/listener.go b/registry/kubernetes/listener.go
deleted file mode 100644
index 7605235..0000000
--- a/registry/kubernetes/listener.go
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kubernetes
-
-import (
-	"strings"
-)
-
-import (
-	gxchan "github.com/dubbogo/gost/container/chan"
-
-	perrors "github.com/pkg/errors"
-)
-
-import (
-	"dubbo.apache.org/dubbo-go/v3/common"
-	"dubbo.apache.org/dubbo-go/v3/common/logger"
-	"dubbo.apache.org/dubbo-go/v3/config_center"
-	"dubbo.apache.org/dubbo-go/v3/registry"
-	"dubbo.apache.org/dubbo-go/v3/remoting"
-)
-
-type dataListener struct {
-	interestedURL []*common.URL
-	listener      config_center.ConfigurationListener
-}
-
-// NewRegistryDataListener creates a data listener for kubernetes
-func NewRegistryDataListener(listener config_center.ConfigurationListener) *dataListener {
-	return &dataListener{listener: listener}
-}
-
-// AddInterestedURL adds the @url of registry center to the listener
-func (l *dataListener) AddInterestedURL(url *common.URL) {
-	l.interestedURL = append(l.interestedURL, url)
-}
-
-// DataChange
-// notify listen, when interest event
-func (l *dataListener) DataChange(eventType remoting.Event) bool {
-	index := strings.Index(eventType.Path, "/providers/")
-	if index == -1 {
-		logger.Warnf("Listen with no url, event.path={%v}", eventType.Path)
-		return false
-	}
-	url := eventType.Path[index+len("/providers/"):]
-	serviceURL, err := common.NewURL(url)
-	if err != nil {
-		logger.Warnf("Listen NewURL(r{%s}) = error{%v}", eventType.Path, err)
-		return false
-	}
-
-	for _, v := range l.interestedURL {
-		if serviceURL.URLEqual(v) {
-			l.listener.Process(
-				&config_center.ConfigChangeEvent{
-					Key:        eventType.Path,
-					Value:      serviceURL,
-					ConfigType: eventType.Action,
-				},
-			)
-			return true
-		}
-	}
-	return false
-}
-
-type configurationListener struct {
-	registry *kubernetesRegistry
-	events   *gxchan.UnboundedChan
-}
-
-// NewConfigurationListener for listening the event of kubernetes.
-func NewConfigurationListener(reg *kubernetesRegistry) *configurationListener {
-	// add a new waiter
-	reg.WaitGroup().Add(1)
-	return &configurationListener{registry: reg, events: gxchan.NewUnboundedChan(32)}
-}
-
-// Process processes the data change event from config center of kubernetes
-func (l *configurationListener) Process(configType *config_center.ConfigChangeEvent) {
-	l.events.In() <- configType
-}
-
-// Next returns next service event once received
-func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
-	for {
-		select {
-		case <-l.registry.Done():
-			logger.Warnf("listener's kubernetes client connection is broken, so kubernetes event listener exits now.")
-			return nil, perrors.New("listener stopped")
-
-		case val := <-l.events.Out():
-			e, _ := val.(*config_center.ConfigChangeEvent)
-			logger.Debugf("got kubernetes event %#v", e)
-			if e.ConfigType == remoting.EventTypeDel && !l.registry.client.Valid() {
-				select {
-				case <-l.registry.Done():
-					logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value)
-				default:
-				}
-				continue
-			}
-			return &registry.ServiceEvent{Action: e.ConfigType, Service: e.Value.(*common.URL)}, nil
-		}
-	}
-}
-
-// Close kubernetes registry center
-func (l *configurationListener) Close() {
-	l.registry.WaitGroup().Done()
-}
diff --git a/registry/kubernetes/listener_test.go b/registry/kubernetes/listener_test.go
deleted file mode 100644
index a4913f5..0000000
--- a/registry/kubernetes/listener_test.go
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kubernetes
-
-import (
-	"testing"
-)
-
-import (
-	"github.com/stretchr/testify/assert"
-)
-
-import (
-	"dubbo.apache.org/dubbo-go/v3/common"
-	"dubbo.apache.org/dubbo-go/v3/config_center"
-	"dubbo.apache.org/dubbo-go/v3/remoting"
-)
-
-func Test_DataChange(t *testing.T) {
-	listener := NewRegistryDataListener(&MockDataListener{})
-	url, _ := common.NewURL("jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bs [...]
-	listener.AddInterestedURL(url)
-	int := listener.DataChange(remoting.Event{Path: "/dubbo/com.ikurento.user.UserProvider/providers/jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.ret [...]
-	assert.Equal(t, true, int)
-}
-
-type MockDataListener struct{}
-
-func (*MockDataListener) Process(configType *config_center.ConfigChangeEvent) {}
-
-func TestDataChange(t *testing.T) {
-	listener := NewRegistryDataListener(&MockDataListener{})
-	url, _ := common.NewURL("jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bs [...]
-	listener.AddInterestedURL(url)
-	if !listener.DataChange(remoting.Event{Path: "/dubbo/com.ikurento.user.UserProvider/providers/jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retrie [...]
-		t.Fatal("data change not ok")
-	}
-}
diff --git a/registry/kubernetes/registry.go b/registry/kubernetes/registry.go
deleted file mode 100644
index 63af25f..0000000
--- a/registry/kubernetes/registry.go
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kubernetes
-
-import (
-	"fmt"
-	"path"
-	"sync"
-	"time"
-)
-
-import (
-	gxtime "github.com/dubbogo/gost/time"
-
-	perrors "github.com/pkg/errors"
-
-	v1 "k8s.io/api/core/v1"
-)
-
-import (
-	"dubbo.apache.org/dubbo-go/v3/common"
-	"dubbo.apache.org/dubbo-go/v3/common/constant"
-	"dubbo.apache.org/dubbo-go/v3/common/extension"
-	"dubbo.apache.org/dubbo-go/v3/common/logger"
-	"dubbo.apache.org/dubbo-go/v3/registry"
-	"dubbo.apache.org/dubbo-go/v3/remoting/kubernetes"
-)
-
-const (
-	Name         = "kubernetes"
-	ConnDelay    = 3
-	MaxFailTimes = 15
-)
-
-func init() {
-	// processID = fmt.Sprintf("%d", os.Getpid())
-	// localIP = common.GetLocalIp()
-	extension.SetRegistry(Name, newKubernetesRegistry)
-}
-
-type kubernetesRegistry struct {
-	registry.BaseRegistry
-	cltLock        sync.RWMutex
-	client         *kubernetes.Client
-	listenerLock   sync.Mutex
-	listener       *kubernetes.EventListener
-	dataListener   *dataListener
-	configListener *configurationListener
-}
-
-// Client gets the etcdv3 kubernetes
-func (r *kubernetesRegistry) Client() *kubernetes.Client {
-	r.cltLock.RLock()
-	client := r.client
-	r.cltLock.RUnlock()
-	return client
-}
-
-// SetClient sets the kubernetes client
-func (r *kubernetesRegistry) SetClient(client *kubernetes.Client) {
-	r.cltLock.Lock()
-	r.client = client
-	r.cltLock.Unlock()
-}
-
-// CloseAndNilClient closes listeners and clear client
-func (r *kubernetesRegistry) CloseAndNilClient() {
-	r.client.Close()
-	r.client = nil
-}
-
-// CloseListener closes listeners
-func (r *kubernetesRegistry) CloseListener() {
-	r.cltLock.Lock()
-	l := r.configListener
-	r.cltLock.Unlock()
-	if l != nil {
-		l.Close()
-	}
-	r.configListener = nil
-}
-
-// CreatePath create the path in the registry center of kubernetes
-func (r *kubernetesRegistry) CreatePath(k string) error {
-	if err := r.client.Create(k, ""); err != nil {
-		return perrors.WithMessagef(err, "create path %s in kubernetes", k)
-	}
-	return nil
-}
-
-// DoRegister actually do the register job in the registry center of kubernetes
-func (r *kubernetesRegistry) DoRegister(root string, node string) error {
-	return r.client.Create(path.Join(root, node), "")
-}
-
-func (r *kubernetesRegistry) DoUnregister(root string, node string) error {
-	return perrors.New("DoUnregister is not support in kubernetesRegistry")
-}
-
-// DoSubscribe actually subscribe the provider URL
-func (r *kubernetesRegistry) DoSubscribe(svc *common.URL) (registry.Listener, error) {
-	var configListener *configurationListener
-
-	r.listenerLock.Lock()
-	configListener = r.configListener
-	r.listenerLock.Unlock()
-	if r.listener == nil {
-		r.cltLock.Lock()
-		client := r.client
-		r.cltLock.Unlock()
-		if client == nil {
-			return nil, perrors.New("kubernetes client broken")
-		}
-
-		r.listenerLock.Lock()
-		if r.listener == nil {
-			// double check
-			r.listener = kubernetes.NewEventListener(r.client)
-		}
-		r.listenerLock.Unlock()
-	}
-
-	// register the svc to dataListener
-	r.dataListener.AddInterestedURL(svc)
-	go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/"+constant.DefaultCategory, svc.Service()), r.dataListener)
-
-	return configListener, nil
-}
-
-// nolint
-func (r *kubernetesRegistry) DoUnsubscribe(conf *common.URL) (registry.Listener, error) {
-	return nil, perrors.New("DoUnsubscribe is not support in kubernetesRegistry")
-}
-
-// InitListeners init listeners of kubernetes registry center
-func (r *kubernetesRegistry) InitListeners() {
-	r.listener = kubernetes.NewEventListener(r.client)
-	r.configListener = NewConfigurationListener(r)
-	r.dataListener = NewRegistryDataListener(r.configListener)
-}
-
-func newKubernetesRegistry(url *common.URL) (registry.Registry, error) {
-	// actually, kubernetes use in-cluster config,
-	r := &kubernetesRegistry{}
-
-	r.InitBaseRegistry(url, r)
-
-	if err := kubernetes.ValidateClient(r); err != nil {
-		return nil, perrors.WithStack(err)
-	}
-
-	go r.HandleClientRestart()
-	r.InitListeners()
-
-	logger.Debugf("kubernetes registry started")
-
-	return r, nil
-}
-
-func newMockKubernetesRegistry(
-	url *common.URL,
-	podsList *v1.PodList,
-) (registry.Registry, error) {
-
-	var err error
-
-	r := &kubernetesRegistry{}
-
-	r.InitBaseRegistry(url, r)
-	r.client, err = kubernetes.NewMockClient(podsList)
-	if err != nil {
-		return nil, perrors.WithMessage(err, "new mock client")
-	}
-	r.InitListeners()
-	return r, nil
-}
-
-// HandleClientRestart will reconnect to  kubernetes registry center
-func (r *kubernetesRegistry) HandleClientRestart() {
-	r.WaitGroup().Add(1)
-	defer r.WaitGroup().Done()
-	var (
-		err       error
-		failTimes int
-	)
-LOOP:
-	for {
-		select {
-		case <-r.Done():
-			logger.Warnf("(KubernetesProviderRegistry)reconnectKubernetes goroutine exit now...")
-			break LOOP
-			// re-register all services
-		case <-r.Client().Done():
-			r.Client().Close()
-			r.SetClient(nil)
-
-			// try to connect to kubernetes,
-			failTimes = 0
-			for {
-				after := gxtime.After(timeSecondDuration(failTimes * ConnDelay))
-				select {
-				case <-r.Done():
-					logger.Warnf("(KubernetesProviderRegistry)reconnectKubernetes Registry goroutine exit now...")
-					break LOOP
-				case <-after: // avoid connect frequent
-				}
-				err = kubernetes.ValidateClient(r)
-				logger.Infof("Kubernetes ProviderRegistry.validateKubernetesClient = error{%#v}", perrors.WithStack(err))
-
-				if err == nil {
-					if r.RestartCallBack() {
-						break
-					}
-				}
-				failTimes++
-				if MaxFailTimes <= failTimes {
-					failTimes = MaxFailTimes
-				}
-			}
-		}
-	}
-}
-
-func timeSecondDuration(sec int) time.Duration {
-	return time.Duration(sec) * time.Second
-}
diff --git a/registry/kubernetes/registry_test.go b/registry/kubernetes/registry_test.go
deleted file mode 100644
index 491ff94..0000000
--- a/registry/kubernetes/registry_test.go
+++ /dev/null
@@ -1,344 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kubernetes
-
-import (
-	"encoding/json"
-	"os"
-	"strconv"
-	"testing"
-	"time"
-)
-
-import (
-	"github.com/stretchr/testify/assert"
-
-	v1 "k8s.io/api/core/v1"
-)
-
-import (
-	"dubbo.apache.org/dubbo-go/v3/common"
-	"dubbo.apache.org/dubbo-go/v3/common/constant"
-)
-
-var clientPodListJsonData = `{
-    "apiVersion": "v1",
-    "items": [
-        {
-            "apiVersion": "v1",
-            "kind": "Pod",
-            "metadata": {
-                "annotations": {
-                    "dubbo.io/annotation": "W3siayI6Ii9kdWJiby9jb20uaWt1cmVudG8udXNlci5Vc2VyUHJvdmlkZXIvcHJvdmlkZXJzIiwidiI6IiJ9LHsiayI6Ii9kdWJiby9jb20uaWt1cmVudG8udXNlci5Vc2VyUHJvdmlkZXIvcHJvdmlkZXJzL2R1YmJvJTNBJTJGJTJGMTcyLjE3LjAuNiUzQTIwMDAwJTJGVXNlclByb3ZpZGVyJTNGYWNjZXNzbG9nJTNEJTI2YW55aG9zdCUzRHRydWUlMjZhcHAudmVyc2lvbiUzRDAuMC4xJTI2YXBwbGljYXRpb24lM0RCRFRTZXJ2aWNlJTI2YXV0aCUzRCUyNmJlYW4ubmFtZSUzRFVzZXJQcm92aWRlciUyNmNsdXN0ZXIlM0RmYWlsb3ZlciUyNmVudmlyb25tZW50JTNEZGV2JTI2ZXhlY3V0ZS5s [...]
-                },
-                "creationTimestamp": "2020-06-03T03:49:14Z",
-                "generateName": "server-84c864f5bc-",
-                "labels": {
-                    "dubbo.io/label": "dubbo.io-value",
-                    "pod-template-hash": "84c864f5bc",
-                    "role": "server"
-                },
-                "name": "server-84c864f5bc-r8qvz",
-                "namespace": "default",
-                "ownerReferences": [
-                    {
-                        "apiVersion": "apps/v1",
-                        "blockOwnerDeletion": true,
-                        "controller": true,
-                        "kind": "ReplicaSet",
-                        "name": "server-84c864f5bc",
-                        "uid": "fa376dbb-4f37-4705-8e80-727f592c19b3"
-                    }
-                ],
-                "resourceVersion": "517460",
-                "selfLink": "/api/v1/namespaces/default/pods/server-84c864f5bc-r8qvz",
-                "uid": "f4fc811c-200c-4445-8d4f-532144957dcc"
-            },
-            "spec": {
-                "containers": [
-                    {
-                        "env": [
-                            {
-                                "name": "DUBBO_NAMESPACE",
-                                "value": "default"
-                            },
-                            {
-                                "name": "NAMESPACE",
-                                "valueFrom": {
-                                    "fieldRef": {
-                                        "apiVersion": "v1",
-                                        "fieldPath": "metadata.namespace"
-                                    }
-                                }
-                            }
-                        ],
-                        "image": "192.168.240.101:5000/scott/go-server",
-                        "imagePullPolicy": "Always",
-                        "name": "server",
-                        "resources": {},
-                        "terminationMessagePath": "/dev/termination-log",
-                        "terminationMessagePolicy": "File",
-                        "volumeMounts": [
-                            {
-                                "mountPath": "/var/run/secrets/kubernetes.io/serviceaccount",
-                                "name": "dubbo-sa-token-5qbtb",
-                                "readOnly": true
-                            }
-                        ]
-                    }
-                ],
-                "dnsPolicy": "ClusterFirst",
-                "enableServiceLinks": true,
-                "nodeName": "minikube",
-                "priority": 0,
-                "restartPolicy": "Always",
-                "schedulerName": "default-scheduler",
-                "securityContext": {},
-                "serviceAccount": "dubbo-sa",
-                "serviceAccountName": "dubbo-sa",
-                "terminationGracePeriodSeconds": 30,
-                "tolerations": [
-                    {
-                        "effect": "NoExecute",
-                        "key": "node.kubernetes.io/not-ready",
-                        "operator": "Exists",
-                        "tolerationSeconds": 300
-                    },
-                    {
-                        "effect": "NoExecute",
-                        "key": "node.kubernetes.io/unreachable",
-                        "operator": "Exists",
-                        "tolerationSeconds": 300
-                    }
-                ],
-                "volumes": [
-                    {
-                        "name": "dubbo-sa-token-5qbtb",
-                        "secret": {
-                            "defaultMode": 420,
-                            "secretName": "dubbo-sa-token-5qbtb"
-                        }
-                    }
-                ]
-            },
-            "status": {
-                "conditions": [
-                    {
-                        "lastProbeTime": null,
-                        "lastTransitionTime": "2020-06-03T03:49:14Z",
-                        "status": "True",
-                        "type": "Initialized"
-                    },
-                    {
-                        "lastProbeTime": null,
-                        "lastTransitionTime": "2020-06-03T03:49:15Z",
-                        "status": "True",
-                        "type": "Ready"
-                    },
-                    {
-                        "lastProbeTime": null,
-                        "lastTransitionTime": "2020-06-03T03:49:15Z",
-                        "status": "True",
-                        "type": "ContainersReady"
-                    },
-                    {
-                        "lastProbeTime": null,
-                        "lastTransitionTime": "2020-06-03T03:49:14Z",
-                        "status": "True",
-                        "type": "PodScheduled"
-                    }
-                ],
-                "containerStatuses": [
-                    {
-                        "containerID": "docker://b6421e05ce44f8a1c4fa6b72274980777c7c0f945516209f7c0558cd0cd65406",
-                        "image": "192.168.240.101:5000/scott/go-server:latest",
-                        "imageID": "docker-pullable://192.168.240.101:5000/scott/go-server@sha256:4eecf895054f0ff93d80db64992a561d10504e55582def6dcb6093a6d6d92461",
-                        "lastState": {},
-                        "name": "server",
-                        "ready": true,
-                        "restartCount": 0,
-                        "started": true,
-                        "state": {
-                            "running": {
-                                "startedAt": "2020-06-03T03:49:15Z"
-                            }
-                        }
-                    }
-                ],
-                "hostIP": "10.0.2.15",
-                "phase": "Running",
-                "podIP": "172.17.0.6",
-                "podIPs": [
-                    {
-                        "ip": "172.17.0.6"
-                    }
-                ],
-                "qosClass": "BestEffort",
-                "startTime": "2020-06-03T03:49:14Z"
-            }
-        }
-    ],
-    "kind": "List",
-    "metadata": {
-        "resourceVersion": "",
-        "selfLink": ""
-    }
-}
-`
-
-func getTestRegistry(t *testing.T) *kubernetesRegistry {
-	const (
-		podNameKey              = "HOSTNAME"
-		nameSpaceKey            = "NAMESPACE"
-		needWatchedNameSpaceKey = "DUBBO_NAMESPACE"
-	)
-	pl := &v1.PodList{}
-	// 1. install test data
-	if err := json.Unmarshal([]byte(clientPodListJsonData), &pl); err != nil {
-		t.Fatal(err)
-	}
-	currentPod := pl.Items[0]
-
-	env := map[string]string{
-		nameSpaceKey:            currentPod.GetNamespace(),
-		podNameKey:              currentPod.GetName(),
-		needWatchedNameSpaceKey: "default",
-	}
-
-	for k, v := range env {
-		if err := os.Setenv(k, v); err != nil {
-			t.Fatal(err)
-		}
-	}
-
-	regurl, err := common.NewURL("registry://127.0.0.1:443", common.WithParamsValue(constant.RegistryRoleKey, strconv.Itoa(common.PROVIDER)))
-	if err != nil {
-		t.Fatal(err)
-	}
-	out, err := newMockKubernetesRegistry(regurl, pl)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	return out.(*kubernetesRegistry)
-}
-
-func TestRegister(t *testing.T) {
-	r := getTestRegistry(t)
-	defer r.Destroy()
-
-	url, _ := common.NewURL(
-		"dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider",
-		common.WithParamsValue(constant.ClusterKey, "mock"),
-		common.WithMethods([]string{"GetUser", "AddUser"}),
-	)
-
-	err := r.Register(url)
-	assert.NoError(t, err)
-	_, _, err = r.client.GetChildren("/dubbo/com.ikurento.user.UserProvider/providers")
-	if err != nil {
-		t.Fatal(err)
-	}
-}
-
-//
-//func TestSubscribe(t *testing.T) {
-//	r := getTestRegistry(t)
-//	defer r.Destroy()
-//
-//	url, err := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.ClusterKey, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
-//	if err != nil {
-//		t.Fatal(err)
-//	}
-//
-//	listener, err := r.DoSubscribe(url)
-//	if err != nil {
-//		t.Fatal(err)
-//	}
-//
-//	wg := sync.WaitGroup{}
-//	wg.Add(1)
-//	go func() {
-//		defer wg.Done()
-//		registerErr := r.Register(url)
-//		if registerErr != nil {
-//			t.Error(registerErr)
-//		}
-//	}()
-//
-//	wg.Wait()
-//
-//	serviceEvent, err := listener.Next()
-//	if err != nil {
-//		t.Fatal(err)
-//	}
-//	t.Logf("get service event %s", serviceEvent)
-//}
-
-func TestConsumerDestroy(t *testing.T) {
-	r := getTestRegistry(t)
-
-	url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider",
-		common.WithParamsValue(constant.ClusterKey, "mock"),
-		common.WithMethods([]string{"GetUser", "AddUser"}))
-
-	_, err := r.DoSubscribe(url)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	// listener.Close()
-	time.Sleep(1e9)
-	r.Destroy()
-
-	assert.Equal(t, false, r.IsAvailable())
-}
-
-func TestProviderDestroy(t *testing.T) {
-	r := getTestRegistry(t)
-
-	url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider",
-		common.WithParamsValue(constant.ClusterKey, "mock"),
-		common.WithMethods([]string{"GetUser", "AddUser"}))
-	err := r.Register(url)
-	assert.NoError(t, err)
-
-	time.Sleep(1e9)
-	r.Destroy()
-	assert.Equal(t, false, r.IsAvailable())
-}
-
-func TestNewRegistry(t *testing.T) {
-	regUrl, err := common.NewURL("registry://127.0.0.1:443",
-		common.WithParamsValue(constant.RegistryRoleKey, strconv.Itoa(common.PROVIDER)))
-	if err != nil {
-		t.Fatal(err)
-	}
-	_, err = newKubernetesRegistry(regUrl)
-	if err == nil {
-		t.Fatal("not in cluster, should be a err")
-	}
-}
-
-func TestHandleClientRestart(t *testing.T) {
-	r := getTestRegistry(t)
-	r.WaitGroup().Add(1)
-	go r.HandleClientRestart()
-	time.Sleep(timeSecondDuration(1))
-	r.client.Close()
-}
diff --git a/remoting/kubernetes/client.go b/remoting/kubernetes/client.go
deleted file mode 100644
index fe161bf..0000000
--- a/remoting/kubernetes/client.go
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kubernetes
-
-import (
-	"context"
-	"strconv"
-	"sync"
-)
-
-import (
-	perrors "github.com/pkg/errors"
-
-	v1 "k8s.io/api/core/v1"
-
-	"k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/kubernetes/fake"
-)
-
-import (
-	"dubbo.apache.org/dubbo-go/v3/common"
-	"dubbo.apache.org/dubbo-go/v3/common/constant"
-	"dubbo.apache.org/dubbo-go/v3/common/logger"
-)
-
-type Client struct {
-	lock sync.RWMutex
-
-	// manage the  client lifecycle
-	ctx    context.Context
-	cancel context.CancelFunc
-
-	controller *dubboRegistryController
-}
-
-// NewClient returns Client instance for registry
-func NewClient(url *common.URL) (*Client, error) {
-	// read type
-	r, err := strconv.Atoi(url.GetParams().Get(constant.RegistryRoleKey))
-	if err != nil {
-		return nil, perrors.WithMessage(err, "atoi role")
-	}
-	ctx, cancel := context.WithCancel(context.Background())
-
-	controller, err := newDubboRegistryController(ctx, common.RoleType(r), GetInClusterKubernetesClient)
-	if err != nil {
-		cancel()
-		return nil, perrors.WithMessage(err, "new dubbo-registry controller")
-	}
-
-	c := &Client{
-		ctx:        ctx,
-		cancel:     cancel,
-		controller: controller,
-	}
-
-	if r == common.CONSUMER {
-		// only consumer have to start informer factory
-		c.controller.startALLInformers()
-	}
-	return c, nil
-}
-
-func (c *Client) SetLabel(k, v string) error {
-	c.lock.Lock()
-	defer c.lock.Unlock()
-
-	if err := c.controller.assembleLabel(k, v); err != nil {
-		return perrors.WithMessagef(err, "add annotation @key = %s @value = %s", k, v)
-	}
-
-	logger.Debugf("put the @key = %s @value = %s success", k, v)
-	return nil
-}
-
-// Create creates k/v pair in watcher-set
-func (c *Client) Create(k, v string) error {
-	// the read current pod must be lock, protect every
-	// create operation can be atomic
-	c.lock.Lock()
-	defer c.lock.Unlock()
-
-	if err := c.controller.addAnnotationForCurrentPod(k, v); err != nil {
-		return perrors.WithMessagef(err, "add annotation @key = %s @value = %s", k, v)
-	}
-
-	logger.Debugf("put the @key = %s @value = %s success", k, v)
-	return nil
-}
-
-// GetChildren gets k children list from kubernetes-watcherSet
-func (c *Client) GetChildren(k string) ([]string, []string, error) {
-	objectList, err := c.controller.watcherSet.Get(k, true)
-	if err != nil {
-		return nil, nil, perrors.WithMessagef(err, "get children from watcherSet on (%s)", k)
-	}
-
-	var kList []string
-	var vList []string
-
-	for _, o := range objectList {
-		kList = append(kList, o.Key)
-		vList = append(vList, o.Value)
-	}
-
-	return kList, vList, nil
-}
-
-// Watch watches on spec key
-func (c *Client) Watch(k string) (<-chan *WatcherEvent, <-chan struct{}, error) {
-	w, err := c.controller.watcherSet.Watch(k, false)
-	if err != nil {
-		return nil, nil, perrors.WithMessagef(err, "watch on (%s)", k)
-	}
-
-	return w.ResultChan(), w.done(), nil
-}
-
-// WatchWithPrefix watches on spec prefix
-func (c *Client) WatchWithPrefix(prefix string) (<-chan *WatcherEvent, <-chan struct{}, error) {
-	w, err := c.controller.watcherSet.Watch(prefix, true)
-	if err != nil {
-		return nil, nil, perrors.WithMessagef(err, "watch on prefix (%s)", prefix)
-	}
-
-	return w.ResultChan(), w.done(), nil
-}
-
-// if returns false, the client is die
-func (c *Client) Valid() bool {
-	select {
-	case <-c.Done():
-		return false
-	default:
-	}
-	c.lock.RLock()
-	defer c.lock.RUnlock()
-	return c.controller != nil
-}
-
-// nolint
-func (c *Client) Done() <-chan struct{} {
-	return c.ctx.Done()
-}
-
-// nolint
-func (c *Client) Close() {
-	select {
-	case <-c.ctx.Done():
-		// already stopped
-		return
-	default:
-	}
-	c.cancel()
-
-	// the client ctx be canceled
-	// will trigger the watcherSet watchers all stopped
-	// so, just wait
-}
-
-// ValidateClient validates the kubernetes client
-func ValidateClient(container clientFacade) error {
-	client := container.Client()
-
-	// new Client
-	if client == nil || client.Valid() {
-
-		newClient, err := NewClient(container.GetURL())
-		if err != nil {
-			logger.Warnf("new kubernetes client: %v)", err)
-			return perrors.WithMessage(err, "new kubernetes client")
-		}
-		container.SetClient(newClient)
-	}
-
-	return nil
-}
-
-// NewMockClient exports for registry package test
-func NewMockClient(podList *v1.PodList) (*Client, error) {
-	ctx, cancel := context.WithCancel(context.Background())
-	controller, err := newDubboRegistryController(ctx, common.CONSUMER, func() (kubernetes.Interface, error) {
-		return fake.NewSimpleClientset(podList), nil
-	})
-	if err != nil {
-		cancel()
-		return nil, perrors.WithMessage(err, "new dubbo-registry controller")
-	}
-
-	c := &Client{
-		ctx:        ctx,
-		cancel:     cancel,
-		controller: controller,
-	}
-
-	c.controller.startALLInformers()
-	return c, nil
-}
diff --git a/remoting/kubernetes/client_test.go b/remoting/kubernetes/client_test.go
deleted file mode 100644
index be72561..0000000
--- a/remoting/kubernetes/client_test.go
+++ /dev/null
@@ -1,455 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kubernetes
-
-import (
-	"encoding/json"
-	"fmt"
-	_ "net/http/pprof"
-	"os"
-	"strings"
-	"sync"
-	"testing"
-)
-
-import (
-	v1 "k8s.io/api/core/v1"
-)
-
-// tests dataset
-var tests = []struct {
-	input struct {
-		k string
-		v string
-	}
-}{
-	{input: struct {
-		k string
-		v string
-	}{k: "name", v: "scott.wang"}},
-	{input: struct {
-		k string
-		v string
-	}{k: "namePrefix", v: "prefix.scott.wang"}},
-	{input: struct {
-		k string
-		v string
-	}{k: "namePrefix1", v: "prefix1.scott.wang"}},
-	{input: struct {
-		k string
-		v string
-	}{k: "age", v: "27"}},
-}
-
-// test dataset prefix
-const prefix = "name"
-
-var watcherStopLog = "the watcherSet watcher was stopped"
-
-var clientPodListJsonData = `{
-    "apiVersion": "v1",
-    "items": [
-        {
-            "apiVersion": "v1",
-            "kind": "Pod",
-            "metadata": {
-                "annotations": {
-                    "dubbo.io/annotation": "W3siayI6Ii9kdWJiby9jb20uaWt1cmVudG8udXNlci5Vc2VyUHJvdmlkZXIvcHJvdmlkZXJzIiwidiI6IiJ9LHsiayI6Ii9kdWJiby9jb20uaWt1cmVudG8udXNlci5Vc2VyUHJvdmlkZXIvcHJvdmlkZXJzL2R1YmJvJTNBJTJGJTJGMTcyLjE3LjAuNiUzQTIwMDAwJTJGVXNlclByb3ZpZGVyJTNGYWNjZXNzbG9nJTNEJTI2YW55aG9zdCUzRHRydWUlMjZhcHAudmVyc2lvbiUzRDAuMC4xJTI2YXBwbGljYXRpb24lM0RCRFRTZXJ2aWNlJTI2YXV0aCUzRCUyNmJlYW4ubmFtZSUzRFVzZXJQcm92aWRlciUyNmNsdXN0ZXIlM0RmYWlsb3ZlciUyNmVudmlyb25tZW50JTNEZGV2JTI2ZXhlY3V0ZS5s [...]
-                },
-                "creationTimestamp": "2020-06-03T03:49:14Z",
-                "generateName": "server-84c864f5bc-",
-                "labels": {
-                    "dubbo.io/label": "dubbo.io-value",
-                    "pod-template-hash": "84c864f5bc",
-                    "role": "server"
-                },
-                "name": "server-84c864f5bc-r8qvz",
-                "namespace": "default",
-                "ownerReferences": [
-                    {
-                        "apiVersion": "apps/v1",
-                        "blockOwnerDeletion": true,
-                        "controller": true,
-                        "kind": "ReplicaSet",
-                        "name": "server-84c864f5bc",
-                        "uid": "fa376dbb-4f37-4705-8e80-727f592c19b3"
-                    }
-                ],
-                "resourceVersion": "517460",
-                "selfLink": "/api/v1/namespaces/default/pods/server-84c864f5bc-r8qvz",
-                "uid": "f4fc811c-200c-4445-8d4f-532144957dcc"
-            },
-            "spec": {
-                "containers": [
-                    {
-                        "env": [
-                            {
-                                "name": "DUBBO_NAMESPACE",
-                                "value": "default"
-                            },
-                            {
-                                "name": "NAMESPACE",
-                                "valueFrom": {
-                                    "fieldRef": {
-                                        "apiVersion": "v1",
-                                        "fieldPath": "metadata.namespace"
-                                    }
-                                }
-                            }
-                        ],
-                        "image": "192.168.240.101:5000/scott/go-server",
-                        "imagePullPolicy": "Always",
-                        "name": "server",
-                        "resources": {},
-                        "terminationMessagePath": "/dev/termination-log",
-                        "terminationMessagePolicy": "File",
-                        "volumeMounts": [
-                            {
-                                "mountPath": "/var/run/secrets/kubernetes.io/serviceaccount",
-                                "name": "dubbo-sa-token-5qbtb",
-                                "readOnly": true
-                            }
-                        ]
-                    }
-                ],
-                "dnsPolicy": "ClusterFirst",
-                "enableServiceLinks": true,
-                "nodeName": "minikube",
-                "priority": 0,
-                "restartPolicy": "Always",
-                "schedulerName": "default-scheduler",
-                "securityContext": {},
-                "serviceAccount": "dubbo-sa",
-                "serviceAccountName": "dubbo-sa",
-                "terminationGracePeriodSeconds": 30,
-                "tolerations": [
-                    {
-                        "effect": "NoExecute",
-                        "key": "node.kubernetes.io/not-ready",
-                        "operator": "Exists",
-                        "tolerationSeconds": 300
-                    },
-                    {
-                        "effect": "NoExecute",
-                        "key": "node.kubernetes.io/unreachable",
-                        "operator": "Exists",
-                        "tolerationSeconds": 300
-                    }
-                ],
-                "volumes": [
-                    {
-                        "name": "dubbo-sa-token-5qbtb",
-                        "secret": {
-                            "defaultMode": 420,
-                            "secretName": "dubbo-sa-token-5qbtb"
-                        }
-                    }
-                ]
-            },
-            "status": {
-                "conditions": [
-                    {
-                        "lastProbeTime": null,
-                        "lastTransitionTime": "2020-06-03T03:49:14Z",
-                        "status": "True",
-                        "type": "Initialized"
-                    },
-                    {
-                        "lastProbeTime": null,
-                        "lastTransitionTime": "2020-06-03T03:49:15Z",
-                        "status": "True",
-                        "type": "Ready"
-                    },
-                    {
-                        "lastProbeTime": null,
-                        "lastTransitionTime": "2020-06-03T03:49:15Z",
-                        "status": "True",
-                        "type": "ContainersReady"
-                    },
-                    {
-                        "lastProbeTime": null,
-                        "lastTransitionTime": "2020-06-03T03:49:14Z",
-                        "status": "True",
-                        "type": "PodScheduled"
-                    }
-                ],
-                "containerStatuses": [
-                    {
-                        "containerID": "docker://b6421e05ce44f8a1c4fa6b72274980777c7c0f945516209f7c0558cd0cd65406",
-                        "image": "192.168.240.101:5000/scott/go-server:latest",
-                        "imageID": "docker-pullable://192.168.240.101:5000/scott/go-server@sha256:4eecf895054f0ff93d80db64992a561d10504e55582def6dcb6093a6d6d92461",
-                        "lastState": {},
-                        "name": "server",
-                        "ready": true,
-                        "restartCount": 0,
-                        "started": true,
-                        "state": {
-                            "running": {
-                                "startedAt": "2020-06-03T03:49:15Z"
-                            }
-                        }
-                    }
-                ],
-                "hostIP": "10.0.2.15",
-                "phase": "Running",
-                "podIP": "172.17.0.6",
-                "podIPs": [
-                    {
-                        "ip": "172.17.0.6"
-                    }
-                ],
-                "qosClass": "BestEffort",
-                "startTime": "2020-06-03T03:49:14Z"
-            }
-        }
-    ],
-    "kind": "List",
-    "metadata": {
-        "resourceVersion": "",
-        "selfLink": ""
-    }
-}
-`
-
-func getTestClient(t *testing.T) *Client {
-	pl := &v1.PodList{}
-	// 1. install test data
-	if err := json.Unmarshal([]byte(clientPodListJsonData), &pl); err != nil {
-		t.Fatal(err)
-	}
-	currentPod := pl.Items[0]
-
-	env := map[string]string{
-		nameSpaceKey:            currentPod.GetNamespace(),
-		podNameKey:              currentPod.GetName(),
-		needWatchedNameSpaceKey: "default",
-	}
-
-	for k, v := range env {
-		if err := os.Setenv(k, v); err != nil {
-			t.Fatal(err)
-		}
-	}
-
-	client, err := NewMockClient(pl)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	return client
-}
-
-func TestClientValid(t *testing.T) {
-	client := getTestClient(t)
-	defer client.Close()
-
-	if !client.Valid() {
-		t.Fatal("client is not valid")
-	}
-
-	client.Close()
-	if client.Valid() {
-		t.Fatal("client is valid")
-	}
-}
-
-func TestClientDone(t *testing.T) {
-	client := getTestClient(t)
-
-	go func() {
-		client.Close()
-	}()
-
-	<-client.Done()
-
-	if client.Valid() {
-		t.Fatal("client should be invalid")
-	}
-}
-
-func TestClientCreateKV(t *testing.T) {
-	client := getTestClient(t)
-	defer client.Close()
-
-	for _, tc := range tests {
-
-		k := tc.input.k
-		v := tc.input.v
-
-		if err := client.Create(k, v); err != nil {
-			t.Fatal(err)
-		}
-
-	}
-}
-
-func TestClientGetChildrenKVList(t *testing.T) {
-	client := getTestClient(t)
-	defer client.Close()
-
-	wg := sync.WaitGroup{}
-	wg.Add(1)
-
-	syncDataComplete := make(chan struct{})
-
-	go func() {
-		wc, done, err := client.WatchWithPrefix(prefix)
-		if err != nil {
-			t.Error(err)
-			return
-		}
-
-		wg.Done()
-		i := 0
-
-		for {
-			select {
-			case e := <-wc:
-				i++
-				fmt.Printf("got event %v k %s v %s\n", e.EventType, e.Key, e.Value)
-				if i == 3 {
-					// already sync all event
-					syncDataComplete <- struct{}{}
-					return
-				}
-			case <-done:
-				t.Log(watcherStopLog)
-				return
-			}
-		}
-	}()
-
-	// wait the watch goroutine start
-	wg.Wait()
-
-	expect := make(map[string]string)
-	got := make(map[string]string)
-
-	for _, tc := range tests {
-
-		k := tc.input.k
-		v := tc.input.v
-
-		if strings.Contains(k, prefix) {
-			expect[k] = v
-		}
-
-		if err := client.Create(k, v); err != nil {
-			t.Fatal(err)
-		}
-	}
-
-	<-syncDataComplete
-
-	// start get all children
-	kList, vList, err := client.GetChildren(prefix)
-	if err != nil {
-		t.Error(err)
-	}
-
-	for i := 0; i < len(kList); i++ {
-		got[kList[i]] = vList[i]
-	}
-
-	for expectK, expectV := range expect {
-		if got[expectK] != expectV {
-			t.Fatalf("expect {%s: %s} but got {%s: %v}", expectK, expectV, expectK, got[expectK])
-		}
-	}
-}
-
-func TestClientWatchPrefix(t *testing.T) {
-	client := getTestClient(t)
-
-	wg := sync.WaitGroup{}
-	wg.Add(1)
-
-	go func() {
-		wc, done, err := client.WatchWithPrefix(prefix)
-		if err != nil {
-			t.Error(err)
-		}
-
-		wg.Done()
-
-		for {
-			select {
-			case e := <-wc:
-				t.Logf("got event %v k %s v %s", e.EventType, e.Key, e.Value)
-			case <-done:
-				t.Log(watcherStopLog)
-				return
-			}
-		}
-	}()
-
-	// must wait the watch goroutine work
-	wg.Wait()
-
-	for _, tc := range tests {
-
-		k := tc.input.k
-		v := tc.input.v
-
-		if err := client.Create(k, v); err != nil {
-			t.Fatal(err)
-		}
-	}
-
-	client.Close()
-}
-
-func TestClientWatch(t *testing.T) {
-	client := getTestClient(t)
-
-	wg := sync.WaitGroup{}
-	wg.Add(1)
-
-	go func() {
-		wc, done, err := client.Watch(prefix)
-		if err != nil {
-			t.Error(err)
-		}
-		wg.Done()
-
-		for {
-			select {
-			case e := <-wc:
-				t.Logf("got event %v k %s v %s", e.EventType, e.Key, e.Value)
-			case <-done:
-				t.Log(watcherStopLog)
-				return
-			}
-		}
-	}()
-
-	// must wait the watch goroutine already start the watch goroutine
-	wg.Wait()
-
-	for _, tc := range tests {
-
-		k := tc.input.k
-		v := tc.input.v
-
-		if err := client.Create(k, v); err != nil {
-			t.Fatal(err)
-		}
-	}
-
-	client.Close()
-}
diff --git a/remoting/kubernetes/facade.go b/remoting/kubernetes/facade.go
deleted file mode 100644
index 63115b0..0000000
--- a/remoting/kubernetes/facade.go
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kubernetes
-
-import (
-	"dubbo.apache.org/dubbo-go/v3/common"
-)
-
-type clientFacade interface {
-	Client() *Client
-	SetClient(*Client)
-	common.Node
-}
diff --git a/remoting/kubernetes/facade_test.go b/remoting/kubernetes/facade_test.go
deleted file mode 100644
index 72ddb06..0000000
--- a/remoting/kubernetes/facade_test.go
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kubernetes
-
-import (
-	"strconv"
-	"testing"
-)
-
-import (
-	"dubbo.apache.org/dubbo-go/v3/common"
-	"dubbo.apache.org/dubbo-go/v3/common/constant"
-)
-
-type mockFacade struct {
-	*common.URL
-	client *Client
-	// cltLock sync.Mutex
-	// done    chan struct{}
-}
-
-func (r *mockFacade) Client() *Client {
-	return r.client
-}
-
-func (r *mockFacade) SetClient(client *Client) {
-	r.client = client
-}
-
-func (r *mockFacade) GetURL() *common.URL {
-	return r.URL
-}
-
-func (r *mockFacade) Destroy() {
-	// TODO implementation me
-}
-
-func (r *mockFacade) RestartCallBack() bool {
-	return true
-}
-
-func (r *mockFacade) IsAvailable() bool {
-	return true
-}
-
-func Test_Facade(t *testing.T) {
-	regUrl, err := common.NewURL("registry://127.0.0.1:443",
-		common.WithParamsValue(constant.RegistryRoleKey, strconv.Itoa(common.CONSUMER)))
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	mockClient := getTestClient(t)
-	m := &mockFacade{
-		URL:    regUrl,
-		client: mockClient,
-	}
-
-	if err := ValidateClient(m); err == nil {
-		t.Fatal("out of cluster should err")
-	}
-	mockClient.Close()
-}
diff --git a/remoting/kubernetes/listener.go b/remoting/kubernetes/listener.go
deleted file mode 100644
index 9a8e0dc..0000000
--- a/remoting/kubernetes/listener.go
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kubernetes
-
-import (
-	"sync"
-)
-
-import (
-	perrors "github.com/pkg/errors"
-)
-
-import (
-	"dubbo.apache.org/dubbo-go/v3/common/logger"
-	"dubbo.apache.org/dubbo-go/v3/remoting"
-)
-
-type EventListener struct {
-	client     *Client
-	keyMapLock sync.RWMutex
-	keyMap     map[string]struct{}
-	wg         sync.WaitGroup
-}
-
-func NewEventListener(client *Client) *EventListener {
-	return &EventListener{
-		client: client,
-		keyMap: make(map[string]struct{}, 8),
-	}
-}
-
-// Listen on a spec key
-// this method returns true when spec key deleted,
-// this method returns false when deep layer connection lose
-func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.DataListener) bool {
-	defer l.wg.Done()
-	for {
-		wc, done, err := l.client.Watch(key)
-		if err != nil {
-			logger.Warnf("watch exist{key:%s} = error{%v}", key, err)
-			return false
-		}
-
-		select {
-
-		// client stopped
-		case <-l.client.Done():
-			logger.Warnf("kubernetes client stopped")
-			return false
-
-		// watcherSet watcher stopped
-		case <-done:
-			logger.Warnf("kubernetes watcherSet watcher stopped")
-			return false
-
-		// handle kubernetes-watcherSet events
-		case e, ok := <-wc:
-			if !ok {
-				logger.Warnf("kubernetes-watcherSet watch-chan closed")
-				return false
-			}
-
-			if l.handleEvents(e, listener...) {
-				// if event is delete
-				return true
-			}
-		}
-	}
-}
-
-// return true means the event type is DELETE
-// return false means the event type is CREATE || UPDATE
-func (l *EventListener) handleEvents(event *WatcherEvent, listeners ...remoting.DataListener) bool {
-	logger.Infof("got a kubernetes-watcherSet event {type: %d, key: %s}", event.EventType, event.Key)
-
-	switch event.EventType {
-	case Create:
-		for _, listener := range listeners {
-			logger.Infof("kubernetes-watcherSet get event (key{%s}) = event{EventNodeDataCreated}", event.Key)
-			listener.DataChange(remoting.Event{
-				Path:    string(event.Key),
-				Action:  remoting.EventTypeAdd,
-				Content: string(event.Value),
-			})
-		}
-		return false
-	case Update:
-		for _, listener := range listeners {
-			logger.Infof("kubernetes-watcherSet get event (key{%s}) = event{EventNodeDataChanged}", event.Key)
-			listener.DataChange(remoting.Event{
-				Path:    string(event.Key),
-				Action:  remoting.EventTypeUpdate,
-				Content: string(event.Value),
-			})
-		}
-		return false
-	case Delete:
-		logger.Warnf("kubernetes-watcherSet get event (key{%s}) = event{EventNodeDeleted}", event.Key)
-		return true
-	default:
-		return false
-	}
-}
-
-// Listen on a set of key with spec prefix
-func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener ...remoting.DataListener) {
-	defer l.wg.Done()
-	for {
-		wc, done, err := l.client.WatchWithPrefix(prefix)
-		if err != nil {
-			logger.Warnf("listenDirEvent(key{%s}) = error{%v}", prefix, err)
-		}
-
-		select {
-		// client stopped
-		case <-l.client.Done():
-			logger.Warnf("kubernetes client stopped")
-			return
-
-		// watcher stopped
-		case <-done:
-			logger.Warnf("kubernetes watcherSet watcher stopped")
-			return
-
-			// kuberentes-watcherSet event stream
-		case e, ok := <-wc:
-
-			if !ok {
-				logger.Warnf("kubernetes-watcherSet watch-chan closed")
-				return
-			}
-
-			l.handleEvents(e, listener...)
-		}
-	}
-}
-
-// this func is invoked by kubernetes ConsumerRegistry::Registry/ kubernetes ConsumerRegistry::get/kubernetes ConsumerRegistry::getListener
-// registry.go:Listen -> listenServiceEvent -> listenDirEvent -> ListenServiceNodeEvent
-//                            |
-//                            --------> ListenServiceNodeEvent
-func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataListener) {
-	l.keyMapLock.RLock()
-	_, ok := l.keyMap[key]
-	l.keyMapLock.RUnlock()
-	if ok {
-		logger.Warnf("kubernetes-watcherSet key %s has already been listened.", key)
-		return
-	}
-
-	l.keyMapLock.Lock()
-	// double check
-	if _, ok := l.keyMap[key]; ok {
-		// another goroutine already set it
-		l.keyMapLock.Unlock()
-		return
-	}
-	l.keyMap[key] = struct{}{}
-	l.keyMapLock.Unlock()
-
-	keyList, valueList, err := l.client.GetChildren(key)
-	if err != nil {
-		logger.Warnf("Get new node path {%v} 's content error,message is  {%v}", key, perrors.WithMessage(err, "get children"))
-	}
-
-	logger.Infof("get key children list %s, keys %v values %v", key, keyList, valueList)
-
-	for i, k := range keyList {
-		logger.Infof("got children list key -> %s", k)
-		listener.DataChange(remoting.Event{
-			Path:    k,
-			Action:  remoting.EventTypeAdd,
-			Content: valueList[i],
-		})
-	}
-
-	logger.Infof("listen dubbo provider key{%s} event and wait to get all provider from kubernetes-watcherSet", key)
-
-	l.wg.Add(1)
-	go func(key string, listener remoting.DataListener) {
-		l.ListenServiceNodeEventWithPrefix(key, listener)
-		logger.Warnf("listenDirEvent(key{%s}) goroutine exit now", key)
-	}(key, listener)
-
-	logger.Infof("listen dubbo service key{%s}", key)
-	l.wg.Add(1)
-	go func(key string) {
-		if l.ListenServiceNodeEvent(key) {
-			listener.DataChange(remoting.Event{Path: key, Action: remoting.EventTypeDel})
-		}
-		logger.Warnf("listenSelf(kubernetes key{%s}) goroutine exit now", key)
-	}(key)
-}
-
-func (l *EventListener) Close() {
-	l.wg.Wait()
-}
diff --git a/remoting/kubernetes/listener_test.go b/remoting/kubernetes/listener_test.go
deleted file mode 100644
index ffe58cb..0000000
--- a/remoting/kubernetes/listener_test.go
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kubernetes
-
-import (
-	"testing"
-	"time"
-)
-
-import (
-	"github.com/stretchr/testify/assert"
-)
-
-import (
-	"dubbo.apache.org/dubbo-go/v3/remoting"
-)
-
-var changedData = `
-	dubbo.consumer.request_timeout=3s
-	dubbo.consumer.connect_timeout=5s
-	dubbo.application.organization=ikurento.com
-	dubbo.application.name=BDTService
-	dubbo.application.module=dubbogo user-info server
-	dubbo.application.version=0.0.1
-	dubbo.application.owner=ZX
-	dubbo.application.environment=dev
-	dubbo.registries.hangzhouzk.protocol=zookeeper
-	dubbo.registries.hangzhouzk.timeout=3s
-	dubbo.registries.hangzhouzk.address=127.0.0.1:2181
-	dubbo.registries.shanghaizk.protocol=zookeeper
-	dubbo.registries.shanghaizk.timeout=3s
-	dubbo.registries.shanghaizk.address=127.0.0.1:2182
-	dubbo.service.com.ikurento.user.UserProvider.protocol=dubbo
-	dubbo.service.com.ikurento.user.UserProvider.interface=com.ikurento.user.UserProvider
-	dubbo.service.com.ikurento.user.UserProvider.loadbalance=random
-	dubbo.service.com.ikurento.user.UserProvider.warmup=100
-	dubbo.service.com.ikurento.user.UserProvider.cluster=failover
-`
-
-type mockDataListener struct {
-	eventList   []remoting.Event
-	client      *Client
-	changedData string
-
-	rc chan remoting.Event
-}
-
-func (m *mockDataListener) DataChange(eventType remoting.Event) bool {
-	m.eventList = append(m.eventList, eventType)
-	if eventType.Content == m.changedData {
-		m.rc <- eventType
-	}
-	return true
-}
-
-func TestListener(t *testing.T) {
-	tests := []struct {
-		input struct {
-			k string
-			v string
-		}
-	}{
-		{input: struct {
-			k string
-			v string
-		}{k: "/dubbo", v: changedData}},
-	}
-
-	c := getTestClient(t)
-	defer c.Close()
-
-	listener := NewEventListener(c)
-	dataListener := &mockDataListener{client: c, changedData: changedData, rc: make(chan remoting.Event)}
-	listener.ListenServiceEvent("/dubbo", dataListener)
-	time.Sleep(1e9)
-
-	for _, tc := range tests {
-
-		k := tc.input.k
-		v := tc.input.v
-		if err := c.Create(k, v); err != nil {
-			t.Fatal(err)
-		}
-
-	}
-	msg := <-dataListener.rc
-	assert.Equal(t, changedData, msg.Content)
-}
diff --git a/remoting/kubernetes/registry_controller.go b/remoting/kubernetes/registry_controller.go
deleted file mode 100644
index f6a804a..0000000
--- a/remoting/kubernetes/registry_controller.go
+++ /dev/null
@@ -1,633 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kubernetes
-
-import (
-	"context"
-	"encoding/base64"
-	"encoding/json"
-	"fmt"
-	"io/ioutil"
-	"os"
-	"strconv"
-	"strings"
-	"sync"
-	"time"
-)
-
-import (
-	perrors "github.com/pkg/errors"
-
-	v1 "k8s.io/api/core/v1"
-
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/labels"
-	"k8s.io/apimachinery/pkg/selection"
-	"k8s.io/apimachinery/pkg/types"
-	"k8s.io/apimachinery/pkg/util/strategicpatch"
-	"k8s.io/apimachinery/pkg/watch"
-
-	"k8s.io/client-go/informers"
-	informerscorev1 "k8s.io/client-go/informers/core/v1"
-
-	"k8s.io/client-go/kubernetes"
-
-	"k8s.io/client-go/rest"
-
-	"k8s.io/client-go/tools/cache"
-
-	"k8s.io/client-go/util/workqueue"
-)
-
-import (
-	"dubbo.apache.org/dubbo-go/v3/common"
-	"dubbo.apache.org/dubbo-go/v3/common/logger"
-)
-
-const (
-	// kubernetes inject env var
-	podNameKey              = "HOSTNAME"
-	nameSpaceKey            = "NAMESPACE"
-	nameSpaceFilePath       = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
-	needWatchedNameSpaceKey = "DUBBO_NAMESPACE"
-
-	// all pod annotation key
-	DubboIOAnnotationKey = "dubbo.io/annotation"
-	// all pod label key and value pair
-	DubboIOLabelKey           = "dubbo.io/label"
-	DubboIOConsumerLabelValue = "dubbo.io.consumer"
-	DubboIOProviderLabelValue = "dubbo.io.provider"
-
-	// kubernetes suggest resync
-	defaultResync = 5 * time.Minute
-)
-
-var ErrDubboLabelAlreadyExist = perrors.New("dubbo label already exist")
-
-// dubboRegistryController works like a kubernetes controller
-type dubboRegistryController struct {
-
-	// clone from client
-	// manage lifecycle
-	ctx context.Context
-
-	role common.RoleType
-
-	// protect patch current pod operation
-	lock sync.Mutex
-
-	// current pod config
-	needWatchedNamespace map[string]struct{}
-	namespace            string
-	name                 string
-
-	watcherSet WatcherSet
-
-	// kubernetes
-	kc                               kubernetes.Interface
-	listAndWatchStartResourceVersion uint64
-	namespacedInformerFactory        map[string]informers.SharedInformerFactory
-	namespacedPodInformers           map[string]informerscorev1.PodInformer
-	queue                            workqueue.Interface // shared by namespaced informers
-}
-
-func newDubboRegistryController(
-	ctx context.Context,
-	// different provider and consumer have behavior
-	roleType common.RoleType,
-	// used to inject mock kubernetes client
-	kcGetter func() (kubernetes.Interface, error),
-) (*dubboRegistryController, error) {
-
-	kc, err := kcGetter()
-	if err != nil {
-		return nil, perrors.WithMessage(err, "get kubernetes client")
-	}
-
-	c := &dubboRegistryController{
-		ctx:                       ctx,
-		role:                      roleType,
-		watcherSet:                newWatcherSet(ctx),
-		needWatchedNamespace:      make(map[string]struct{}),
-		namespacedInformerFactory: make(map[string]informers.SharedInformerFactory),
-		namespacedPodInformers:    make(map[string]informerscorev1.PodInformer),
-		kc:                        kc,
-	}
-
-	if err := c.readConfig(); err != nil {
-		return nil, perrors.WithMessage(err, "read config")
-	}
-
-	if err := c.initCurrentPod(); err != nil {
-		return nil, perrors.WithMessage(err, "init current pod")
-	}
-
-	if err := c.initWatchSet(); err != nil {
-		return nil, perrors.WithMessage(err, "init watch set")
-	}
-
-	if err := c.initPodInformer(); err != nil {
-		return nil, perrors.WithMessage(err, "init pod informer")
-	}
-
-	go c.run()
-
-	return c, nil
-}
-
-// GetInClusterKubernetesClient
-// current pod running in kubernetes-cluster
-func GetInClusterKubernetesClient() (kubernetes.Interface, error) {
-	// read in-cluster config
-	cfg, err := rest.InClusterConfig()
-	if err != nil {
-		return nil, perrors.WithMessage(err, "get in-cluster config")
-	}
-
-	return kubernetes.NewForConfig(cfg)
-}
-
-// initWatchSet
-// 1. get all with dubbo label pods
-// 2. put every element to watcherSet
-// 3. refresh watch book-mark
-func (c *dubboRegistryController) initWatchSet() error {
-	req, err := labels.NewRequirement(DubboIOLabelKey, selection.In, []string{DubboIOConsumerLabelValue, DubboIOProviderLabelValue})
-	if err != nil {
-		return perrors.WithMessage(err, "new requirement")
-	}
-
-	for ns := range c.needWatchedNamespace {
-		pods, err := c.kc.CoreV1().Pods(ns).List(metav1.ListOptions{
-			LabelSelector: req.String(),
-		})
-		if err != nil {
-			return perrors.WithMessagef(err, "list pods in namespace (%s)", ns)
-		}
-		for _, p := range pods.Items {
-			// set resource version
-			rv, err := strconv.ParseUint(p.GetResourceVersion(), 10, 0)
-			if err != nil {
-				return perrors.WithMessagef(err, "parse resource version %s", p.GetResourceVersion())
-			}
-			if c.listAndWatchStartResourceVersion < rv {
-				c.listAndWatchStartResourceVersion = rv
-			}
-			c.handleWatchedPodEvent(&p, watch.Added)
-		}
-	}
-	return nil
-}
-
-// read dubbo-registry controller config
-// 1. current pod name
-// 2. current pod working namespace
-func (c *dubboRegistryController) readConfig() error {
-	// read current pod name && namespace
-	c.name = os.Getenv(podNameKey)
-	if len(c.name) == 0 {
-		return perrors.Errorf("read pod name from env %s failed", podNameKey)
-	}
-	namespace, err := ioutil.ReadFile(nameSpaceFilePath)
-	if err == nil && len(namespace) != 0 {
-		c.namespace = string(namespace)
-		return nil
-	}
-	c.namespace = os.Getenv(nameSpaceKey)
-	if len(c.namespace) != 0 {
-		return nil
-	}
-	return perrors.Errorf("get empty namesapce, please check if namespace file at %s exist, or environment %s"+
-		" is set", nameSpaceFilePath, nameSpaceKey)
-
-}
-
-func (c *dubboRegistryController) initNamespacedPodInformer(ns string) error {
-	req, err := labels.NewRequirement(DubboIOLabelKey, selection.In, []string{DubboIOConsumerLabelValue, DubboIOProviderLabelValue})
-	if err != nil {
-		return perrors.WithMessage(err, "new requirement")
-	}
-
-	informersFactory := informers.NewSharedInformerFactoryWithOptions(
-		c.kc,
-		defaultResync,
-		informers.WithNamespace(ns),
-		informers.WithTweakListOptions(func(options *metav1.ListOptions) {
-			options.LabelSelector = req.String()
-			options.ResourceVersion = strconv.FormatUint(c.listAndWatchStartResourceVersion, 10)
-		}),
-	)
-	podInformer := informersFactory.Core().V1().Pods()
-
-	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
-		AddFunc:    c.addPod,
-		UpdateFunc: c.updatePod,
-		DeleteFunc: c.deletePod,
-	})
-
-	c.namespacedInformerFactory[ns] = informersFactory
-	c.namespacedPodInformers[ns] = podInformer
-
-	return nil
-}
-
-func (c *dubboRegistryController) initPodInformer() error {
-	if c.role == common.PROVIDER {
-		return nil
-	}
-
-	// read need watched namespaces list
-	needWatchedNameSpaceList := os.Getenv(needWatchedNameSpaceKey)
-	if len(needWatchedNameSpaceList) == 0 {
-		return perrors.New("read value from env by key (DUBBO_NAMESPACE)")
-	}
-	for _, ns := range strings.Split(needWatchedNameSpaceList, ",") {
-		c.needWatchedNamespace[ns] = struct{}{}
-	}
-	// current work namespace should be watched
-	c.needWatchedNamespace[c.namespace] = struct{}{}
-
-	c.queue = workqueue.New()
-
-	// init all watch needed pod-informer
-	for watchedNS := range c.needWatchedNamespace {
-		if err := c.initNamespacedPodInformer(watchedNS); err != nil {
-			return err
-		}
-	}
-	return nil
-}
-
-type kubernetesEvent struct {
-	p *v1.Pod
-	t watch.EventType
-}
-
-func (c *dubboRegistryController) addPod(obj interface{}) {
-	p, ok := obj.(*v1.Pod)
-	if !ok {
-		logger.Warnf("pod-informer got object %T not *v1.Pod", obj)
-		return
-	}
-	c.queue.Add(&kubernetesEvent{
-		t: watch.Added,
-		p: p,
-	})
-}
-
-func (c *dubboRegistryController) updatePod(oldObj, newObj interface{}) {
-	op, ok := oldObj.(*v1.Pod)
-	if !ok {
-		logger.Warnf("pod-informer got object %T not *v1.Pod", oldObj)
-		return
-	}
-	np, ok := newObj.(*v1.Pod)
-	if !ok {
-		logger.Warnf("pod-informer got object %T not *v1.Pod", newObj)
-		return
-	}
-	if op.GetResourceVersion() == np.GetResourceVersion() {
-		return
-	}
-	c.queue.Add(&kubernetesEvent{
-		p: np,
-		t: watch.Modified,
-	})
-}
-
-func (c *dubboRegistryController) deletePod(obj interface{}) {
-	p, ok := obj.(*v1.Pod)
-	if !ok {
-		logger.Warnf("pod-informer got object %T not *v1.Pod", obj)
-		return
-	}
-	c.queue.Add(&kubernetesEvent{
-		p: p,
-		t: watch.Deleted,
-	})
-}
-
-func (c *dubboRegistryController) startALLInformers() {
-	logger.Debugf("starting namespaced informer-factory")
-	for _, factory := range c.namespacedInformerFactory {
-		go factory.Start(c.ctx.Done())
-	}
-}
-
-// run
-// controller process every event in work-queue
-func (c *dubboRegistryController) run() {
-	if c.role == common.PROVIDER {
-		return
-	}
-
-	defer logger.Warn("dubbo registry controller work stopped")
-	defer c.queue.ShutDown()
-
-	for ns, podInformer := range c.namespacedPodInformers {
-		if !cache.WaitForCacheSync(c.ctx.Done(), podInformer.Informer().HasSynced) {
-			logger.Errorf("wait for cache sync finish @namespace %s fail", ns)
-			return
-		}
-	}
-
-	logger.Infof("kubernetes registry-controller running @Namespace = %q @PodName = %q", c.namespace, c.name)
-
-	// start work
-	go c.work()
-	// block wait context cancel
-	<-c.ctx.Done()
-}
-
-func (c *dubboRegistryController) work() {
-	for c.processNextWorkItem() {
-	}
-}
-
-// processNextWorkItem process work-queue elements
-func (c *dubboRegistryController) processNextWorkItem() bool {
-	item, shutdown := c.queue.Get()
-	if shutdown {
-		return false
-	}
-	defer c.queue.Done(item)
-	o := item.(*kubernetesEvent)
-	c.handleWatchedPodEvent(o.p, o.t)
-	return true
-}
-
-// handleWatchedPodEvent handles watched pod event
-func (c *dubboRegistryController) handleWatchedPodEvent(p *v1.Pod, eventType watch.EventType) {
-	logger.Debugf("get @type = %s event from @pod = %s", eventType, p.GetName())
-
-	for ak, av := range p.GetAnnotations() {
-		// not dubbo interest annotation
-		if ak != DubboIOAnnotationKey {
-			continue
-		}
-		ol, err := c.unmarshalRecord(av)
-		if err != nil {
-			logger.Errorf("there a pod with dubbo annotation, but unmarshal dubbo value %v", err)
-			return
-		}
-		for _, o := range ol {
-			switch eventType {
-			case watch.Added:
-				// if pod is added, the record always be create
-				o.EventType = Create
-			case watch.Modified:
-				o.EventType = Update
-			case watch.Deleted:
-				o.EventType = Delete
-			default:
-				logger.Errorf("no valid kubernetes event-type (%s) ", eventType)
-				return
-			}
-
-			logger.Debugf("putting @key=%s @value=%s to watcherSet", o.Key, o.Value)
-			if err := c.watcherSet.Put(o); err != nil {
-				logger.Errorf("put (%#v) to cache watcherSet: %v ", o, err)
-				return
-			}
-		}
-	}
-}
-
-// unmarshalRecord unmarshals the kubernetes dubbo annotation value
-func (c *dubboRegistryController) unmarshalRecord(record string) ([]*WatcherEvent, error) {
-	if len(record) == 0 {
-		// []*WatcherEvent is nil.
-		return nil, nil
-	}
-
-	rawMsg, err := base64.URLEncoding.DecodeString(record)
-	if err != nil {
-		return nil, perrors.WithMessagef(err, "decode record (%s)", record)
-	}
-
-	var out []*WatcherEvent
-	if err := json.Unmarshal(rawMsg, &out); err != nil {
-		return nil, perrors.WithMessage(err, "decode json")
-	}
-	return out, nil
-}
-
-// initCurrentPod
-// 1. get current pod
-// 2. give the dubbo-label for this pod
-func (c *dubboRegistryController) initCurrentPod() error {
-	currentPod, err := c.readCurrentPod()
-	if err != nil {
-		return perrors.WithMessagef(err, "get current (%s) pod in namespace (%s)", c.name, c.namespace)
-	}
-
-	oldPod, newPod, err := c.assembleDUBBOLabel(currentPod)
-	if err != nil {
-		if err == ErrDubboLabelAlreadyExist {
-			return nil
-		}
-		return perrors.WithMessage(err, "assemble dubbo label")
-	}
-	// current pod don't have label
-	p, err := c.getPatch(oldPod, newPod)
-	if err != nil {
-		return perrors.WithMessage(err, "get patch")
-	}
-
-	_, err = c.patchCurrentPod(p)
-	if err != nil {
-		return perrors.WithMessage(err, "patch to current pod")
-	}
-
-	return nil
-}
-
-// patchCurrentPod writes new meta for current pod
-func (c *dubboRegistryController) patchCurrentPod(patch []byte) (*v1.Pod, error) {
-	updatedPod, err := c.kc.CoreV1().Pods(c.namespace).Patch(c.name, types.StrategicMergePatchType, patch)
-	if err != nil {
-		return nil, perrors.WithMessage(err, "patch in kubernetes pod ")
-	}
-	return updatedPod, nil
-}
-
-func (c *dubboRegistryController) assembleLabel(k, v string) error {
-	var (
-		oldPod = &v1.Pod{}
-		newPod = &v1.Pod{}
-	)
-	oldPod.Labels = make(map[string]string, 8)
-	newPod.Labels = make(map[string]string, 8)
-	currentPod, err := c.readCurrentPod()
-	if err != nil {
-		return err
-	}
-	// copy current pod labels to oldPod && newPod
-	for k, v := range currentPod.GetLabels() {
-		oldPod.Labels[k] = v
-		newPod.Labels[k] = v
-	}
-	newPod.Labels[k] = v
-
-	p, err := c.getPatch(oldPod, newPod)
-	if err != nil {
-		return perrors.WithMessage(err, "get patch")
-	}
-
-	_, err = c.patchCurrentPod(p)
-	if err != nil {
-		return perrors.WithMessage(err, "patch to current pod")
-	}
-	return nil
-}
-
-// assembleDUBBOLabel assembles the dubbo kubernetes label
-// every dubbo instance should be labeled spec {"dubbo.io/label":"dubbo.io/label-value"} label
-func (c *dubboRegistryController) assembleDUBBOLabel(p *v1.Pod) (*v1.Pod, *v1.Pod, error) {
-	var (
-		oldPod = &v1.Pod{}
-		newPod = &v1.Pod{}
-	)
-	oldPod.Labels = make(map[string]string, 8)
-	newPod.Labels = make(map[string]string, 8)
-
-	if p.GetLabels() != nil {
-		if _, ok := p.GetLabels()[DubboIOLabelKey]; ok {
-			// already have label
-			return nil, nil, ErrDubboLabelAlreadyExist
-		}
-	}
-
-	// copy current pod labels to oldPod && newPod
-	for k, v := range p.GetLabels() {
-		oldPod.Labels[k] = v
-		newPod.Labels[k] = v
-	}
-
-	// assign new label for current pod
-	switch c.role {
-	case common.CONSUMER:
-		newPod.Labels[DubboIOLabelKey] = DubboIOConsumerLabelValue
-	case common.PROVIDER:
-		newPod.Labels[DubboIOLabelKey] = DubboIOProviderLabelValue
-	default:
-		return nil, nil, perrors.New(fmt.Sprintf("unknown role %s", c.role))
-	}
-	return oldPod, newPod, nil
-}
-
-// assembleDUBBOAnnotations assembles the dubbo kubernetes annotations
-// accord the current pod && (k,v) assemble the old-pod, new-pod
-func (c *dubboRegistryController) assembleDUBBOAnnotations(k, v string, currentPod *v1.Pod) (oldPod *v1.Pod, newPod *v1.Pod, err error) {
-	oldPod = &v1.Pod{}
-	newPod = &v1.Pod{}
-	oldPod.Annotations = make(map[string]string, 8)
-	newPod.Annotations = make(map[string]string, 8)
-
-	for k, v := range currentPod.GetAnnotations() {
-		oldPod.Annotations[k] = v
-		newPod.Annotations[k] = v
-	}
-
-	al, err := c.unmarshalRecord(oldPod.GetAnnotations()[DubboIOAnnotationKey])
-	if err != nil {
-		err = perrors.WithMessage(err, "unmarshal record")
-		return
-	}
-
-	newAnnotations, err := c.marshalRecord(append(al, &WatcherEvent{Key: k, Value: v}))
-	if err != nil {
-		err = perrors.WithMessage(err, "marshal record")
-		return
-	}
-
-	newPod.Annotations[DubboIOAnnotationKey] = newAnnotations
-	return
-}
-
-// getPatch gets the kubernetes pod patch bytes
-func (c *dubboRegistryController) getPatch(oldPod, newPod *v1.Pod) ([]byte, error) {
-	oldData, err := json.Marshal(oldPod)
-	if err != nil {
-		return nil, perrors.WithMessage(err, "marshal old pod")
-	}
-
-	newData, err := json.Marshal(newPod)
-	if err != nil {
-		return nil, perrors.WithMessage(err, "marshal newPod pod")
-	}
-
-	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Pod{})
-	if err != nil {
-		return nil, perrors.WithMessage(err, "create two-way-merge-patch")
-	}
-	return patchBytes, nil
-}
-
-// marshalRecord marshals the kubernetes dubbo annotation value
-func (c *dubboRegistryController) marshalRecord(ol []*WatcherEvent) (string, error) {
-	msg, err := json.Marshal(ol)
-	if err != nil {
-		return "", perrors.WithMessage(err, "json encode object list")
-	}
-	return base64.URLEncoding.EncodeToString(msg), nil
-}
-
-// readCurrentPod reads from kubernetes-env current pod status
-func (c *dubboRegistryController) readCurrentPod() (*v1.Pod, error) {
-	currentPod, err := c.kc.CoreV1().Pods(c.namespace).Get(c.name, metav1.GetOptions{})
-	if err != nil {
-		return nil, perrors.WithMessagef(err, "get current (%s) pod in namespace (%s)", c.name, c.namespace)
-	}
-	return currentPod, nil
-}
-
-// addAnnotationForCurrentPod adds annotation for current pod
-func (c *dubboRegistryController) addAnnotationForCurrentPod(k string, v string) error {
-	c.lock.Lock()
-	defer c.lock.Unlock()
-
-	// 1. accord old pod && (k, v) assemble new pod dubbo annotation v
-	// 2. get patch data
-	// 3. PATCH the pod
-	currentPod, err := c.readCurrentPod()
-	if err != nil {
-		return perrors.WithMessage(err, "read current pod")
-	}
-
-	oldPod, newPod, err := c.assembleDUBBOAnnotations(k, v, currentPod)
-	if err != nil {
-		return perrors.WithMessage(err, "assemble")
-	}
-
-	patchBytes, err := c.getPatch(oldPod, newPod)
-	if err != nil {
-		return perrors.WithMessage(err, "get patch")
-	}
-
-	_, err = c.patchCurrentPod(patchBytes)
-	if err != nil {
-		return perrors.WithMessage(err, "patch current pod")
-	}
-
-	return c.watcherSet.Put(&WatcherEvent{
-		Key:       k,
-		Value:     v,
-		EventType: Create,
-	})
-}
diff --git a/remoting/kubernetes/watch.go b/remoting/kubernetes/watch.go
deleted file mode 100644
index 34f5a34..0000000
--- a/remoting/kubernetes/watch.go
+++ /dev/null
@@ -1,320 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kubernetes
-
-import (
-	"context"
-	"strconv"
-	"strings"
-	"sync"
-)
-
-import (
-	perrors "github.com/pkg/errors"
-)
-
-var (
-	ErrWatcherSetAlreadyStopped = perrors.New("the watcher-set already be stopped")
-	ErrKVPairNotFound           = perrors.New("k/v pair not found")
-)
-
-const (
-	defaultWatcherChanSize = 100
-)
-
-type eventType int
-
-const (
-	Create eventType = iota
-	Update
-	Delete
-)
-
-func (e eventType) String() string {
-	switch e {
-	case Create:
-		return "CREATE"
-	case Update:
-		return "UPDATE"
-	case Delete:
-		return "DELETE"
-	default:
-		return "UNKNOWN"
-	}
-}
-
-// WatcherEvent
-// watch event is element in watcherSet
-type WatcherEvent struct {
-	// event-type
-	EventType eventType `json:"-"`
-	// the dubbo-go should consume the key
-	Key string `json:"k"`
-	// the dubbo-go should consume the value
-	Value string `json:"v"`
-}
-
-// Watchable WatcherSet
-// thread-safe
-type WatcherSet interface {
-
-	// put the watch event to the watch set
-	Put(object *WatcherEvent) error
-	// if prefix is false,
-	// the len([]*WatcherEvent) == 1
-	Get(key string, prefix bool) ([]*WatcherEvent, error)
-	// watch the spec key or key prefix
-	Watch(key string, prefix bool) (Watcher, error)
-	// check the watcher set status
-	Done() <-chan struct{}
-}
-
-// Watcher
-type Watcher interface {
-	// the watcher's id
-	ID() string
-	// result stream
-	ResultChan() <-chan *WatcherEvent
-	// Stop the watcher
-	stop()
-	// check the watcher status
-	done() <-chan struct{}
-}
-
-// the watch set implement
-type watcherSetImpl struct {
-
-	// Client's ctx, client die, the watch set will die too
-	ctx context.Context
-
-	// protect watcher-set and watchers
-	lock sync.RWMutex
-
-	// the key is dubbo-go interest meta
-	cache map[string]*WatcherEvent
-
-	currentWatcherId uint64
-	watchers         map[uint64]*watcher
-}
-
-// closeWatchers
-// when the watcher-set was closed
-func (s *watcherSetImpl) closeWatchers() {
-	<-s.ctx.Done()
-	// parent ctx be canceled, close the watch-set's watchers
-	s.lock.Lock()
-	watchers := s.watchers
-	s.lock.Unlock()
-
-	for _, w := range watchers {
-		// stop data stream
-		// close(w.ch)
-		// stop watcher
-		w.stop()
-	}
-}
-
-// Watch
-// watch on spec key, with or without prefix
-func (s *watcherSetImpl) Watch(key string, prefix bool) (Watcher, error) {
-	return s.addWatcher(key, prefix)
-}
-
-// Done gets the watcher-set status
-func (s *watcherSetImpl) Done() <-chan struct{} {
-	return s.ctx.Done()
-}
-
-// Put puts the watch event to watcher-set
-func (s *watcherSetImpl) Put(watcherEvent *WatcherEvent) error {
-	blockSendMsg := func(object *WatcherEvent, w *watcher) {
-		select {
-		case <-w.done():
-			// the watcher already stop
-		case w.ch <- object:
-			// block send the msg
-		}
-	}
-
-	s.lock.Lock()
-	defer s.lock.Unlock()
-
-	if err := s.valid(); err != nil {
-		return err
-	}
-
-	// put to watcher-set
-	switch watcherEvent.EventType {
-	case Delete:
-		// delete from store
-		delete(s.cache, watcherEvent.Key)
-	case Update, Create:
-		o, ok := s.cache[watcherEvent.Key]
-		if !ok {
-			// pod update, but create new k/v pair
-			watcherEvent.EventType = Create
-			s.cache[watcherEvent.Key] = watcherEvent
-			break
-		}
-		// k/v pair already latest
-		if o.Value == watcherEvent.Value {
-			return nil
-		}
-		// update to latest status
-		s.cache[watcherEvent.Key] = watcherEvent
-	}
-
-	// notify watcher
-	for _, w := range s.watchers {
-		if !strings.Contains(watcherEvent.Key, w.interested.key) {
-			//  this watcher no interest in this element
-			continue
-		}
-		if !w.interested.prefix {
-			if watcherEvent.Key == w.interested.key {
-				blockSendMsg(watcherEvent, w)
-			}
-			// not interest
-			continue
-		}
-		blockSendMsg(watcherEvent, w)
-	}
-	return nil
-}
-
-// valid
-func (s *watcherSetImpl) valid() error {
-	select {
-	case <-s.ctx.Done():
-		return ErrWatcherSetAlreadyStopped
-	default:
-		return nil
-	}
-}
-
-// addWatcher
-func (s *watcherSetImpl) addWatcher(key string, prefix bool) (Watcher, error) {
-	if err := s.valid(); err != nil {
-		return nil, err
-	}
-
-	s.lock.Lock()
-	defer s.lock.Unlock()
-
-	// increase the watcher-id
-	s.currentWatcherId++
-
-	w := &watcher{
-		id:         s.currentWatcherId,
-		watcherSet: s,
-		interested: struct {
-			key    string
-			prefix bool
-		}{key: key, prefix: prefix},
-		ch:   make(chan *WatcherEvent, defaultWatcherChanSize),
-		exit: make(chan struct{}),
-	}
-	s.watchers[s.currentWatcherId] = w
-	return w, nil
-}
-
-// Get gets elements from watcher-set
-func (s *watcherSetImpl) Get(key string, prefix bool) ([]*WatcherEvent, error) {
-	s.lock.RLock()
-	defer s.lock.RUnlock()
-
-	if err := s.valid(); err != nil {
-		return nil, err
-	}
-
-	if !prefix {
-		for k, v := range s.cache {
-			if k == key {
-				return []*WatcherEvent{v}, nil
-			}
-		}
-		// object
-		return nil, ErrKVPairNotFound
-	}
-
-	var out []*WatcherEvent
-
-	for k, v := range s.cache {
-		if strings.Contains(k, key) {
-			out = append(out, v)
-		}
-	}
-
-	if len(out) == 0 {
-		return nil, ErrKVPairNotFound
-	}
-
-	return out, nil
-}
-
-// the watcher-set watcher
-type watcher struct {
-	id uint64
-
-	// the underlay watcherSet
-	watcherSet *watcherSetImpl
-
-	// the interest topic
-	interested struct {
-		key    string
-		prefix bool
-	}
-	ch chan *WatcherEvent
-
-	closeOnce sync.Once
-	exit      chan struct{}
-}
-
-// nolint
-func (w *watcher) ResultChan() <-chan *WatcherEvent {
-	return w.ch
-}
-
-// nolint
-func (w *watcher) ID() string {
-	return strconv.FormatUint(w.id, 10)
-}
-
-// nolint
-func (w *watcher) stop() {
-	// double close will panic
-	w.closeOnce.Do(func() {
-		close(w.exit)
-	})
-}
-
-// done checks watcher status
-func (w *watcher) done() <-chan struct{} {
-	return w.exit
-}
-
-// newWatcherSet returns new watcher set from parent context
-func newWatcherSet(ctx context.Context) WatcherSet {
-	s := &watcherSetImpl{
-		ctx:      ctx,
-		cache:    map[string]*WatcherEvent{},
-		watchers: map[uint64]*watcher{},
-	}
-	go s.closeWatchers()
-	return s
-}
diff --git a/remoting/kubernetes/watch_test.go b/remoting/kubernetes/watch_test.go
deleted file mode 100644
index 9a0139d..0000000
--- a/remoting/kubernetes/watch_test.go
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kubernetes
-
-import (
-	"context"
-	"strconv"
-	"sync"
-	"testing"
-	"time"
-)
-
-func TestWatchSet(t *testing.T) {
-	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
-	defer cancel()
-
-	s := newWatcherSet(ctx)
-
-	wg := sync.WaitGroup{}
-
-	for i := 0; i < 2; i++ {
-
-		wg.Add(1)
-
-		go func() {
-			defer wg.Done()
-			w, err := s.Watch("key-1", false)
-			if err != nil {
-				t.Error(err)
-				return
-			}
-			for {
-				select {
-				case e := <-w.ResultChan():
-					t.Logf("consumer %s got %s\n", w.ID(), e.Key)
-
-				case <-w.done():
-					t.Logf("consumer %s stopped", w.ID())
-					return
-				}
-			}
-		}()
-	}
-	for i := 2; i < 3; i++ {
-
-		wg.Add(1)
-		go func() {
-			defer wg.Done()
-			w, err := s.Watch("key", true)
-			if err != nil {
-				t.Error(err)
-				return
-			}
-
-			for {
-				select {
-				case e := <-w.ResultChan():
-					t.Logf("prefix consumer %s got %s\n", w.ID(), e.Key)
-
-				case <-w.done():
-					t.Logf("prefix consumer %s stopped", w.ID())
-					return
-				}
-			}
-		}()
-	}
-
-	for i := 0; i < 5; i++ {
-		go func(i int) {
-			if err := s.Put(&WatcherEvent{
-				Key:   "key-" + strconv.Itoa(i),
-				Value: strconv.Itoa(i),
-			}); err != nil {
-				t.Error(err)
-				return
-			}
-		}(i)
-	}
-
-	wg.Wait()
-}
diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go
index b4a2503..fbe4749 100644
--- a/remoting/zookeeper/listener.go
+++ b/remoting/zookeeper/listener.go
@@ -175,7 +175,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
 	for _, n := range newChildren {
 
 		newNode = path.Join(zkPath, n)
-		logger.Infof("[Zookeeper Listener] add zkNode{%s}", newNode)
+		logger.Debugf("[Zookeeper Listener] add zkNode{%s}", newNode)
 		content, _, connErr := l.client.Conn.Get(newNode)
 		if connErr != nil {
 			logger.Errorf("Get new node path {%v} 's content error,message is  {%v}",