You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2022/04/30 09:53:48 UTC

[dubbo-go] branch 3.0 updated: xds ring hash (#1828)

This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 8b3b71ec2 xds ring hash (#1828)
8b3b71ec2 is described below

commit 8b3b71ec21d84574d8b3297694b81cd77f81b53b
Author: binbin.zhang <bb...@163.com>
AuthorDate: Sat Apr 30 17:53:43 2022 +0800

    xds ring hash (#1828)
    
    * xds ring hash
    
    * refine
---
 cluster/loadbalance/ringhash/ring.go          | 174 ++++++++++++++++++++++++++
 cluster/loadbalance/ringhash/ringhash.go      |  87 +++++++++++++
 cluster/loadbalance/ringhash/ringhash_test.go |  98 +++++++++++++++
 cluster/router/meshrouter/meshrouter.go       |  83 ++++--------
 common/constant/loadbalance.go                |   1 +
 common/constant/xds.go                        |   4 +
 remoting/xds/client.go                        |  26 ++++
 remoting/xds/ewatcher/ewatcher.go             |   2 +
 remoting/xds/mocks/client.go                  |  76 +++++++++++
 9 files changed, 489 insertions(+), 62 deletions(-)

diff --git a/cluster/loadbalance/ringhash/ring.go b/cluster/loadbalance/ringhash/ring.go
new file mode 100644
index 000000000..e0b756666
--- /dev/null
+++ b/cluster/loadbalance/ringhash/ring.go
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ringhash
+
+import (
+	"fmt"
+	"math"
+	"math/bits"
+	"sort"
+	"strconv"
+)
+
+import (
+	"github.com/cespare/xxhash/v2"
+)
+
+import (
+	"dubbo.apache.org/dubbo-go/v3/protocol"
+	"dubbo.apache.org/dubbo-go/v3/xds/client/resource"
+	"dubbo.apache.org/dubbo-go/v3/xds/utils/grpcrand"
+)
+
+type invokerWithWeight struct {
+	invoker protocol.Invoker // invoker the weight belongs to
+	weight  float64          // weight divided by the sum of all weights (see normalizeWeights)
+}
+
+type ringEntry struct {
+	idx     int              // position in the ring once it is sorted by hash
+	hash    uint64           // xxhash of the invoker URL plus a per-entry counter
+	invoker protocol.Invoker // invoker that owns this point of the ring
+}
+
+func (lb *ringhashLoadBalance) generateRing(invokers []invokerWrapper, minRingSize, maxRingSize uint64) ([]*ringEntry, error) {
+	normalizedWeights, minWeight, err := normalizeWeights(invokers)
+	if err != nil {
+		return nil, err
+	}
+	// Normalized weights for {3,3,4} is {0.3,0.3,0.4}.
+
+	// Scale up the size of the ring such that the least-weighted host gets a
+	// whole number of hashes on the ring.
+	//
+	// Note that size is limited by the input max/min.
+	scale := math.Min(math.Ceil(minWeight*float64(minRingSize))/minWeight, float64(maxRingSize))
+	ringSize := math.Ceil(scale)
+	items := make([]*ringEntry, 0, int(ringSize))
+
+	// For each entry, scale*weight nodes are generated in the ring.
+	//
+	// Not all of these are whole numbers. E.g. for weights {a:3,b:3,c:4}, if
+	// ring size is 7, scale is 6.66. The numbers of nodes will be
+	// {a,a,b,b,c,c,c}.
+	//
+	// A hash is generated for each item, and later the results will be sorted
+	// based on the hash.
+	var (
+		idx       int
+		targetIdx float64
+	)
+	for _, inw := range normalizedWeights {
+		targetIdx += scale * inw.weight
+		for float64(idx) < targetIdx {
+			h := xxhash.Sum64String(inw.invoker.GetURL().String() + strconv.Itoa(len(items)))
+			items = append(items, &ringEntry{idx: idx, hash: h, invoker: inw.invoker})
+			idx++
+		}
+	}
+
+	// Sort items based on hash, to prepare for binary search.
+	sort.Slice(items, func(i, j int) bool { return items[i].hash < items[j].hash })
+	for i, ii := range items {
+		ii.idx = i
+	}
+	return items, nil
+}
+
+// normalizeWeights divides all the weights by the sum, so that the total weight
+// is 1.
+func normalizeWeights(invokers []invokerWrapper) ([]invokerWithWeight, float64, error) {
+	var weightSum int
+	for _, v := range invokers {
+		weightSum += v.weight
+	}
+	if weightSum == 0 {
+		return nil, 0, fmt.Errorf("total weight of all endpoints is 0")
+	}
+	weightSumF := float64(weightSum)
+	ret := make([]invokerWithWeight, 0, len(invokers))
+	min := math.MaxFloat64
+	for _, invoker := range invokers {
+		nw := float64(invoker.weight) / weightSumF
+		ret = append(ret, invokerWithWeight{invoker: invoker.invoker, weight: nw})
+		if nw < min {
+			min = nw
+		}
+	}
+	return ret, min, nil
+}
+
+// pick returns the ring entry responsible for h: the first entry whose hash is
+// not smaller than h. items must be sorted by hash in ascending order.
+func (lb *ringhashLoadBalance) pick(h uint64, items []*ringEntry) *ringEntry {
+	pos := sort.Search(len(items), func(k int) bool { return h <= items[k].hash })
+	if pos >= len(items) {
+		pos = 0 // h is past the largest hash, so wrap around to the first entry
+	}
+	return items[pos]
+}
+
+// generateHash derives the request hash from the route's hash policies.
+//
+// Policies are evaluated in order and the hash of every policy that yields a
+// value is folded into the result:
+//   - a header policy hashes the invocation attachment named by HeaderName
+//     and is skipped entirely when that attachment is missing or empty;
+//   - a channel-ID policy hashes the formatted address of the lb.client field,
+//     so every call through the same load balancer instance hashes alike;
+//   - any other policy type yields no value.
+//
+// Evaluation stops at a terminal policy that was not skipped, once a hash
+// exists. If no policy yields a value, a random number is returned instead.
+func (lb *ringhashLoadBalance) generateHash(invocation protocol.Invocation, hashPolicies []*resource.HashPolicy) uint64 {
+	var (
+		combined uint64
+		hashed   bool // set once any policy has contributed to combined
+	)
+	for _, p := range hashPolicies {
+		var current uint64
+		switch p.HashPolicyType {
+		case resource.HashPolicyTypeHeader:
+			attachment, found := invocation.GetAttachment(p.HeaderName)
+			if !found || len(attachment) == 0 {
+				// No value: move on without looking at the terminal flag.
+				continue
+			}
+			current = xxhash.Sum64String(attachment)
+		case resource.HashPolicyTypeChannelID:
+			current = xxhash.Sum64String(fmt.Sprintf("%p", &lb.client))
+		default:
+			// Nothing to fold in, but a terminal policy still ends the
+			// evaluation when an earlier policy already produced a hash.
+			if p.Terminal && hashed {
+				return combined
+			}
+			continue
+		}
+		// Rotate before XOR-ing so that identical policies do not cancel out.
+		combined = bits.RotateLeft64(combined, 1) ^ current
+		hashed = true
+		if p.Terminal {
+			break
+		}
+	}
+	if !hashed {
+		return grpcrand.Uint64()
+	}
+	return combined
+}
diff --git a/cluster/loadbalance/ringhash/ringhash.go b/cluster/loadbalance/ringhash/ringhash.go
new file mode 100644
index 000000000..3bd5f085f
--- /dev/null
+++ b/cluster/loadbalance/ringhash/ringhash.go
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ringhash
+
+import (
+	"strconv"
+)
+
+import (
+	"dubbo.apache.org/dubbo-go/v3/cluster/loadbalance"
+	"dubbo.apache.org/dubbo-go/v3/common"
+	"dubbo.apache.org/dubbo-go/v3/common/constant"
+	"dubbo.apache.org/dubbo-go/v3/common/extension"
+	"dubbo.apache.org/dubbo-go/v3/common/logger"
+	"dubbo.apache.org/dubbo-go/v3/protocol"
+	"dubbo.apache.org/dubbo-go/v3/remoting/xds"
+)
+
+func init() {
+	extension.SetLoadbalance(constant.LoadXDSRingHash, newRingHashLoadBalance) // hands the constructor to the extension package under the "xdsringhash" key
+}
+
+type invokerWrapper struct {
+	invoker protocol.Invoker // candidate provider
+	weight  int              // endPointWeight URL parameter as parsed by Select
+}
+
+type ringhashLoadBalance struct {
+	client xds.XDSWrapperClient // xDS client queried by Select for the cluster policy and the matching route
+}
+
+// newRingHashLoadBalance creates the xDS ring hash load balancer. While the
+// invoker set is unchanged, requests with the same hash reach the same provider.
+func newRingHashLoadBalance() loadbalance.LoadBalance {
+	balancer := &ringhashLoadBalance{client: xds.GetXDSWrappedClient()}
+	return balancer
+}
+
+// Select returns the invoker that owns the request hash on the ring built from
+// invokers, or nil when the host address, ring hash policy, ring or route
+// cannot be resolved. Invokers without a valid positive weight get no entries.
+func (lb *ringhashLoadBalance) Select(invokers []protocol.Invoker, invocation protocol.Invocation) protocol.Invoker {
+	serviceUniqueKey := common.GetSubscribeName(invocation.Invoker().GetURL())
+	hostAddr, err := lb.client.GetHostAddrByServiceUniqueKey(serviceUniqueKey)
+	if err != nil {
+		logger.Errorf("[xds ringhash] GetHostAddrByServiceUniqueKey failed,error=%v", err)
+		return nil
+	}
+	policy := lb.client.GetClusterUpdateIgnoreVersion(hostAddr).LBPolicy
+	if policy == nil || policy.MaximumRingSize < policy.MinimumRingSize {
+		logger.Errorf("[xds ringhash] ring hash policy is missing or invalid. policy=%+v", policy)
+		return nil
+	}
+	invokerWrappers := make([]invokerWrapper, 0, len(invokers))
+	for _, v := range invokers {
+		// Unparsable or non-positive weights would break the ring scaling; skip them.
+		if weight, convErr := strconv.Atoi(v.GetURL().GetParam(constant.EndPointWeight, "1")); convErr == nil && weight > 0 {
+			invokerWrappers = append(invokerWrappers, invokerWrapper{invoker: v, weight: weight})
+		}
+	}
+	ring, err := lb.generateRing(invokerWrappers, policy.MinimumRingSize, policy.MaximumRingSize)
+	if err != nil || len(ring) == 0 {
+		logger.Errorf("[xds ringhash] generate ring failed,ringSize=%d,error=%v", len(ring), err)
+		return nil
+	}
+	router, err := lb.client.MatchRoute(lb.client.GetRouterConfig(hostAddr), invocation)
+	if err != nil {
+		logger.Errorf("[xds ringhash] not found route,method=%s", invocation.MethodName())
+		return nil
+	}
+	return lb.pick(lb.generateHash(invocation, router.HashPolicies), ring).invoker
+}
diff --git a/cluster/loadbalance/ringhash/ringhash_test.go b/cluster/loadbalance/ringhash/ringhash_test.go
new file mode 100644
index 000000000..9bef817be
--- /dev/null
+++ b/cluster/loadbalance/ringhash/ringhash_test.go
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ringhash
+
+import (
+	"errors"
+	"fmt"
+	"testing"
+)
+
+import (
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
+)
+
+import (
+	"dubbo.apache.org/dubbo-go/v3/common"
+	"dubbo.apache.org/dubbo-go/v3/common/constant"
+	"dubbo.apache.org/dubbo-go/v3/protocol"
+	"dubbo.apache.org/dubbo-go/v3/protocol/invocation"
+	"dubbo.apache.org/dubbo-go/v3/remoting/xds/mocks"
+	"dubbo.apache.org/dubbo-go/v3/xds/client/resource"
+)
+
+func TestSelect(t *testing.T) {
+	var invokers []protocol.Invoker
+	url, _ := common.NewURL(fmt.Sprintf("dubbo://%s:%d/org.apache.demo.HelloService",
+		"192.168.1.1", constant.DefaultPort))
+	url.SetParam(constant.InterfaceKey, "org.apache.demo.HelloService")
+	url.SetParam(constant.EndPointWeight, "3")
+	invokers = append(invokers, protocol.NewBaseInvoker(url))
+	url1, _ := common.NewURL(fmt.Sprintf("dubbo://%s:%d/org.apache.demo.HelloService",
+		"192.168.1.2", constant.DefaultPort))
+	url1.SetParam(constant.InterfaceKey, "org.apache.demo.HelloService")
+	url1.SetParam(constant.EndPointWeight, "3")
+	invokers = append(invokers, protocol.NewBaseInvoker(url1))
+	url2, _ := common.NewURL(fmt.Sprintf("dubbo://%s:%d/org.apache.demo.HelloService",
+		"192.168.1.3", constant.DefaultPort))
+	url2.SetParam(constant.InterfaceKey, "org.apache.demo.HelloService")
+	url2.SetParam(constant.EndPointWeight, "4")
+	invokers = append(invokers, protocol.NewBaseInvoker(url2))
+	t.Run("normal", func(t *testing.T) {
+		mockXDSClient := &mocks.WrappedClientMock{}
+		mockXDSClient.On("GetRouterConfig", mock.Anything).Return(resource.RouteConfigUpdate{})
+		mockXDSClient.On("GetClusterUpdateIgnoreVersion", mock.Anything).
+			Return(resource.ClusterUpdate{LBPolicy: &resource.ClusterLBPolicyRingHash{MinimumRingSize: 9, MaximumRingSize: 10}})
+		mockXDSClient.On("GetHostAddrByServiceUniqueKey", mock.Anything).Return("HelloService", nil)
+		mockXDSClient.On("MatchRoute", mock.Anything, mock.Anything).
+			Return(&resource.Route{HashPolicies: []*resource.HashPolicy{{HashPolicyType: resource.HashPolicyTypeHeader,
+				HeaderName: "key"}}}, nil)
+		lb := &ringhashLoadBalance{client: mockXDSClient}
+		inv := &invocation.RPCInvocation{}
+		inv.SetInvoker(protocol.NewBaseInvoker(url))
+		inv.SetAttachment("key", "1")
+		assert.NotNil(t, lb.Select(invokers, inv))
+	})
+
+	t.Run("normal_hash_channelID", func(t *testing.T) {
+		mockXDSClient := &mocks.WrappedClientMock{}
+		mockXDSClient.On("GetRouterConfig", mock.Anything).Return(resource.RouteConfigUpdate{})
+		mockXDSClient.On("GetClusterUpdateIgnoreVersion", mock.Anything).
+			Return(resource.ClusterUpdate{LBPolicy: &resource.ClusterLBPolicyRingHash{MinimumRingSize: 9, MaximumRingSize: 10}})
+		mockXDSClient.On("GetHostAddrByServiceUniqueKey", mock.Anything).Return("HelloService", nil)
+		mockXDSClient.On("MatchRoute", mock.Anything, mock.Anything).
+			Return(&resource.Route{HashPolicies: []*resource.HashPolicy{
+				{HashPolicyType: resource.HashPolicyTypeChannelID}}}, nil)
+		lb := &ringhashLoadBalance{client: mockXDSClient}
+		inv := &invocation.RPCInvocation{}
+		inv.SetInvoker(protocol.NewBaseInvoker(url))
+		assert.NotNil(t, lb.Select(invokers, inv))
+	})
+
+	t.Run("GetHostAddrByServiceUniqueKey_faild", func(t *testing.T) {
+		mockXDSClient := &mocks.WrappedClientMock{}
+		mockXDSClient.On("GetHostAddrByServiceUniqueKey", mock.Anything).
+			Return("HelloService", errors.New("error"))
+		lb := &ringhashLoadBalance{client: mockXDSClient}
+		inv := &invocation.RPCInvocation{}
+		inv.SetInvoker(protocol.NewBaseInvoker(url))
+		inv.SetAttachment("key", "1")
+		assert.Nil(t, lb.Select(invokers, inv))
+	})
+}
diff --git a/cluster/router/meshrouter/meshrouter.go b/cluster/router/meshrouter/meshrouter.go
index 102a48ac7..567f58a14 100644
--- a/cluster/router/meshrouter/meshrouter.go
+++ b/cluster/router/meshrouter/meshrouter.go
@@ -18,9 +18,7 @@
 package meshrouter
 
 import (
-	"bytes"
 	"math/rand"
-	"strings"
 )
 
 import (
@@ -31,8 +29,6 @@ import (
 	"dubbo.apache.org/dubbo-go/v3/config_center"
 	"dubbo.apache.org/dubbo-go/v3/protocol"
 	"dubbo.apache.org/dubbo-go/v3/remoting/xds"
-	"dubbo.apache.org/dubbo-go/v3/xds/client/resource"
-	"dubbo.apache.org/dubbo-go/v3/xds/utils/resolver"
 )
 
 const (
@@ -60,7 +56,7 @@ func (r *MeshRouter) Route(invokers []protocol.Invoker, url *common.URL, invocat
 	if r.client == nil {
 		return invokers
 	}
-	hostAddr, err := r.client.GetHostAddrByServiceUniqueKey(getSubscribeName(url))
+	hostAddr, err := r.client.GetHostAddrByServiceUniqueKey(common.GetSubscribeName(url))
 	if err != nil {
 		// todo deal with error
 		return nil
@@ -75,47 +71,28 @@ func (r *MeshRouter) Route(invokers []protocol.Invoker, url *common.URL, invocat
 		}
 		clusterInvokerMap[meshClusterID] = append(clusterInvokerMap[meshClusterID], v)
 	}
+	route, err := r.client.MatchRoute(rconf, invocation)
+	if err != nil {
+		logger.Errorf("[Mesh Router] not found route,method=%s", invocation.MethodName())
+		return nil
+	}
 
-	if len(rconf.VirtualHosts) != 0 {
-		// try to route to sub virtual host
-		for _, vh := range rconf.VirtualHosts {
-			// 1. match domain
-			//vh.Domains == ["*"]
-
-			// 2. match http route
-			for _, r := range vh.Routes {
-				//route.
-				ctx := invocation.GetAttachmentAsContext()
-				matcher, err := resource.RouteToMatcher(r)
-				if err != nil {
-					logger.Errorf("[Mesh Router] router to matcher failed with error %s", err)
-					return invokers
-				}
-				if matcher.Match(resolver.RPCInfo{
-					Context: ctx,
-					Method:  "/" + invocation.MethodName(),
-				}) {
-					// Loop through routes in order and select first match.
-					if r == nil || r.WeightedClusters == nil {
-						logger.Errorf("[Mesh Router] route's WeightedClusters is empty, route: %+v", r)
-						return invokers
-					}
-					invokersWeightPairs := make(invokerWeightPairs, 0)
-
-					for clusterID, weight := range r.WeightedClusters {
-						// cluster -> invokers
-						targetInvokers := clusterInvokerMap[clusterID]
-						invokersWeightPairs = append(invokersWeightPairs, invokerWeightPair{
-							invokers: targetInvokers,
-							weight:   weight.Weight,
-						})
-					}
-					return invokersWeightPairs.GetInvokers()
-				}
-			}
-		}
+	// Loop through routes in order and select first match.
+	if route == nil || route.WeightedClusters == nil {
+		logger.Errorf("[Mesh Router] route's WeightedClusters is empty, route: %+v", r)
+		return invokers
+	}
+	invokersWeightPairs := make(invokerWeightPairs, 0)
+
+	for clusterID, weight := range route.WeightedClusters {
+		// cluster -> invokers
+		targetInvokers := clusterInvokerMap[clusterID]
+		invokersWeightPairs = append(invokersWeightPairs, invokerWeightPair{
+			invokers: targetInvokers,
+			weight:   weight.Weight,
+		})
 	}
-	return invokers
+	return invokersWeightPairs.GetInvokers()
 }
 
 // Process there is no process needs for uniform Router, as it upper struct RouterChain has done it
@@ -167,21 +144,3 @@ func (i *invokerWeightPairs) GetInvokers() []protocol.Invoker {
 	}
 	return (*i)[0].invokers
 }
-
-func getSubscribeName(url *common.URL) string {
-	var buffer bytes.Buffer
-
-	buffer.Write([]byte(common.DubboNodes[common.PROVIDER]))
-	appendParam(&buffer, url, constant.InterfaceKey)
-	appendParam(&buffer, url, constant.VersionKey)
-	appendParam(&buffer, url, constant.GroupKey)
-	return buffer.String()
-}
-
-func appendParam(target *bytes.Buffer, url *common.URL, key string) {
-	value := url.GetParam(key, "")
-	target.Write([]byte(constant.NacosServiceNameSeparator))
-	if strings.TrimSpace(value) != "" {
-		target.Write([]byte(value))
-	}
-}
diff --git a/common/constant/loadbalance.go b/common/constant/loadbalance.go
index 82c5771f9..dde844337 100644
--- a/common/constant/loadbalance.go
+++ b/common/constant/loadbalance.go
@@ -23,4 +23,5 @@ const (
 	LoadBalanceKeyRandom            = "random"
 	LoadBalanceKeyRoundRobin        = "roundrobin"
 	LoadBalanceKeyP2C               = "p2c"
+	LoadXDSRingHash                 = "xdsringhash"
 )
diff --git a/common/constant/xds.go b/common/constant/xds.go
index 5c92d0ab4..62903b874 100644
--- a/common/constant/xds.go
+++ b/common/constant/xds.go
@@ -17,6 +17,10 @@
 
 package constant
 
+const (
+	EndPointWeight = "endPointWeight"
+)
+
 const (
 	MeshClusterIDKey = "meshClusterID"
 	MeshHostAddrKey  = "meshHostAddr"
diff --git a/remoting/xds/client.go b/remoting/xds/client.go
index 448744eb7..7c23f5743 100644
--- a/remoting/xds/client.go
+++ b/remoting/xds/client.go
@@ -18,6 +18,7 @@
 package xds
 
 import (
+	"errors"
 	"sync"
 	"time"
 )
@@ -28,12 +29,14 @@ import (
 
 import (
 	"dubbo.apache.org/dubbo-go/v3/common/constant"
+	"dubbo.apache.org/dubbo-go/v3/protocol"
 	"dubbo.apache.org/dubbo-go/v3/registry"
 	xdsCommon "dubbo.apache.org/dubbo-go/v3/remoting/xds/common"
 	"dubbo.apache.org/dubbo-go/v3/remoting/xds/ewatcher"
 	"dubbo.apache.org/dubbo-go/v3/remoting/xds/mapping"
 	"dubbo.apache.org/dubbo-go/v3/xds/client"
 	"dubbo.apache.org/dubbo-go/v3/xds/client/resource"
+	"dubbo.apache.org/dubbo-go/v3/xds/utils/resolver"
 )
 
 const (
@@ -498,6 +501,28 @@ func (w *WrappedClientImpl) getAllVersionClusterName(hostAddr string) []string {
 	return allVersionClusterNames
 }
 
+// MatchRoute returns the first route in routerConfig, scanning virtual hosts in
+// order, whose matcher accepts the invocation's method and attachments.
+func (w *WrappedClientImpl) MatchRoute(routerConfig resource.RouteConfigUpdate, invocation protocol.Invocation) (*resource.Route, error) {
+	info := resolver.RPCInfo{
+		Context: invocation.GetAttachmentAsContext(),
+		Method:  "/" + invocation.MethodName(),
+	}
+	for _, host := range routerConfig.VirtualHosts {
+		for _, candidate := range host.Routes {
+			m, err := resource.RouteToMatcher(candidate)
+			if err != nil {
+				// A route that cannot be converted aborts the whole lookup.
+				return nil, err
+			}
+			if m.Match(info) {
+				return candidate, nil
+			}
+		}
+	}
+	return nil, errors.New("not found route")
+}
+
 type XDSWrapperClient interface {
 	Subscribe(svcUniqueName, interfaceName, hostAddr string, lst registry.NotifyListener) error
 	UnSubscribe(svcUniqueName string)
@@ -507,4 +532,5 @@ type XDSWrapperClient interface {
 	GetClusterUpdateIgnoreVersion(hostAddr string) resource.ClusterUpdate
 	GetHostAddress() xdsCommon.HostAddr
 	GetIstioPodIP() string
+	MatchRoute(routerConfig resource.RouteConfigUpdate, invocation protocol.Invocation) (*resource.Route, error)
 }
diff --git a/remoting/xds/ewatcher/ewatcher.go b/remoting/xds/ewatcher/ewatcher.go
index 06a2a4f20..304c7daae 100644
--- a/remoting/xds/ewatcher/ewatcher.go
+++ b/remoting/xds/ewatcher/ewatcher.go
@@ -19,6 +19,7 @@ package ewatcher
 
 import (
 	"fmt"
+	"strconv"
 	"sync"
 )
 
@@ -112,6 +113,7 @@ func generateRegistryEvent(clusterID string, endpoint resource.Endpoint, interfa
 	url.AddParam(constant.MeshSubsetKey, cluster.Subset)
 	url.AddParam(constant.MeshClusterIDKey, clusterID)
 	url.AddParam(constant.MeshHostAddrKey, cluster.Addr.String())
+	url.AddParam(constant.EndPointWeight, strconv.Itoa(int(endpoint.Weight)))
 	if endpoint.HealthStatus == resource.EndpointHealthStatusUnhealthy {
 		return &registry.ServiceEvent{
 			Action:  remoting.EventTypeDel,
diff --git a/remoting/xds/mocks/client.go b/remoting/xds/mocks/client.go
new file mode 100644
index 000000000..02c7f93b9
--- /dev/null
+++ b/remoting/xds/mocks/client.go
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mocks
+
+import (
+	"github.com/stretchr/testify/mock"
+)
+
+import (
+	"dubbo.apache.org/dubbo-go/v3/protocol"
+	"dubbo.apache.org/dubbo-go/v3/registry"
+	xdsCommon "dubbo.apache.org/dubbo-go/v3/remoting/xds/common"
+	"dubbo.apache.org/dubbo-go/v3/xds/client/resource"
+)
+
+// WrappedClientMock is a testify mock of xds.XDSWrapperClient; all methods but
+// UnSubscribe record the call and return what On(...).Return(...) configured.
+type WrappedClientMock struct {
+	mock.Mock
+}
+
+func (m *WrappedClientMock) Subscribe(svcUniqueName, interfaceName, hostAddr string, lst registry.NotifyListener) error {
+	return m.Called(svcUniqueName, interfaceName, hostAddr, lst).Error(0)
+}
+
+func (m *WrappedClientMock) UnSubscribe(svcUniqueName string) {
+	// No-op: the call is not recorded on the mock.
+}
+
+func (m *WrappedClientMock) GetRouterConfig(hostAddr string) resource.RouteConfigUpdate {
+	return m.Called(hostAddr).Get(0).(resource.RouteConfigUpdate)
+}
+
+func (m *WrappedClientMock) GetHostAddrByServiceUniqueKey(serviceUniqueKey string) (string, error) {
+	args := m.Called(serviceUniqueKey)
+	return args.String(0), args.Error(1)
+}
+
+func (m *WrappedClientMock) ChangeInterfaceMap(serviceUniqueKey string, add bool) error {
+	return m.Called(serviceUniqueKey, add).Error(0)
+}
+
+func (m *WrappedClientMock) GetClusterUpdateIgnoreVersion(hostAddr string) resource.ClusterUpdate {
+	return m.Called(hostAddr).Get(0).(resource.ClusterUpdate)
+}
+
+func (m *WrappedClientMock) GetHostAddress() xdsCommon.HostAddr {
+	return m.Called().Get(0).(xdsCommon.HostAddr)
+}
+
+func (m *WrappedClientMock) GetIstioPodIP() string {
+	return m.Called().String(0)
+}
+
+// MatchRoute returns the configured route and error. The comma-ok assertion
+// lets a test stub Return(nil, err) without a type-assertion panic.
+func (m *WrappedClientMock) MatchRoute(routerConfig resource.RouteConfigUpdate, invocation protocol.Invocation) (*resource.Route, error) {
+	args := m.Called(routerConfig, invocation)
+	route, _ := args.Get(0).(*resource.Route)
+	return route, args.Error(1)
+}