You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2022/04/30 09:53:48 UTC
[dubbo-go] branch 3.0 updated: xds ring hash (#1828)
This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 8b3b71ec2 xds ring hash (#1828)
8b3b71ec2 is described below
commit 8b3b71ec21d84574d8b3297694b81cd77f81b53b
Author: binbin.zhang <bb...@163.com>
AuthorDate: Sat Apr 30 17:53:43 2022 +0800
xds ring hash (#1828)
* xds ring hash
* refine
---
cluster/loadbalance/ringhash/ring.go | 174 ++++++++++++++++++++++++++
cluster/loadbalance/ringhash/ringhash.go | 87 +++++++++++++
cluster/loadbalance/ringhash/ringhash_test.go | 98 +++++++++++++++
cluster/router/meshrouter/meshrouter.go | 83 ++++--------
common/constant/loadbalance.go | 1 +
common/constant/xds.go | 4 +
remoting/xds/client.go | 26 ++++
remoting/xds/ewatcher/ewatcher.go | 2 +
remoting/xds/mocks/client.go | 76 +++++++++++
9 files changed, 489 insertions(+), 62 deletions(-)
diff --git a/cluster/loadbalance/ringhash/ring.go b/cluster/loadbalance/ringhash/ring.go
new file mode 100644
index 000000000..e0b756666
--- /dev/null
+++ b/cluster/loadbalance/ringhash/ring.go
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ringhash
+
+import (
+ "fmt"
+ "math"
+ "math/bits"
+ "sort"
+ "strconv"
+)
+
+import (
+ "github.com/cespare/xxhash/v2"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/protocol"
+ "dubbo.apache.org/dubbo-go/v3/xds/client/resource"
+ "dubbo.apache.org/dubbo-go/v3/xds/utils/grpcrand"
+)
+
+type invokerWithWeight struct {
+ invoker protocol.Invoker
+ weight float64
+}
+
+type ringEntry struct {
+ idx int
+ hash uint64
+ invoker protocol.Invoker
+}
+
+func (lb *ringhashLoadBalance) generateRing(invokers []invokerWrapper, minRingSize, maxRingSize uint64) ([]*ringEntry, error) {
+ normalizedWeights, minWeight, err := normalizeWeights(invokers)
+ if err != nil {
+ return nil, err
+ }
+ // Normalized weights for {3,3,4} is {0.3,0.3,0.4}.
+
+ // Scale up the size of the ring such that the least-weighted host gets a
+ // whole number of hashes on the ring.
+ //
+ // Note that size is limited by the input max/min.
+ scale := math.Min(math.Ceil(minWeight*float64(minRingSize))/minWeight, float64(maxRingSize))
+ ringSize := math.Ceil(scale)
+ items := make([]*ringEntry, 0, int(ringSize))
+
+ // For each entry, scale*weight nodes are generated in the ring.
+ //
+ // Not all of these are whole numbers. E.g. for weights {a:3,b:3,c:4}, if
+ // ring size is 7, scale is 6.66. The numbers of nodes will be
+ // {a,a,b,b,c,c,c}.
+ //
+ // A hash is generated for each item, and later the results will be sorted
+ // based on the hash.
+ var (
+ idx int
+ targetIdx float64
+ )
+ for _, inw := range normalizedWeights {
+ targetIdx += scale * inw.weight
+ for float64(idx) < targetIdx {
+ h := xxhash.Sum64String(inw.invoker.GetURL().String() + strconv.Itoa(len(items)))
+ items = append(items, &ringEntry{idx: idx, hash: h, invoker: inw.invoker})
+ idx++
+ }
+ }
+
+ // Sort items based on hash, to prepare for binary search.
+ sort.Slice(items, func(i, j int) bool { return items[i].hash < items[j].hash })
+ for i, ii := range items {
+ ii.idx = i
+ }
+ return items, nil
+}
+
+// normalizeWeights divides all the weights by the sum, so that the total weight
+// is 1.
+func normalizeWeights(invokers []invokerWrapper) ([]invokerWithWeight, float64, error) {
+ var weightSum int
+ for _, v := range invokers {
+ weightSum += v.weight
+ }
+ if weightSum == 0 {
+ return nil, 0, fmt.Errorf("total weight of all endpoints is 0")
+ }
+ weightSumF := float64(weightSum)
+ ret := make([]invokerWithWeight, 0, len(invokers))
+ min := math.MaxFloat64
+ for _, invoker := range invokers {
+ nw := float64(invoker.weight) / weightSumF
+ ret = append(ret, invokerWithWeight{invoker: invoker.invoker, weight: nw})
+ if nw < min {
+ min = nw
+ }
+ }
+ return ret, min, nil
+}
+
+func (lb *ringhashLoadBalance) pick(h uint64, items []*ringEntry) *ringEntry {
+ i := sort.Search(len(items), func(i int) bool { return items[i].hash >= h })
+ if i == len(items) {
+ // If not found, and h is greater than the largest hash, return the
+ // first item.
+ i = 0
+ }
+ return items[i]
+}
+
+func (lb *ringhashLoadBalance) generateHash(invocation protocol.Invocation, hashPolicies []*resource.HashPolicy) uint64 {
+ var (
+ hash uint64
+ generatedHash bool
+ )
+ for _, policy := range hashPolicies {
+ var (
+ policyHash uint64
+ generatedPolicyHash bool
+ )
+ switch policy.HashPolicyType {
+ case resource.HashPolicyTypeHeader:
+ value, ok := invocation.GetAttachment(policy.HeaderName)
+ if !ok || len(value) == 0 {
+ continue
+ }
+ policyHash = xxhash.Sum64String(value)
+ generatedHash = true
+ generatedPolicyHash = true
+ case resource.HashPolicyTypeChannelID:
+ // Hash the ClientConn pointer which logically uniquely
+ // identifies the client.
+ policyHash = xxhash.Sum64String(fmt.Sprintf("%p", &lb.client))
+ generatedHash = true
+ generatedPolicyHash = true
+ }
+
+ // Deterministically combine the hash policies. Rotating prevents
+ // duplicate hash policies from canceling each other out and preserves
+ // the 64 bits of entropy.
+ if generatedPolicyHash {
+ hash = bits.RotateLeft64(hash, 1)
+ hash = hash ^ policyHash
+ }
+
+ // If terminal policy and a hash has already been generated, ignore the
+ // rest of the policies and use that hash already generated.
+ if policy.Terminal && generatedHash {
+ break
+ }
+ }
+
+ if generatedHash {
+ return hash
+ }
+ // If no generated hash return a random long. In the grand scheme of things
+ // this logically will map to choosing a random backend to route request to.
+ return grpcrand.Uint64()
+}
diff --git a/cluster/loadbalance/ringhash/ringhash.go b/cluster/loadbalance/ringhash/ringhash.go
new file mode 100644
index 000000000..3bd5f085f
--- /dev/null
+++ b/cluster/loadbalance/ringhash/ringhash.go
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ringhash
+
+import (
+ "strconv"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance"
+ "dubbo.apache.org/dubbo-go/v3/common"
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/common/extension"
+ "dubbo.apache.org/dubbo-go/v3/common/logger"
+ "dubbo.apache.org/dubbo-go/v3/protocol"
+ "dubbo.apache.org/dubbo-go/v3/remoting/xds"
+)
+
+func init() {
+ extension.SetLoadbalance(constant.LoadXDSRingHash, newRingHashLoadBalance)
+}
+
+type invokerWrapper struct {
+ invoker protocol.Invoker
+ weight int
+}
+
+type ringhashLoadBalance struct {
+ client xds.XDSWrapperClient
+}
+
+// newRingHashLoadBalance xds ring hash
+//
+// The same parameters of the request is always sent to the same provider.
+func newRingHashLoadBalance() loadbalance.LoadBalance {
+ return &ringhashLoadBalance{client: xds.GetXDSWrappedClient()}
+}
+
+// Select gets invoker based on load balancing strategy
+func (lb *ringhashLoadBalance) Select(invokers []protocol.Invoker, invocation protocol.Invocation) protocol.Invoker {
+ url := invocation.Invoker().GetURL()
+ serviceUniqueKey := common.GetSubscribeName(url)
+ hostAddr, err := lb.client.GetHostAddrByServiceUniqueKey(serviceUniqueKey)
+ if err != nil {
+ logger.Errorf("[xds ringhash] GetHostAddrByServiceUniqueKey failed,error=%v", err)
+ return nil
+ }
+ clusterUpdate := lb.client.GetClusterUpdateIgnoreVersion(hostAddr)
+ policy := clusterUpdate.LBPolicy
+ if policy.MaximumRingSize < policy.MinimumRingSize {
+ logger.Errorf("[xds ringhash] ringsize parameter is invalid. MinimumRingSize=%d,MaximumRingSize=%d", policy.MaximumRingSize, policy.MaximumRingSize)
+ return nil
+ }
+ invokerWrappers := make([]invokerWrapper, 0, len(invokers))
+ for _, v := range invokers {
+ weight, _ := strconv.Atoi(v.GetURL().GetParam(constant.EndPointWeight, "1"))
+ invokerWrappers = append(invokerWrappers, invokerWrapper{invoker: v, weight: weight})
+ }
+ ring, err := lb.generateRing(invokerWrappers, policy.MinimumRingSize, policy.MaximumRingSize)
+ if err != nil {
+ logger.Errorf("[xds ringhash] ringsize parameter is invalid. MinimumRingSize=%d,MaximumRingSize=%d", policy.MaximumRingSize, policy.MaximumRingSize)
+ return nil
+ }
+
+ routerConfig := lb.client.GetRouterConfig(hostAddr)
+ router, err := lb.client.MatchRoute(routerConfig, invocation)
+ if err != nil {
+ logger.Errorf("[xds ringhash] not found route,method=%s", invocation.MethodName())
+ return nil
+ }
+ return lb.pick(lb.generateHash(invocation, router.HashPolicies), ring).invoker
+}
diff --git a/cluster/loadbalance/ringhash/ringhash_test.go b/cluster/loadbalance/ringhash/ringhash_test.go
new file mode 100644
index 000000000..9bef817be
--- /dev/null
+++ b/cluster/loadbalance/ringhash/ringhash_test.go
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ringhash
+
+import (
+ "errors"
+ "fmt"
+ "testing"
+)
+
+import (
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/mock"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common"
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/protocol"
+ "dubbo.apache.org/dubbo-go/v3/protocol/invocation"
+ "dubbo.apache.org/dubbo-go/v3/remoting/xds/mocks"
+ "dubbo.apache.org/dubbo-go/v3/xds/client/resource"
+)
+
+func TestSelect(t *testing.T) {
+ var invokers []protocol.Invoker
+ url, _ := common.NewURL(fmt.Sprintf("dubbo://%s:%d/org.apache.demo.HelloService",
+ "192.168.1.1", constant.DefaultPort))
+ url.SetParam(constant.InterfaceKey, "org.apache.demo.HelloService")
+ url.SetParam(constant.EndPointWeight, "3")
+ invokers = append(invokers, protocol.NewBaseInvoker(url))
+ url1, _ := common.NewURL(fmt.Sprintf("dubbo://%s:%d/org.apache.demo.HelloService",
+ "192.168.1.2", constant.DefaultPort))
+ url1.SetParam(constant.InterfaceKey, "org.apache.demo.HelloService")
+ url1.SetParam(constant.EndPointWeight, "3")
+ invokers = append(invokers, protocol.NewBaseInvoker(url1))
+ url2, _ := common.NewURL(fmt.Sprintf("dubbo://%s:%d/org.apache.demo.HelloService",
+ "192.168.1.3", constant.DefaultPort))
+ url2.SetParam(constant.InterfaceKey, "org.apache.demo.HelloService")
+ url2.SetParam(constant.EndPointWeight, "4")
+ invokers = append(invokers, protocol.NewBaseInvoker(url2))
+ t.Run("normal", func(t *testing.T) {
+ mockXDSClient := &mocks.WrappedClientMock{}
+ mockXDSClient.On("GetRouterConfig", mock.Anything).Return(resource.RouteConfigUpdate{})
+ mockXDSClient.On("GetClusterUpdateIgnoreVersion", mock.Anything).
+ Return(resource.ClusterUpdate{LBPolicy: &resource.ClusterLBPolicyRingHash{MinimumRingSize: 9, MaximumRingSize: 10}})
+ mockXDSClient.On("GetHostAddrByServiceUniqueKey", mock.Anything).Return("HelloService", nil)
+ mockXDSClient.On("MatchRoute", mock.Anything, mock.Anything).
+ Return(&resource.Route{HashPolicies: []*resource.HashPolicy{{HashPolicyType: resource.HashPolicyTypeHeader,
+ HeaderName: "key"}}}, nil)
+ lb := &ringhashLoadBalance{client: mockXDSClient}
+ inv := &invocation.RPCInvocation{}
+ inv.SetInvoker(protocol.NewBaseInvoker(url))
+ inv.SetAttachment("key", "1")
+ assert.NotNil(t, lb.Select(invokers, inv))
+ })
+
+ t.Run("normal_hash_channelID", func(t *testing.T) {
+ mockXDSClient := &mocks.WrappedClientMock{}
+ mockXDSClient.On("GetRouterConfig", mock.Anything).Return(resource.RouteConfigUpdate{})
+ mockXDSClient.On("GetClusterUpdateIgnoreVersion", mock.Anything).
+ Return(resource.ClusterUpdate{LBPolicy: &resource.ClusterLBPolicyRingHash{MinimumRingSize: 9, MaximumRingSize: 10}})
+ mockXDSClient.On("GetHostAddrByServiceUniqueKey", mock.Anything).Return("HelloService", nil)
+ mockXDSClient.On("MatchRoute", mock.Anything, mock.Anything).
+ Return(&resource.Route{HashPolicies: []*resource.HashPolicy{
+ {HashPolicyType: resource.HashPolicyTypeChannelID}}}, nil)
+ lb := &ringhashLoadBalance{client: mockXDSClient}
+ inv := &invocation.RPCInvocation{}
+ inv.SetInvoker(protocol.NewBaseInvoker(url))
+ assert.NotNil(t, lb.Select(invokers, inv))
+ })
+
+ t.Run("GetHostAddrByServiceUniqueKey_faild", func(t *testing.T) {
+ mockXDSClient := &mocks.WrappedClientMock{}
+ mockXDSClient.On("GetHostAddrByServiceUniqueKey", mock.Anything).
+ Return("HelloService", errors.New("error"))
+ lb := &ringhashLoadBalance{client: mockXDSClient}
+ inv := &invocation.RPCInvocation{}
+ inv.SetInvoker(protocol.NewBaseInvoker(url))
+ inv.SetAttachment("key", "1")
+ assert.Nil(t, lb.Select(invokers, inv))
+ })
+}
diff --git a/cluster/router/meshrouter/meshrouter.go b/cluster/router/meshrouter/meshrouter.go
index 102a48ac7..567f58a14 100644
--- a/cluster/router/meshrouter/meshrouter.go
+++ b/cluster/router/meshrouter/meshrouter.go
@@ -18,9 +18,7 @@
package meshrouter
import (
- "bytes"
"math/rand"
- "strings"
)
import (
@@ -31,8 +29,6 @@ import (
"dubbo.apache.org/dubbo-go/v3/config_center"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/remoting/xds"
- "dubbo.apache.org/dubbo-go/v3/xds/client/resource"
- "dubbo.apache.org/dubbo-go/v3/xds/utils/resolver"
)
const (
@@ -60,7 +56,7 @@ func (r *MeshRouter) Route(invokers []protocol.Invoker, url *common.URL, invocat
if r.client == nil {
return invokers
}
- hostAddr, err := r.client.GetHostAddrByServiceUniqueKey(getSubscribeName(url))
+ hostAddr, err := r.client.GetHostAddrByServiceUniqueKey(common.GetSubscribeName(url))
if err != nil {
// todo deal with error
return nil
@@ -75,47 +71,28 @@ func (r *MeshRouter) Route(invokers []protocol.Invoker, url *common.URL, invocat
}
clusterInvokerMap[meshClusterID] = append(clusterInvokerMap[meshClusterID], v)
}
+ route, err := r.client.MatchRoute(rconf, invocation)
+ if err != nil {
+ logger.Errorf("[Mesh Router] not found route,method=%s", invocation.MethodName())
+ return nil
+ }
- if len(rconf.VirtualHosts) != 0 {
- // try to route to sub virtual host
- for _, vh := range rconf.VirtualHosts {
- // 1. match domain
- //vh.Domains == ["*"]
-
- // 2. match http route
- for _, r := range vh.Routes {
- //route.
- ctx := invocation.GetAttachmentAsContext()
- matcher, err := resource.RouteToMatcher(r)
- if err != nil {
- logger.Errorf("[Mesh Router] router to matcher failed with error %s", err)
- return invokers
- }
- if matcher.Match(resolver.RPCInfo{
- Context: ctx,
- Method: "/" + invocation.MethodName(),
- }) {
- // Loop through routes in order and select first match.
- if r == nil || r.WeightedClusters == nil {
- logger.Errorf("[Mesh Router] route's WeightedClusters is empty, route: %+v", r)
- return invokers
- }
- invokersWeightPairs := make(invokerWeightPairs, 0)
-
- for clusterID, weight := range r.WeightedClusters {
- // cluster -> invokers
- targetInvokers := clusterInvokerMap[clusterID]
- invokersWeightPairs = append(invokersWeightPairs, invokerWeightPair{
- invokers: targetInvokers,
- weight: weight.Weight,
- })
- }
- return invokersWeightPairs.GetInvokers()
- }
- }
- }
+ // Loop through routes in order and select first match.
+ if route == nil || route.WeightedClusters == nil {
+ logger.Errorf("[Mesh Router] route's WeightedClusters is empty, route: %+v", r)
+ return invokers
+ }
+ invokersWeightPairs := make(invokerWeightPairs, 0)
+
+ for clusterID, weight := range route.WeightedClusters {
+ // cluster -> invokers
+ targetInvokers := clusterInvokerMap[clusterID]
+ invokersWeightPairs = append(invokersWeightPairs, invokerWeightPair{
+ invokers: targetInvokers,
+ weight: weight.Weight,
+ })
}
- return invokers
+ return invokersWeightPairs.GetInvokers()
}
// Process there is no process needs for uniform Router, as it upper struct RouterChain has done it
@@ -167,21 +144,3 @@ func (i *invokerWeightPairs) GetInvokers() []protocol.Invoker {
}
return (*i)[0].invokers
}
-
-func getSubscribeName(url *common.URL) string {
- var buffer bytes.Buffer
-
- buffer.Write([]byte(common.DubboNodes[common.PROVIDER]))
- appendParam(&buffer, url, constant.InterfaceKey)
- appendParam(&buffer, url, constant.VersionKey)
- appendParam(&buffer, url, constant.GroupKey)
- return buffer.String()
-}
-
-func appendParam(target *bytes.Buffer, url *common.URL, key string) {
- value := url.GetParam(key, "")
- target.Write([]byte(constant.NacosServiceNameSeparator))
- if strings.TrimSpace(value) != "" {
- target.Write([]byte(value))
- }
-}
diff --git a/common/constant/loadbalance.go b/common/constant/loadbalance.go
index 82c5771f9..dde844337 100644
--- a/common/constant/loadbalance.go
+++ b/common/constant/loadbalance.go
@@ -23,4 +23,5 @@ const (
LoadBalanceKeyRandom = "random"
LoadBalanceKeyRoundRobin = "roundrobin"
LoadBalanceKeyP2C = "p2c"
+ LoadXDSRingHash = "xdsringhash"
)
diff --git a/common/constant/xds.go b/common/constant/xds.go
index 5c92d0ab4..62903b874 100644
--- a/common/constant/xds.go
+++ b/common/constant/xds.go
@@ -17,6 +17,10 @@
package constant
+const (
+ EndPointWeight = "endPointWeight"
+)
+
const (
MeshClusterIDKey = "meshClusterID"
MeshHostAddrKey = "meshHostAddr"
diff --git a/remoting/xds/client.go b/remoting/xds/client.go
index 448744eb7..7c23f5743 100644
--- a/remoting/xds/client.go
+++ b/remoting/xds/client.go
@@ -18,6 +18,7 @@
package xds
import (
+ "errors"
"sync"
"time"
)
@@ -28,12 +29,14 @@ import (
import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/registry"
xdsCommon "dubbo.apache.org/dubbo-go/v3/remoting/xds/common"
"dubbo.apache.org/dubbo-go/v3/remoting/xds/ewatcher"
"dubbo.apache.org/dubbo-go/v3/remoting/xds/mapping"
"dubbo.apache.org/dubbo-go/v3/xds/client"
"dubbo.apache.org/dubbo-go/v3/xds/client/resource"
+ "dubbo.apache.org/dubbo-go/v3/xds/utils/resolver"
)
const (
@@ -498,6 +501,28 @@ func (w *WrappedClientImpl) getAllVersionClusterName(hostAddr string) []string {
return allVersionClusterNames
}
+func (w *WrappedClientImpl) MatchRoute(routerConfig resource.RouteConfigUpdate, invocation protocol.Invocation) (*resource.Route, error) {
+ ctx := invocation.GetAttachmentAsContext()
+ rpcInfo := resolver.RPCInfo{
+ Context: ctx,
+ Method: "/" + invocation.MethodName(),
+ }
+ // try to route to sub virtual host
+ for _, vh := range routerConfig.VirtualHosts {
+ for _, r := range vh.Routes {
+ //route.
+ matcher, err := resource.RouteToMatcher(r)
+ if err != nil {
+ return nil, err
+ }
+ if matcher.Match(rpcInfo) {
+ return r, nil
+ }
+ }
+ }
+ return nil, errors.New("not found route")
+}
+
type XDSWrapperClient interface {
Subscribe(svcUniqueName, interfaceName, hostAddr string, lst registry.NotifyListener) error
UnSubscribe(svcUniqueName string)
@@ -507,4 +532,5 @@ type XDSWrapperClient interface {
GetClusterUpdateIgnoreVersion(hostAddr string) resource.ClusterUpdate
GetHostAddress() xdsCommon.HostAddr
GetIstioPodIP() string
+ MatchRoute(routerConfig resource.RouteConfigUpdate, invocation protocol.Invocation) (*resource.Route, error)
}
diff --git a/remoting/xds/ewatcher/ewatcher.go b/remoting/xds/ewatcher/ewatcher.go
index 06a2a4f20..304c7daae 100644
--- a/remoting/xds/ewatcher/ewatcher.go
+++ b/remoting/xds/ewatcher/ewatcher.go
@@ -19,6 +19,7 @@ package ewatcher
import (
"fmt"
+ "strconv"
"sync"
)
@@ -112,6 +113,7 @@ func generateRegistryEvent(clusterID string, endpoint resource.Endpoint, interfa
url.AddParam(constant.MeshSubsetKey, cluster.Subset)
url.AddParam(constant.MeshClusterIDKey, clusterID)
url.AddParam(constant.MeshHostAddrKey, cluster.Addr.String())
+ url.AddParam(constant.EndPointWeight, strconv.Itoa(int(endpoint.Weight)))
if endpoint.HealthStatus == resource.EndpointHealthStatusUnhealthy {
return ®istry.ServiceEvent{
Action: remoting.EventTypeDel,
diff --git a/remoting/xds/mocks/client.go b/remoting/xds/mocks/client.go
new file mode 100644
index 000000000..02c7f93b9
--- /dev/null
+++ b/remoting/xds/mocks/client.go
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mocks
+
+import (
+ "github.com/stretchr/testify/mock"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/protocol"
+ "dubbo.apache.org/dubbo-go/v3/registry"
+ xdsCommon "dubbo.apache.org/dubbo-go/v3/remoting/xds/common"
+ "dubbo.apache.org/dubbo-go/v3/xds/client/resource"
+)
+
+// WrappedClientMock is an autogenerated mock type for the WrappedClientMock type
+type WrappedClientMock struct {
+ mock.Mock
+}
+
+func (m *WrappedClientMock) Subscribe(svcUniqueName, interfaceName, hostAddr string, lst registry.NotifyListener) error {
+ args := m.Called(svcUniqueName, interfaceName, hostAddr, lst)
+ return args.Error(0)
+}
+
+func (m *WrappedClientMock) UnSubscribe(svcUniqueName string) {
+
+}
+
+func (m *WrappedClientMock) GetRouterConfig(hostAddr string) resource.RouteConfigUpdate {
+ args := m.Called(hostAddr)
+ return args.Get(0).(resource.RouteConfigUpdate)
+}
+func (m *WrappedClientMock) GetHostAddrByServiceUniqueKey(serviceUniqueKey string) (string, error) {
+ args := m.Called(serviceUniqueKey)
+ return args.String(0), args.Error(1)
+}
+func (m *WrappedClientMock) ChangeInterfaceMap(serviceUniqueKey string, add bool) error {
+ args := m.Called(serviceUniqueKey, add)
+ return args.Error(0)
+}
+
+func (m *WrappedClientMock) GetClusterUpdateIgnoreVersion(hostAddr string) resource.ClusterUpdate {
+ args := m.Called(hostAddr)
+ return args.Get(0).(resource.ClusterUpdate)
+}
+
+func (m *WrappedClientMock) GetHostAddress() xdsCommon.HostAddr {
+ args := m.Called()
+ return args.Get(0).(xdsCommon.HostAddr)
+}
+
+func (m *WrappedClientMock) GetIstioPodIP() string {
+ args := m.Called()
+ return args.String(0)
+}
+
+func (m *WrappedClientMock) MatchRoute(routerConfig resource.RouteConfigUpdate, invocation protocol.Invocation) (*resource.Route, error) {
+ args := m.Called(routerConfig, invocation)
+ return args.Get(0).(*resource.Route), args.Error(1)
+}