You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by la...@apache.org on 2021/10/25 07:23:39 UTC
[dubbo-go] branch 3.0 updated: add dynamic route config (#1519)
This is an automated email from the ASF dual-hosted git repository.
laurence pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/3.0 by this push:
new e551945 add dynamic route config (#1519)
e551945 is described below
commit e5519450e07fd3331b7661d064ae3536e36a42b0
Author: Mulavar <97...@qq.com>
AuthorDate: Mon Oct 25 15:23:35 2021 +0800
add dynamic route config (#1519)
* remove struct DubboRouterRule which is useless
* remove some error which is useless
* rename DubboServiceRouterItem to DubboRouteDetail
* 1. add zk mesh route dynamic configuration listener
2. add base64Enable flag to zookeeperDynamicConfiguration
* rename Router to Routers
* Revert "rename Router to Routers"
This reverts commit dd0867d6
Co-authored-by: dongjianhui03 <do...@meituan.com>
---
cluster/directory/base/directory_test.go | 2 +-
cluster/directory/static/directory.go | 3 +-
cluster/router/chain/chain.go | 21 +-
cluster/router/router.go | 13 +-
cluster/router/v3router/dubbo_rule.go | 61 ------
cluster/router/v3router/factory.go | 9 +-
cluster/router/v3router/factory_test.go | 35 ----
.../v3router/k8s_api/listener_handler_impl.go | 2 +-
cluster/router/v3router/k8s_crd/client.go | 2 +-
cluster/router/v3router/router_chain.go | 214 ++++++++++-----------
cluster/router/v3router/router_chain_test.go | 65 +++----
cluster/router/v3router/uniform_route.go | 26 ++-
cluster/router/v3router/uniform_rule.go | 27 +--
common/constant/key.go | 4 +-
common/extension/router_factory.go | 7 +-
config/config_center_config.go | 31 +--
config/uniform_router_config.go | 21 +-
config_center/mock_dynamic_config.go | 4 +-
config_center/zookeeper/impl.go | 28 ++-
config_center/zookeeper/listener.go | 13 +-
imports/imports.go | 1 +
registry/directory/directory.go | 6 +-
registry/protocol/protocol.go | 4 +-
remoting/zookeeper/listener.go | 9 +-
24 files changed, 266 insertions(+), 342 deletions(-)
diff --git a/cluster/directory/base/directory_test.go b/cluster/directory/base/directory_test.go
index 79cca2c..3e8c5c9 100644
--- a/cluster/directory/base/directory_test.go
+++ b/cluster/directory/base/directory_test.go
@@ -50,7 +50,7 @@ func TestBuildRouterChain(t *testing.T) {
regURL.AddParam(constant.INTERFACE_KEY, "mock-app")
directory := NewDirectory(regURL)
var err error
- directory.routerChain, err = chain.NewRouterChain(regURL)
+ directory.routerChain, err = chain.NewRouterChain()
assert.Error(t, err)
}
diff --git a/cluster/directory/static/directory.go b/cluster/directory/static/directory.go
index 7e877e4..0e9ecc3 100644
--- a/cluster/directory/static/directory.go
+++ b/cluster/directory/static/directory.go
@@ -91,8 +91,7 @@ func (dir *directory) BuildRouterChain(invokers []protocol.Invoker) error {
if len(invokers) == 0 {
return perrors.Errorf("invokers == null")
}
- url := invokers[0].GetURL()
- routerChain, e := chain.NewRouterChain(url)
+ routerChain, e := chain.NewRouterChain()
if e != nil {
return e
}
diff --git a/cluster/router/chain/chain.go b/cluster/router/chain/chain.go
index 250e183..e366a2a 100644
--- a/cluster/router/chain/chain.go
+++ b/cluster/router/chain/chain.go
@@ -36,11 +36,6 @@ import (
"dubbo.apache.org/dubbo-go/v3/protocol"
)
-var (
- virtualServiceConfigByte []byte
- destinationRuleConfigByte []byte
-)
-
// RouterChain Router chain
type RouterChain struct {
// Full list of addresses from registry, classified by method name.
@@ -106,14 +101,9 @@ func (c *RouterChain) copyInvokers() []protocol.Invoker {
return ret
}
-func SetVSAndDRConfigByte(vs, dr []byte) {
- virtualServiceConfigByte = vs
- destinationRuleConfigByte = dr
-}
-
-// NewRouterChain Use url to init router chain
+// NewRouterChain init router chain
// Loop routerFactories and call NewRouter method
-func NewRouterChain(url *common.URL) (*RouterChain, error) {
+func NewRouterChain() (*RouterChain, error) {
routerFactories := extension.GetRouterFactories()
if len(routerFactories) == 0 {
return nil, perrors.Errorf("No routerFactory exits , create one please")
@@ -122,12 +112,9 @@ func NewRouterChain(url *common.URL) (*RouterChain, error) {
routers := make([]router.PriorityRouter, 0, len(routerFactories))
for key, routerFactory := range routerFactories {
- if virtualServiceConfigByte == nil || destinationRuleConfigByte == nil {
- logger.Warnf("virtual Service ProtocolConfig or destinationRule Confi Byte may be empty, pls check your CONF_VIRTUAL_SERVICE_FILE_PATH and CONF_DEST_RULE_FILE_PATH env is correctly point to your yaml file\n")
- }
- r, err := routerFactory().NewPriorityRouter(virtualServiceConfigByte, destinationRuleConfigByte)
+ r, err := routerFactory().NewPriorityRouter()
if r == nil || err != nil {
- logger.Errorf("router chain build router fail! routerFactories key:%s error:%vv", key, err)
+ logger.Errorf("router chain build router fail! routerFactories key:%s error:%v", key, err)
continue
}
routers = append(routers, r)
diff --git a/cluster/router/router.go b/cluster/router/router.go
index d3d2fe5..301510b 100644
--- a/cluster/router/router.go
+++ b/cluster/router/router.go
@@ -26,25 +26,20 @@ import (
"dubbo.apache.org/dubbo-go/v3/protocol"
)
-// Extension - Router
// PriorityRouterFactory creates creates priority router with url
type PriorityRouterFactory interface {
// NewPriorityRouter creates router instance with URL
- NewPriorityRouter([]byte, []byte) (PriorityRouter, error)
+ NewPriorityRouter() (PriorityRouter, error)
}
-// Router
-type router interface {
+// PriorityRouter routes with priority
+type PriorityRouter interface {
// Route Determine the target invokers list.
Route([]protocol.Invoker, *common.URL, protocol.Invocation) []protocol.Invoker
// URL Return URL in router
URL() *common.URL
-}
-// Router
-type PriorityRouter interface {
- router
// Priority Return Priority in router
// 0 to ^int(0) is better
Priority() int64
@@ -66,7 +61,7 @@ type Poolable interface {
// AddrPool is an address pool, backed by a snapshot of address list, divided into categories.
type AddrPool map[string]*roaring.Bitmap
-// AddrMetadta is address metadata, collected from a snapshot of address list by a router, if it implements Poolable.
+// AddrMetadata is address metadata, collected from a snapshot of address list by a router, if it implements Poolable.
type AddrMetadata interface {
// Source indicates where the metadata comes from.
Source() string
diff --git a/cluster/router/v3router/dubbo_rule.go b/cluster/router/v3router/dubbo_rule.go
deleted file mode 100644
index c809354..0000000
--- a/cluster/router/v3router/dubbo_rule.go
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package v3router
-
-import (
- "dubbo.apache.org/dubbo-go/v3/common"
- "dubbo.apache.org/dubbo-go/v3/config"
- "dubbo.apache.org/dubbo-go/v3/protocol"
-)
-
-// nolint
-type DubboRouterRule struct {
- uniformRules []*UniformRule
-}
-
-func newDubboRouterRule(dubboRoutes []*config.DubboRoute,
- destinationMap map[string]map[string]string) (*DubboRouterRule, error) {
-
- uniformRules := make([]*UniformRule, 0)
- for _, v := range dubboRoutes {
- uniformRule, err := newUniformRule(v, destinationMap)
- if err != nil {
- return nil, err
- }
- uniformRules = append(uniformRules, uniformRule)
- }
-
- return &DubboRouterRule{
- uniformRules: uniformRules,
- }, nil
-}
-
-func (drr *DubboRouterRule) route(invokers []protocol.Invoker, url *common.URL,
- invocation protocol.Invocation) []protocol.Invoker {
-
- resultInvokers := make([]protocol.Invoker, 0)
- for _, v := range drr.uniformRules {
- if resultInvokers = v.route(invokers, url, invocation); len(resultInvokers) == 0 {
- continue
- }
- // once there is a uniformRule successfully get target invoker lists, return it
- return resultInvokers
- }
- // return s empty invoker list
- return resultInvokers
-}
diff --git a/cluster/router/v3router/factory.go b/cluster/router/v3router/factory.go
index afd14af..e751247 100644
--- a/cluster/router/v3router/factory.go
+++ b/cluster/router/v3router/factory.go
@@ -19,8 +19,13 @@ package v3router
import (
"dubbo.apache.org/dubbo-go/v3/cluster/router"
+ "dubbo.apache.org/dubbo-go/v3/common/extension"
)
+func init() {
+ extension.SetRouterFactory("mesh", NewUniformRouterFactory)
+}
+
// UniformRouteFactory is uniform router's factory
type UniformRouteFactory struct{}
@@ -30,6 +35,6 @@ func NewUniformRouterFactory() router.PriorityRouterFactory {
}
// NewPriorityRouter construct a new UniformRouteFactory as PriorityRouter
-func (f *UniformRouteFactory) NewPriorityRouter(vsConfigBytes, distConfigBytes []byte) (router.PriorityRouter, error) {
- return NewUniformRouterChain(vsConfigBytes, distConfigBytes)
+func (f *UniformRouteFactory) NewPriorityRouter() (router.PriorityRouter, error) {
+ return NewUniformRouterChain()
}
diff --git a/cluster/router/v3router/factory_test.go b/cluster/router/v3router/factory_test.go
deleted file mode 100644
index a5ea415..0000000
--- a/cluster/router/v3router/factory_test.go
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package v3router
-
-import (
- "testing"
-)
-
-import (
- "github.com/stretchr/testify/assert"
-)
-
-// TestUniformRouterFacotry created a new factory that can new uniform router
-func TestUniformRouterFacotry(t *testing.T) {
- factory := NewUniformRouterFactory()
- assert.NotNil(t, factory)
- router, err := factory.NewPriorityRouter([]byte{}, []byte{})
- assert.Nil(t, err)
- assert.NotNil(t, router)
-}
diff --git a/cluster/router/v3router/k8s_api/listener_handler_impl.go b/cluster/router/v3router/k8s_api/listener_handler_impl.go
index 38f5748..70dc8be 100644
--- a/cluster/router/v3router/k8s_api/listener_handler_impl.go
+++ b/cluster/router/v3router/k8s_api/listener_handler_impl.go
@@ -36,7 +36,7 @@ import (
const (
VirtualServiceEventKey = "virtualServiceEventKey"
- DestinationRuleEventKey = "destinationRuleEventKe3y"
+ DestinationRuleEventKey = "destinationRuleEventKey"
VirtualServiceResource = "virtualservices"
DestRuleResource = "destinationrules"
diff --git a/cluster/router/v3router/k8s_crd/client.go b/cluster/router/v3router/k8s_crd/client.go
index 2dfe201..0da410c 100644
--- a/cluster/router/v3router/k8s_crd/client.go
+++ b/cluster/router/v3router/k8s_crd/client.go
@@ -105,7 +105,7 @@ func NewK8sCRDClient(groupName, groupVersion, namespace string, handlers ...List
return newClient, nil
}
-// func (c *Client) WatchResources() []cache.Store { can only be called once
+// WatchResources can only be called once
func (c *Client) WatchResources() []cache.Store {
stores := make([]cache.Store, 0)
c.once.Do(
diff --git a/cluster/router/v3router/router_chain.go b/cluster/router/v3router/router_chain.go
index ce82694..973f98e 100644
--- a/cluster/router/v3router/router_chain.go
+++ b/cluster/router/v3router/router_chain.go
@@ -18,7 +18,6 @@
package v3router
import (
- "encoding/json"
"io"
"strings"
)
@@ -29,43 +28,47 @@ import (
import (
"dubbo.apache.org/dubbo-go/v3/cluster/router"
- "dubbo.apache.org/dubbo-go/v3/cluster/router/v3router/k8s_api"
"dubbo.apache.org/dubbo-go/v3/common"
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/config_center"
"dubbo.apache.org/dubbo-go/v3/protocol"
- "dubbo.apache.org/dubbo-go/v3/remoting"
)
// RouterChain contains all uniform router logic
// it has UniformRouter list,
type RouterChain struct {
- routers []*UniformRouter
- virtualServiceConfigBytes []byte
- destinationRuleConfigBytes []byte
- notify chan struct{}
+ routers []*UniformRouter
+ notify chan struct{}
}
-// NewUniformRouterChain return
-func NewUniformRouterChain(virtualServiceConfig, destinationRuleConfig []byte) (router.PriorityRouter, error) {
- fromFileConfig := true
- uniformRouters, err := parseFromConfigToRouters(virtualServiceConfig, destinationRuleConfig)
+// nolint
+func NewUniformRouterChain() (router.PriorityRouter, error) {
+ // 1. add mesh route listener
+ r := &RouterChain{}
+ rootConfig := config.GetRootConfig()
+ dynamicConfiguration, err := rootConfig.ConfigCenter.GetDynamicConfiguration()
if err != nil {
- fromFileConfig = false
- logger.Warnf("parse router config form local file failed, error = %+v", err)
+ return nil, err
}
- r := &RouterChain{
- virtualServiceConfigBytes: virtualServiceConfig,
- destinationRuleConfigBytes: destinationRuleConfig,
- routers: uniformRouters,
+ dynamicConfiguration.AddListener(rootConfig.Application.Name, r)
+
+ // 2. try to get mesh route configuration, default key is "dubbo.io.MESHAPPRULE" with group "dubbo"
+ key := rootConfig.Application.Name + constant.MeshRouteSuffix
+ meshRouteValue, err := dynamicConfiguration.GetProperties(key, config_center.WithGroup(rootConfig.ConfigCenter.Group))
+ if err != nil {
+ // the mesh route may not be initialized now
+ logger.Warnf("Can not get mesh route for key=%s, error=%v", key, err)
+ return r, nil
}
- if err := k8s_api.SetK8sEventListener(r); err != nil {
- logger.Warnf("try listen K8s router config failed, error = %+v", err)
- if !fromFileConfig {
- panic("No config file from both local file and k8s")
- }
+ logger.Debugf("Successfully get mesh route:%s", meshRouteValue)
+ routes, err := parseRoute(meshRouteValue)
+ if err != nil {
+ logger.Warnf("Parse mesh route failed, error=%v", err)
+ return nil, err
}
+ r.routers = routes
return r, nil
}
@@ -77,84 +80,15 @@ func (r *RouterChain) Route(invokers []protocol.Invoker, url *common.URL, invoca
return invokers
}
+// Process process route config change event
func (r *RouterChain) Process(event *config_center.ConfigChangeEvent) {
- logger.Debugf("on processed event = %+v\n", *event)
- if event.ConfigType == remoting.EventTypeAdd || event.ConfigType == remoting.EventTypeUpdate {
- switch event.Key {
- case k8s_api.VirtualServiceEventKey:
- logger.Debug("virtul service event")
- newVSValue, ok := event.Value.(*config.VirtualServiceConfig)
- if !ok {
- logger.Error("event.Value assertion error")
- return
- }
-
- newVSJsonValue, ok := newVSValue.ObjectMeta.Annotations["kubectl.kubernetes.io/last-applied-configuration"]
- if !ok {
- logger.Error("newVSValue.ObjectMeta.Annotations has no key named kubectl.kubernetes.io/last-applied-configuration")
- return
- }
- logger.Debugf("new virtual service json value = \n%v\n", newVSJsonValue)
- newVirtualServiceConfig := &config.VirtualServiceConfig{}
- if err := json.Unmarshal([]byte(newVSJsonValue), newVirtualServiceConfig); err != nil {
- logger.Error("on process json data unmarshal error = ", err)
- return
- }
- newVirtualServiceConfig.YamlAPIVersion = newVirtualServiceConfig.APIVersion
- newVirtualServiceConfig.YamlKind = newVirtualServiceConfig.Kind
- newVirtualServiceConfig.MetaData.Name = newVirtualServiceConfig.ObjectMeta.Name
- logger.Debugf("get event after asseration = %+v\n", newVirtualServiceConfig)
- data, err := yaml.Marshal(newVirtualServiceConfig)
- if err != nil {
- logger.Error("Process change of virtual service: event.Value marshal error:", err)
- return
- }
- r.routers, err = parseFromConfigToRouters(data, r.destinationRuleConfigBytes)
- if err != nil {
- logger.Error("Process change of virtual service: parseFromConfigToRouters:", err)
- return
- }
- case k8s_api.DestinationRuleEventKey:
- logger.Debug("handling dest rule event")
- newDRValue, ok := event.Value.(*config.DestinationRuleConfig)
- if !ok {
- logger.Error("event.Value assertion error")
- return
- }
-
- newDRJsonValue, ok := newDRValue.ObjectMeta.Annotations["kubectl.kubernetes.io/last-applied-configuration"]
- if !ok {
- logger.Error("newVSValue.ObjectMeta.Annotations has no key named kubectl.kubernetes.io/last-applied-configuration")
- return
- }
- newDestRuleConfig := &config.DestinationRuleConfig{}
- if err := json.Unmarshal([]byte(newDRJsonValue), newDestRuleConfig); err != nil {
- logger.Error("on process json data unmarshal error = ", err)
- return
- }
- newDestRuleConfig.YamlAPIVersion = newDestRuleConfig.APIVersion
- newDestRuleConfig.YamlKind = newDestRuleConfig.Kind
- newDestRuleConfig.MetaData.Name = newDestRuleConfig.ObjectMeta.Name
- logger.Debugf("get event after asseration = %+v\n", newDestRuleConfig)
- data, err := yaml.Marshal(newDestRuleConfig)
- if err != nil {
- logger.Error("Process change of dest rule: event.Value marshal error:", err)
- return
- }
- r.routers, err = parseFromConfigToRouters(r.virtualServiceConfigBytes, data)
- if err != nil {
- logger.Error("Process change of dest rule: parseFromConfigToRouters:", err)
- return
- }
- default:
- logger.Error("unknown unsupported event key:", event.Key)
- }
+ logger.Debugf("RouteChain process event:\n%+v", event)
+ routers, err := parseRoute(event.Value.(string))
+ if err != nil {
+ return
}
-
+ r.routers = routers
// todo delete router
- //if event.ConfigType == remoting.EventTypeDel {
- //
- //}
}
// Name get name of ConnCheckerRouter
@@ -179,7 +113,7 @@ func parseFromConfigToRouters(virtualServiceConfig, destinationRuleConfig []byte
vsDecoder := yaml.NewDecoder(strings.NewReader(string(virtualServiceConfig)))
drDecoder := yaml.NewDecoder(strings.NewReader(string(destinationRuleConfig)))
- // parse virtual service
+ // 1. parse virtual service config
for {
virtualServiceCfg := &config.VirtualServiceConfig{}
@@ -195,7 +129,7 @@ func parseFromConfigToRouters(virtualServiceConfig, destinationRuleConfig []byte
virtualServiceConfigList = append(virtualServiceConfigList, virtualServiceCfg)
}
- // parse destination rule
+ // 2. parse destination rule config
for {
destRuleCfg := &config.DestinationRuleConfig{}
err := drDecoder.Decode(destRuleCfg)
@@ -219,33 +153,89 @@ func parseFromConfigToRouters(virtualServiceConfig, destinationRuleConfig []byte
routers := make([]*UniformRouter, 0)
+ // 3. construct virtual service host to destination mapping
for _, v := range virtualServiceConfigList {
- tempSerivceNeedsDescMap := make(map[string]map[string]string)
+ tempServiceNeedsDescMap := make(map[string]map[string]string)
for _, host := range v.Spec.Hosts {
+ // name -> labels
targetDestMap := destRuleConfigsMap[host]
- // copy to new Map
- mapCombine(tempSerivceNeedsDescMap, targetDestMap)
+ // copy to new Map, FIXME name collision
+ mapCopy(tempServiceNeedsDescMap, targetDestMap)
}
- // change single config to one rule
- newRule, err := newDubboRouterRule(v.Spec.Dubbo, tempSerivceNeedsDescMap)
- if err != nil {
- logger.Error("Parse config to uniform rule err = ", err)
+ // transform single config to one rule
+ routers = append(routers, NewUniformRouter(v.Spec.Dubbo, tempServiceNeedsDescMap))
+ }
+ logger.Debug("parsed successfully with router size = ", len(routers))
+ return routers, nil
+}
+
+func parseRoute(routeContent string) ([]*UniformRouter, error) {
+ var virtualServiceConfigList []*config.VirtualServiceConfig
+ destRuleConfigsMap := make(map[string]map[string]map[string]string)
+
+ meshRouteDecoder := yaml.NewDecoder(strings.NewReader(routeContent))
+ for {
+ meshRouteMetadata := &config.MeshRouteMetadata{}
+ err := meshRouteDecoder.Decode(meshRouteMetadata)
+ if err == io.EOF {
+ break
+ } else if err != nil {
+ logger.Error("parseRoute route metadata err = ", err)
return nil, err
}
- rtr, err := NewUniformRouter(newRule)
+
+ bytes, err := yaml.Marshal(meshRouteMetadata.Spec)
if err != nil {
- logger.Error("new uniform router err = ", err)
return nil, err
}
- routers = append(routers, rtr)
+ specDecoder := yaml.NewDecoder(strings.NewReader(string(bytes)))
+ switch meshRouteMetadata.YamlKind {
+ case "VirtualService":
+ meshRouteConfigSpec := &config.UniformRouterConfigSpec{}
+ err := specDecoder.Decode(meshRouteConfigSpec)
+ if err != nil {
+ return nil, err
+ }
+ virtualServiceConfigList = append(virtualServiceConfigList, &config.VirtualServiceConfig{
+ YamlAPIVersion: meshRouteMetadata.YamlAPIVersion,
+ YamlKind: meshRouteMetadata.YamlKind,
+ TypeMeta: meshRouteMetadata.TypeMeta,
+ ObjectMeta: meshRouteMetadata.ObjectMeta,
+ MetaData: meshRouteMetadata.MetaData,
+ Spec: *meshRouteConfigSpec,
+ })
+ case "DestinationRule":
+ meshRouteDestinationRuleSpec := &config.DestinationRuleSpec{}
+ err := specDecoder.Decode(meshRouteDestinationRuleSpec)
+ if err != nil {
+ return nil, err
+ }
+ destRuleCfgMap := make(map[string]map[string]string)
+ for _, v := range meshRouteDestinationRuleSpec.SubSets {
+ destRuleCfgMap[v.Name] = v.Labels
+ }
+
+ destRuleConfigsMap[meshRouteDestinationRuleSpec.Host] = destRuleCfgMap
+ }
+ }
+
+ routers := make([]*UniformRouter, 0)
+
+ for _, v := range virtualServiceConfigList {
+ tempServiceNeedsDescMap := make(map[string]map[string]string)
+ for _, host := range v.Spec.Hosts {
+ targetDestMap := destRuleConfigsMap[host]
+ mapCopy(tempServiceNeedsDescMap, targetDestMap)
+ }
+ routers = append(routers, NewUniformRouter(v.Spec.Dubbo, tempServiceNeedsDescMap))
}
- logger.Debug("parsed successed! with router size = ", len(routers))
+ logger.Debug("parsed successfully with router size = ", len(routers))
return routers, nil
}
-func mapCombine(dist map[string]map[string]string, from map[string]map[string]string) {
- for k, v := range from {
+func mapCopy(dist map[string]map[string]string, source map[string]map[string]string) {
+ for k, v := range source {
dist[k] = v
}
}
diff --git a/cluster/router/v3router/router_chain_test.go b/cluster/router/v3router/router_chain_test.go
index ac41769..31a824e 100644
--- a/cluster/router/v3router/router_chain_test.go
+++ b/cluster/router/v3router/router_chain_test.go
@@ -30,25 +30,21 @@ import (
import (
"dubbo.apache.org/dubbo-go/v3/cluster/router/v3router/k8s_api"
- "dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/yaml"
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/config_center"
- "dubbo.apache.org/dubbo-go/v3/protocol"
- "dubbo.apache.org/dubbo-go/v3/protocol/invocation"
)
const (
mockVSConfigPath = "./test_file/virtual_service.yml"
mockDRConfigPath = "./test_file/dest_rule.yml"
+ mockConfigPath = "./test_file/mesh_route.yml"
)
func TestNewUniformRouterChain(t *testing.T) {
- vsBytes, _ := yaml.LoadYMLConfig(mockVSConfigPath)
- drBytes, _ := yaml.LoadYMLConfig(mockDRConfigPath)
- rc, err := NewUniformRouterChain(vsBytes, drBytes)
- assert.Nil(t, err)
- assert.NotNil(t, rc)
+ //rc, err := NewUniformRouterChain()
+ //assert.Nil(t, err)
+ //assert.NotNil(t, rc)
}
type ruleTestItemStruct struct {
@@ -70,9 +66,9 @@ func TestParseConfigFromFile(t *testing.T) {
routers, err := parseFromConfigToRouters(vsBytes, drBytes)
fmt.Println(routers, err)
assert.Equal(t, len(routers), 1)
- assert.NotNil(t, routers[0].dubboRouter)
- assert.Equal(t, len(routers[0].dubboRouter.uniformRules), 2)
- for i, v := range routers[0].dubboRouter.uniformRules {
+ assert.NotNil(t, routers[0].uniformRules)
+ assert.Equal(t, len(routers[0].uniformRules), 2)
+ for i, v := range routers[0].uniformRules {
if i == 0 {
assert.Equal(t, len(v.services), 2)
assert.Equal(t, "com.taobao.hsf.demoService:1.0.0", v.services[0].Exact)
@@ -192,23 +188,21 @@ func TestParseConfigFromFile(t *testing.T) {
}
func TestRouterChain_Route(t *testing.T) {
- vsBytes, _ := yaml.LoadYMLConfig(mockVSConfigPath)
- drBytes, _ := yaml.LoadYMLConfig(mockDRConfigPath)
- rc, err := NewUniformRouterChain(vsBytes, drBytes)
- assert.Nil(t, err)
- assert.NotNil(t, rc)
- newGoodURL, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0")
- newBadURL1, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0")
- newBadURL2, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0")
- goodIvk := protocol.NewBaseInvoker(newGoodURL)
- b1 := protocol.NewBaseInvoker(newBadURL1)
- b2 := protocol.NewBaseInvoker(newBadURL2)
- invokerList := make([]protocol.Invoker, 3)
- invokerList = append(invokerList, goodIvk)
- invokerList = append(invokerList, b1)
- invokerList = append(invokerList, b2)
- result := rc.Route(invokerList, newGoodURL, invocation.NewRPCInvocation("GetUser", nil, nil))
- assert.Equal(t, 0, len(result))
+ //rc, err := NewUniformRouterChain()
+ //assert.Nil(t, err)
+ //assert.NotNil(t, rc)
+ //newGoodURL, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0")
+ //newBadURL1, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0")
+ //newBadURL2, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.6.0")
+ //goodIvk := protocol.NewBaseInvoker(newGoodURL)
+ //b1 := protocol.NewBaseInvoker(newBadURL1)
+ //b2 := protocol.NewBaseInvoker(newBadURL2)
+ //invokerList := make([]protocol.Invoker, 3)
+ //invokerList = append(invokerList, goodIvk)
+ //invokerList = append(invokerList, b1)
+ //invokerList = append(invokerList, b2)
+ //result := rc.Route(invokerList, newGoodURL, invocation.NewRPCInvocation("GetUser", nil, nil))
+ //assert.Equal(t, 0, len(result))
//todo test find target invoker
}
@@ -223,11 +217,12 @@ func TestRouterChain_Process(t *testing.T) {
},
},
}
-
+ vsValue, err := yaml.MarshalYML(mockVirtualServiceConfig)
+ assert.Nil(t, err)
// test virtual service config chage event
mockVirtualServiceChangeEvent := &config_center.ConfigChangeEvent{
Key: k8s_api.VirtualServiceEventKey,
- Value: mockVirtualServiceConfig,
+ Value: string(vsValue),
ConfigType: 0,
}
rc.Process(mockVirtualServiceChangeEvent)
@@ -241,16 +236,12 @@ func TestRouterChain_Process(t *testing.T) {
},
},
}
+ drValue, err := yaml.MarshalYML(mockDestinationRuleConfig)
+ assert.Nil(t, err)
mockDestinationRuleChangeEvent := &config_center.ConfigChangeEvent{
Key: k8s_api.DestinationRuleEventKey,
- Value: mockDestinationRuleConfig,
+ Value: string(drValue),
ConfigType: 0,
}
rc.Process(mockDestinationRuleChangeEvent)
-
- // test unknown event type
- mockUnsupportedEvent := &config_center.ConfigChangeEvent{
- Key: "unknown",
- }
- rc.Process(mockUnsupportedEvent)
}
diff --git a/cluster/router/v3router/uniform_route.go b/cluster/router/v3router/uniform_route.go
index 9ea12dd..8d42cee 100644
--- a/cluster/router/v3router/uniform_route.go
+++ b/cluster/router/v3router/uniform_route.go
@@ -19,6 +19,7 @@ package v3router
import (
"dubbo.apache.org/dubbo-go/v3/common"
+ "dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/config_center"
"dubbo.apache.org/dubbo-go/v3/protocol"
)
@@ -29,20 +30,33 @@ const (
// UniformRouter have
type UniformRouter struct {
- dubboRouter *DubboRouterRule
+ uniformRules []*UniformRule
}
// NewUniformRouter construct an NewConnCheckRouter via url
-func NewUniformRouter(dubboRouter *DubboRouterRule) (*UniformRouter, error) {
- r := &UniformRouter{
- dubboRouter: dubboRouter,
+func NewUniformRouter(dubboRoutes []*config.DubboRoute, destinationMap map[string]map[string]string) *UniformRouter {
+ uniformRules := make([]*UniformRule, 0)
+ for _, v := range dubboRoutes {
+ uniformRules = append(uniformRules, newUniformRule(v, destinationMap))
+ }
+
+ return &UniformRouter{
+ uniformRules: uniformRules,
}
- return r, nil
}
// Route gets a list of routed invoker
func (r *UniformRouter) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
- return r.dubboRouter.route(invokers, url, invocation)
+ resultInvokers := make([]protocol.Invoker, 0)
+ for _, v := range r.uniformRules {
+ if resultInvokers = v.route(invokers, url, invocation); len(resultInvokers) == 0 {
+ continue
+ }
+ // once there is a uniformRule successfully get target invoker lists, return it
+ return resultInvokers
+ }
+ // return s empty invoker list
+ return resultInvokers
}
// Process there is no process needs for uniform Router, as it upper struct RouterChain has done it
diff --git a/cluster/router/v3router/uniform_rule.go b/cluster/router/v3router/uniform_rule.go
index 3b552b8..4dddc35 100644
--- a/cluster/router/v3router/uniform_rule.go
+++ b/cluster/router/v3router/uniform_rule.go
@@ -38,13 +38,13 @@ import (
// if match, get result destination key, which should be defined in DestinationRule yaml file
type VirtualServiceRule struct {
// routerItem store match router list and destination list of this router
- routerItem *config.DubboServiceRouterItem
+ routerItem *config.DubboRouteDetail
// uniformRule is the upper struct ptr
uniformRule *UniformRule
}
-// match read from vsr's Match config
+// match read from VirtualServiceRule's Match config
// it judges if this invocation matches the router rule request defined in config one by one
func (vsr *VirtualServiceRule) match(url *common.URL, invocation protocol.Invocation) bool {
for _, v := range vsr.routerItem.Match {
@@ -61,7 +61,7 @@ func (vsr *VirtualServiceRule) match(url *common.URL, invocation protocol.Invoca
return false
}
- // atta match judge
+ // attachment match judge
if v.Attachment != nil {
attachmentMatchJudger := judger.NewAttachmentMatchJudger(v.Attachment)
if attachmentMatchJudger.Judge(invocation) {
@@ -198,7 +198,7 @@ func (vsr *VirtualServiceRule) getRuleTargetInvokers(invokers []protocol.Invoker
return weightInvokerPairResult.getTargetInvokers(), nil
}
-// UniformRule
+// UniformRule uniform rule
type UniformRule struct {
services []*config.StringMatch
virtualServiceRules []VirtualServiceRule
@@ -206,21 +206,22 @@ type UniformRule struct {
}
// NewDefaultConnChecker constructs a new DefaultConnChecker based on the url
-func newUniformRule(dubboRoute *config.DubboRoute, destinationMap map[string]map[string]string) (*UniformRule, error) {
- matchItems := dubboRoute.RouterDetail
- virtualServiceRules := make([]VirtualServiceRule, 0)
- newUniformRule := &UniformRule{
- DestinationLabelListMap: destinationMap,
+func newUniformRule(dubboRoute *config.DubboRoute, destinationMap map[string]map[string]string) *UniformRule {
+ uniformRule := &UniformRule{
services: dubboRoute.Services,
+ DestinationLabelListMap: destinationMap,
}
- for _, v := range matchItems {
+
+ routeDetail := dubboRoute.RouterDetail
+ virtualServiceRules := make([]VirtualServiceRule, 0)
+ for _, v := range routeDetail {
virtualServiceRules = append(virtualServiceRules, VirtualServiceRule{
routerItem: v,
- uniformRule: newUniformRule,
+ uniformRule: uniformRule,
})
}
- newUniformRule.virtualServiceRules = virtualServiceRules
- return newUniformRule, nil
+ uniformRule.virtualServiceRules = virtualServiceRules
+ return uniformRule
}
func (u *UniformRule) route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
diff --git a/common/constant/key.go b/common/constant/key.go
index 5aeb970..d44542d 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -199,7 +199,7 @@ const (
)
const (
- CONFIGURATORS_SUFFIX = ".configurators"
+ ConfiguratorSuffix = ".configurators"
)
const (
@@ -253,6 +253,8 @@ const (
TagRouterRuleSuffix = ".tag-router"
// ConditionRouterRuleSuffix Specify condition router suffix
ConditionRouterRuleSuffix = ".condition-router"
+ // MeshRouteSuffix Specify mesh router suffix
+ MeshRouteSuffix = ".MESHAPPRULE"
// ForceUseTag is the tag in attachment
ForceUseTag = "dubbo.force.tag"
Tagkey = "dubbo.tag"
diff --git a/common/extension/router_factory.go b/common/extension/router_factory.go
index 913e9c5..454fe00 100644
--- a/common/extension/router_factory.go
+++ b/common/extension/router_factory.go
@@ -18,16 +18,11 @@
package extension
import (
- "sync"
-)
-
-import (
"dubbo.apache.org/dubbo-go/v3/cluster/router"
)
var (
- routers = make(map[string]func() router.PriorityRouterFactory)
- fileRouterFactoryOnce sync.Once
+ routers = make(map[string]func() router.PriorityRouterFactory)
)
// SetRouterFactory sets create router factory function with @name
diff --git a/config/config_center_config.go b/config/config_center_config.go
index 025ca0e..956a39d 100644
--- a/config/config_center_config.go
+++ b/config/config_center_config.go
@@ -60,6 +60,8 @@ type CenterConfig struct {
AppID string `default:"dubbo" yaml:"app-id" json:"app-id,omitempty"`
Timeout string `default:"10s" yaml:"timeout" json:"timeout,omitempty"`
Params map[string]string `yaml:"params" json:"parameters,omitempty"`
+
+ DynamicConfiguration config_center.DynamicConfiguration
}
// Prefix dubbo.config-center
@@ -129,11 +131,7 @@ func (c *CenterConfig) toURL() (*common.URL, error) {
// it will prepare the environment
func startConfigCenter(rc *RootConfig) error {
cc := rc.ConfigCenter
- configCenterUrl, err := cc.toURL()
- if err != nil {
- return err
- }
- strConf, err := cc.prepareEnvironment(configCenterUrl)
+ strConf, err := cc.prepareEnvironment()
if err != nil {
return errors.WithMessagef(err, "start config center error!")
}
@@ -150,7 +148,7 @@ func startConfigCenter(rc *RootConfig) error {
return nil
}
-func (c *CenterConfig) GetDynamicConfiguration() (config_center.DynamicConfiguration, error) {
+func (c *CenterConfig) CreateDynamicConfiguration() (config_center.DynamicConfiguration, error) {
configCenterUrl, err := c.toURL()
if err != nil {
return nil, err
@@ -162,14 +160,23 @@ func (c *CenterConfig) GetDynamicConfiguration() (config_center.DynamicConfigura
return factory.GetDynamicConfiguration(configCenterUrl)
}
-func (c *CenterConfig) prepareEnvironment(configCenterUrl *common.URL) (string, error) {
- factory := extension.GetConfigCenterFactory(configCenterUrl.Protocol)
- if factory == nil {
- return "", errors.New("get config center factory failed")
+func (c *CenterConfig) GetDynamicConfiguration() (config_center.DynamicConfiguration, error) {
+ if c.DynamicConfiguration != nil {
+ return c.DynamicConfiguration, nil
+ }
+ dynamicConfig, err := c.CreateDynamicConfiguration()
+ if err != nil {
+ logger.Errorf("Create dynamic configuration error , error message is %v", err)
+ return nil, errors.WithStack(err)
}
- dynamicConfig, err := factory.GetDynamicConfiguration(configCenterUrl)
+ c.DynamicConfiguration = dynamicConfig
+ return dynamicConfig, nil
+}
+
+func (c *CenterConfig) prepareEnvironment() (string, error) {
+ dynamicConfig, err := c.GetDynamicConfiguration()
if err != nil {
- logger.Errorf("Get dynamic configuration error , error message is %v", err)
+ logger.Errorf("Create dynamic configuration error , error message is %v", err)
return "", errors.WithStack(err)
}
envInstance := conf.GetEnvInstance()
diff --git a/config/uniform_router_config.go b/config/uniform_router_config.go
index 3d6f728..b96a14b 100644
--- a/config/uniform_router_config.go
+++ b/config/uniform_router_config.go
@@ -24,12 +24,22 @@ import (
"k8s.io/apimachinery/pkg/runtime"
)
+type MeshRouteMetadata struct {
+ metav1.TypeMeta `json:",inline"`
+ metav1.ObjectMeta `json:"metadata,omitempty"`
+
+ YamlAPIVersion string `yaml:"apiVersion" `
+ YamlKind string `yaml:"kind" `
+ MetaData MetaDataStruct `yaml:"metadata"`
+ Spec interface{}
+}
+
// nolint
type MetaDataStruct struct {
Name string `yaml:"name" json:"name"`
}
-// VirtualService Config Definition
+// VirtualServiceConfig Config Definition
type VirtualServiceConfig struct {
YamlAPIVersion string `yaml:"apiVersion"`
YamlKind string `yaml:"kind"`
@@ -47,12 +57,13 @@ type UniformRouterConfigSpec struct {
// nolint
type DubboRoute struct {
- Services []*StringMatch `yaml:"services" json:"service"`
- RouterDetail []*DubboServiceRouterItem `yaml:"routedetail" json:"routedetail"`
+ Name string `yaml:"name" json:"name"`
+ Services []*StringMatch `yaml:"services" json:"service"`
+ RouterDetail []*DubboRouteDetail `yaml:"routedetail" json:"routedetail"`
}
// nolint
-type DubboServiceRouterItem struct {
+type DubboRouteDetail struct {
Name string `yaml:"name" json:"name"`
Match []*DubboMatchRequest `yaml:"match" json:"match"`
Router []*DubboDestination `yaml:"route" json:"route"`
@@ -146,7 +157,7 @@ type RouterDest struct {
// todo port
}
-// DestinationRule Definition
+// DestinationRuleConfig Definition
type DestinationRuleConfig struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
diff --git a/config_center/mock_dynamic_config.go b/config_center/mock_dynamic_config.go
index f42b1dd..0abf808 100644
--- a/config_center/mock_dynamic_config.go
+++ b/config_center/mock_dynamic_config.go
@@ -168,7 +168,7 @@ func (c *MockDynamicConfiguration) MockServiceConfigEvent() {
},
}
value, _ := yaml.Marshal(config)
- key := "group*" + mockServiceName + ":1.0.0" + constant.CONFIGURATORS_SUFFIX
+ key := "group*" + mockServiceName + ":1.0.0" + constant.ConfiguratorSuffix
c.listener[key].Process(&ConfigChangeEvent{Key: key, Value: string(value), ConfigType: remoting.EventTypeAdd})
}
@@ -191,6 +191,6 @@ func (c *MockDynamicConfiguration) MockApplicationConfigEvent() {
},
}
value, _ := yaml.Marshal(config)
- key := "test-application" + constant.CONFIGURATORS_SUFFIX
+ key := "test-application" + constant.ConfiguratorSuffix
c.listener[key].Process(&ConfigChangeEvent{Key: key, Value: string(value), ConfigType: remoting.EventTypeAdd})
}
diff --git a/config_center/zookeeper/impl.go b/config_center/zookeeper/impl.go
index b85d28a..c5eb36c 100644
--- a/config_center/zookeeper/impl.go
+++ b/config_center/zookeeper/impl.go
@@ -19,6 +19,7 @@ package zookeeper
import (
"encoding/base64"
+ "strconv"
"strings"
"sync"
)
@@ -34,15 +35,13 @@ import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/logger"
+ "dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/config_center"
"dubbo.apache.org/dubbo-go/v3/config_center/parser"
"dubbo.apache.org/dubbo-go/v3/remoting/zookeeper"
)
const (
- // ZkClient
- // zookeeper client name
- ZkClient = "zk config_center"
pathSeparator = "/"
)
@@ -59,6 +58,8 @@ type zookeeperDynamicConfiguration struct {
listener *zookeeper.ZkEventListener
cacheListener *CacheListener
parser parser.ConfigurationParser
+
+ base64Enabled bool
}
func newZookeeperDynamicConfiguration(url *common.URL) (*zookeeperDynamicConfiguration, error) {
@@ -66,6 +67,14 @@ func newZookeeperDynamicConfiguration(url *common.URL) (*zookeeperDynamicConfigu
url: url,
rootPath: "/" + url.GetParam(constant.CONFIG_NAMESPACE_KEY, config_center.DEFAULT_GROUP) + "/config",
}
+ if v, ok := config.GetRootConfig().ConfigCenter.Params["base64"]; ok {
+ base64Enabled, err := strconv.ParseBool(v)
+ if err != nil {
+ panic("value of base64 must be bool, error=" + err.Error())
+ }
+ c.base64Enabled = base64Enabled
+ }
+
err := zookeeper.ValidateZookeeperClient(c, url.Location)
if err != nil {
logger.Errorf("zookeeper client start error ,error message is %v", err)
@@ -102,7 +111,6 @@ func (c *zookeeperDynamicConfiguration) GetProperties(key string, opts ...config
if len(tmpOpts.Group) != 0 {
key = tmpOpts.Group + "/" + key
} else {
-
/**
* when group is null, we are fetching governance rules, for example:
* 1. key=org.apache.dubbo.DemoService.configurators
@@ -115,6 +123,10 @@ func (c *zookeeperDynamicConfiguration) GetProperties(key string, opts ...config
if err != nil {
return "", perrors.WithStack(err)
}
+ if !c.base64Enabled {
+ return string(content), nil
+ }
+
decoded, err := base64.StdEncoding.DecodeString(string(content))
if err != nil {
return "", perrors.WithStack(err)
@@ -130,9 +142,11 @@ func (c *zookeeperDynamicConfiguration) GetInternalProperty(key string, opts ...
// PublishConfig will put the value into Zk with specific path
func (c *zookeeperDynamicConfiguration) PublishConfig(key string, group string, value string) error {
path := c.getPath(key, group)
- strbytes := []byte(value)
- encoded := base64.StdEncoding.EncodeToString(strbytes)
- err := c.client.CreateWithValue(path, []byte(encoded))
+ valueBytes := []byte(value)
+ if c.base64Enabled {
+ valueBytes = []byte(base64.StdEncoding.EncodeToString(valueBytes))
+ }
+ err := c.client.CreateWithValue(path, valueBytes)
if err != nil {
return perrors.WithStack(err)
}
diff --git a/config_center/zookeeper/listener.go b/config_center/zookeeper/listener.go
index 9be9233..7b67a70 100644
--- a/config_center/zookeeper/listener.go
+++ b/config_center/zookeeper/listener.go
@@ -24,6 +24,7 @@ import (
import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/config_center"
"dubbo.apache.org/dubbo-go/v3/remoting"
)
@@ -65,6 +66,10 @@ func (l *CacheListener) DataChange(event remoting.Event) bool {
return true
}
key := l.pathToKey(event.Path)
+ // TODO use common way
+ if strings.HasSuffix(key, constant.MeshRouteSuffix) {
+ key = key[:strings.Index(key, constant.MeshRouteSuffix)]
+ }
if key != "" {
if listeners, ok := l.keyListeners.Load(key); ok {
for listener := range listeners.(map[config_center.ConfigurationListener]struct{}) {
@@ -78,11 +83,13 @@ func (l *CacheListener) DataChange(event remoting.Event) bool {
func (l *CacheListener) pathToKey(path string) string {
key := strings.Replace(strings.Replace(path, l.rootPath+"/", "", -1), "/", ".", -1)
- if strings.HasSuffix(key, constant.CONFIGURATORS_SUFFIX) ||
+ if strings.HasSuffix(key, constant.ConfiguratorSuffix) ||
strings.HasSuffix(key, constant.TagRouterRuleSuffix) ||
- strings.HasSuffix(key, constant.ConditionRouterRuleSuffix) {
+ strings.HasSuffix(key, constant.ConditionRouterRuleSuffix) ||
+ strings.HasSuffix(key, constant.MeshRouteSuffix) {
// governance config, so we remove the "dubbo." prefix
- return key[strings.Index(key, ".")+1:]
+ key = key[strings.Index(key, ".")+1:]
}
+ logger.Debugf("pathToKey path:%s, key:%s\n", path, key)
return key
}
diff --git a/imports/imports.go b/imports/imports.go
index 078be19..02c685e 100644
--- a/imports/imports.go
+++ b/imports/imports.go
@@ -30,6 +30,7 @@ import (
_ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/leastactive"
_ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/random"
_ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/roundrobin"
+ _ "dubbo.apache.org/dubbo-go/v3/cluster/router/v3router"
_ "dubbo.apache.org/dubbo-go/v3/common/proxy/proxy_factory"
_ "dubbo.apache.org/dubbo-go/v3/config_center/apollo"
_ "dubbo.apache.org/dubbo-go/v3/config_center/nacos"
diff --git a/registry/directory/directory.go b/registry/directory/directory.go
index b8a4b67..bec0334 100644
--- a/registry/directory/directory.go
+++ b/registry/directory/directory.go
@@ -85,7 +85,7 @@ func NewRegistryDirectory(url *common.URL, registry registry.Registry) (director
dir.consumerURL = dir.getConsumerUrl(url.SubURL)
- if routerChain, err := chain.NewRouterChain(dir.consumerURL); err == nil {
+ if routerChain, err := chain.NewRouterChain(); err == nil {
dir.Directory.SetRouterChain(routerChain)
} else {
logger.Warnf("fail to create router chain with url: %s, err is: %v", url.SubURL, err)
@@ -465,7 +465,7 @@ type referenceConfigurationListener struct {
func newReferenceConfigurationListener(dir *RegistryDirectory, url *common.URL) *referenceConfigurationListener {
listener := &referenceConfigurationListener{directory: dir, url: url}
listener.InitWith(
- url.EncodedServiceKey()+constant.CONFIGURATORS_SUFFIX,
+ url.EncodedServiceKey()+constant.ConfiguratorSuffix,
listener,
extension.GetDefaultConfiguratorFunc(),
)
@@ -489,7 +489,7 @@ func newConsumerConfigurationListener(dir *RegistryDirectory) *consumerConfigura
listener := &consumerConfigurationListener{directory: dir}
application := config.GetRootConfig().Application
listener.InitWith(
- application.Name+constant.CONFIGURATORS_SUFFIX,
+ application.Name+constant.ConfiguratorSuffix,
listener,
extension.GetDefaultConfiguratorFunc(),
)
diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go
index add0c5d..01dbdf2 100644
--- a/registry/protocol/protocol.go
+++ b/registry/protocol/protocol.go
@@ -461,7 +461,7 @@ func newProviderConfigurationListener(overrideListeners *sync.Map) *providerConf
listener := &providerConfigurationListener{}
listener.overrideListeners = overrideListeners
listener.InitWith(
- config.GetRootConfig().Application.Name+constant.CONFIGURATORS_SUFFIX,
+ config.GetRootConfig().Application.Name+constant.ConfiguratorSuffix,
listener,
extension.GetDefaultConfiguratorFunc(),
)
@@ -486,7 +486,7 @@ type serviceConfigurationListener struct {
func newServiceConfigurationListener(overrideListener *overrideSubscribeListener, providerUrl *common.URL) *serviceConfigurationListener {
listener := &serviceConfigurationListener{overrideListener: overrideListener, providerUrl: providerUrl}
listener.InitWith(
- providerUrl.EncodedServiceKey()+constant.CONFIGURATORS_SUFFIX,
+ providerUrl.EncodedServiceKey()+constant.ConfiguratorSuffix,
listener,
extension.GetDefaultConfiguratorFunc(),
)
diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go
index 1aef932..34d6c13 100644
--- a/remoting/zookeeper/listener.go
+++ b/remoting/zookeeper/listener.go
@@ -152,6 +152,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
newChildren, err := l.client.GetChildren(zkPath)
if err != nil {
+ // FIXME always false
if err == errNilChildren {
content, _, connErr := l.client.Conn.Get(zkPath)
if connErr != nil {
@@ -240,7 +241,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
if MaxFailTimes <= failTimes {
failTimes = MaxFailTimes
}
- logger.Infof("listenDirEvent(path{%s}) = error{%v}", zkPath, err)
+ logger.Debugf("listenDirEvent(path{%s}) = error{%v}", zkPath, err)
// clear the event channel
CLEAR:
for {
@@ -264,10 +265,10 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
continue
case <-l.exit:
l.client.UnregisterEvent(zkPath, &event)
- logger.Warnf("listen(path{%s}) goroutine exit now...", zkPath)
+ logger.Debugf("listen(path{%s}) goroutine exit now...", zkPath)
return
case <-event:
- logger.Infof("get zk.EventNodeDataChange notify event")
+ logger.Debugf("get zk.EventNodeDataChange notify event")
l.client.UnregisterEvent(zkPath, &event)
l.handleZkNodeEvent(zkPath, nil, listener)
continue
@@ -358,7 +359,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
ticker = time.NewTicker(tickerTTL)
}
case zkEvent = <-childEventCh:
- logger.Warnf("get a zookeeper childEventCh{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
+ logger.Warnf("get a zookeeper childEventCh{type:%s, server:%s, path:%s, state:%d-%s, err:%v}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, gxzookeeper.StateToString(zkEvent.State), zkEvent.Err)
ticker.Stop()
if zkEvent.Type != zk.EventNodeChildrenChanged {