You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@dubbo.apache.org by ya...@apache.org on 2023/10/07 15:20:20 UTC

[dubbo-rust] branch feat/cluster updated: Feat/cluster Optimized the Router module (#160)

This is an automated email from the ASF dual-hosted git repository.

yangyang pushed a commit to branch feat/cluster
in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git


The following commit(s) were added to refs/heads/feat/cluster by this push:
     new a957c54  Feat/cluster Optimized the Router module (#160)
a957c54 is described below

commit a957c54d1d58a02a4e60b95b1f49f42e8cb4e992
Author: Urara <95...@users.noreply.github.com>
AuthorDate: Sat Oct 7 23:20:15 2023 +0800

    Feat/cluster Optimized the Router module (#160)
    
    * perf: Optimized the logic of the routing module.
    
    Refactored route logic decision-making, eliminating unnecessary cloning
    and improving performance.
    
    * perf: Optimized the logic of the routing module.
    
    Refactored route logic decision-making, eliminating unnecessary cloning
    and improving performance.
    
    * perf: Removed unnecessary configurations.
    
    * perf: Removed unnecessary configurations.
    
    * perf: Optimized the Router module
    
    Optimized the Router module
    Added Router Chain to MockDirectory
---
 application.yaml                                   |  32 +---
 config/src/router.rs                               |   6 +-
 dubbo/src/cluster/directory.rs                     |   9 +-
 dubbo/src/cluster/mod.rs                           |  67 ++++----
 .../cluster/router/condition/condition_router.rs   |  13 +-
 dubbo/src/cluster/router/condition/matcher.rs      |  80 ++++-----
 .../src/cluster/router/condition/single_router.rs  |  34 ++--
 .../cluster/router/manager/condition_manager.rs    |  73 ++++-----
 dubbo/src/cluster/router/manager/router_manager.rs | 137 ++++++++--------
 dubbo/src/cluster/router/manager/tag_manager.rs    |  21 +--
 dubbo/src/cluster/router/mod.rs                    |   8 +-
 .../router/nacos_config_center/nacos_client.rs     | 138 ++++++++--------
 dubbo/src/cluster/router/router_chain.rs           |  72 ++++-----
 dubbo/src/cluster/router/tag/tag_router.rs         |  32 +++-
 dubbo/src/invocation.rs                            |   4 +-
 dubbo/src/protocol/mod.rs                          |   3 +-
 dubbo/src/protocol/triple/triple_invoker.rs        |   5 +-
 dubbo/src/triple/client/builder.rs                 |   6 +-
 dubbo/src/triple/client/mod.rs                     |   2 +-
 dubbo/src/triple/client/replay.rs                  |   2 +-
 dubbo/src/triple/client/triple.rs                  |  14 +-
 examples/echo/src/generated/grpc.examples.echo.rs  | 179 +++++++--------------
 22 files changed, 396 insertions(+), 541 deletions(-)

diff --git a/application.yaml b/application.yaml
index 902b0ef..bec29a6 100644
--- a/application.yaml
+++ b/application.yaml
@@ -25,33 +25,5 @@ dubbo:
   routers:
     consumer:
       - service: "org.apache.dubbo.sample.tri.Greeter"
-        url: triple://localhost:20000
-        protocol: triple
-    nacos:
-      addr: "127.0.0.1:8848"
-      namespace: ""
-      app: ""
-    conditions:
-      - scope: "service"
-        force: false
-        runtime: true
-        enabled: true
-        key: "org.apache.dubbo.sample.tri.Greeter"
-        conditions:
-          - method=greet => port=8889
-      - scope: "service"
-        force: true
-        runtime: true
-        enabled: true
-        key: "user.UserService"
-        conditions:
-          - method=get_s => port=2003
-    tags:
-      force: true
-      enabled: true
-      key: shop-detail
-      tags:
-        - name: gray
-          matches:
-            - key: env
-              value: gray
\ No newline at end of file
+        url: tri://127.0.0.1:20000
+        protocol: triple
\ No newline at end of file
diff --git a/config/src/router.rs b/config/src/router.rs
index 98aa14a..b45bd47 100644
--- a/config/src/router.rs
+++ b/config/src/router.rs
@@ -2,9 +2,10 @@ use serde::{Deserialize, Serialize};
 
 #[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Default)]
 pub struct ConditionRouterConfig {
+    #[serde(rename = "configVersion")]
+    pub config_version: String,
     pub scope: String,
     pub force: bool,
-    pub runtime: bool,
     pub enabled: bool,
     pub key: String,
     pub conditions: Vec<String>,
@@ -12,6 +13,8 @@ pub struct ConditionRouterConfig {
 
 #[derive(Serialize, Deserialize, Default, Debug, Clone, PartialEq)]
 pub struct TagRouterConfig {
+    #[serde(rename = "configVersion")]
+    pub config_version: String,
     pub force: bool,
     pub enabled: bool,
     pub key: String,
@@ -28,6 +31,7 @@ pub struct ConsumerConfig {
 #[derive(Serialize, Deserialize, Default, Debug, Clone, PartialEq)]
 pub struct Tag {
     pub name: String,
+    #[serde(rename = "match")]
     pub matches: Vec<TagMatchRule>,
 }
 
diff --git a/dubbo/src/cluster/directory.rs b/dubbo/src/cluster/directory.rs
index 547878b..66a3c43 100644
--- a/dubbo/src/cluster/directory.rs
+++ b/dubbo/src/cluster/directory.rs
@@ -39,7 +39,7 @@ use tower::{
     ready_cache::ReadyCache,
 };
 
-use crate::cluster::Directory;
+use crate::{cluster::Directory, codegen::RpcInvocation, invocation::Invocation};
 
 /// Directory.
 ///
@@ -68,12 +68,12 @@ impl StaticDirectory {
 }
 
 impl Directory for StaticDirectory {
-    fn list(&self, service_name: String) -> Vec<BoxInvoker> {
+    fn list(&self, inv: Arc<RpcInvocation>) -> Vec<BoxInvoker> {
         let url = Url::from_url(&format!(
             "tri://{}:{}/{}",
             self.uri.host().unwrap(),
             self.uri.port().unwrap(),
-            service_name,
+            inv.get_target_service_unique_name(),
         ))
         .unwrap();
         let invoker = Box::new(TripleInvoker::new(url));
@@ -97,7 +97,8 @@ impl RegistryDirectory {
 }
 
 impl Directory for RegistryDirectory {
-    fn list(&self, service_name: String) -> Vec<BoxInvoker> {
+    fn list(&self, inv: Arc<RpcInvocation>) -> Vec<BoxInvoker> {
+        let service_name = inv.get_target_service_unique_name();
         let url = Url::from_url(&format!(
             "triple://{}:{}/{}",
             "127.0.0.1", "8888", service_name
diff --git a/dubbo/src/cluster/mod.rs b/dubbo/src/cluster/mod.rs
index 7411a67..964150d 100644
--- a/dubbo/src/cluster/mod.rs
+++ b/dubbo/src/cluster/mod.rs
@@ -23,6 +23,9 @@ use tower::{ready_cache::ReadyCache, ServiceExt};
 use tower_service::Service;
 
 use crate::{
+    cluster::router::{
+        manager::router_manager::get_global_router_manager, router_chain::RouterChain,
+    },
     codegen::RpcInvocation,
     invocation::Invocation,
     protocol::{triple::triple_invoker::TripleInvoker, BoxInvoker, Invoker},
@@ -35,7 +38,7 @@ pub mod loadbalance;
 pub mod router;
 
 pub trait Directory: Debug {
-    fn list(&self, service_name: String) -> Vec<BoxInvoker>;
+    fn list(&self, inv: Arc<RpcInvocation>) -> Vec<BoxInvoker>;
 }
 
 type BoxDirectory = Box<dyn Directory + Send + Sync>;
@@ -134,7 +137,7 @@ impl Service<http::Request<ClonedBody>> for FailoverCluster {
         let inv = inv.unwrap();
         let service_name = inv.get_target_service_unique_name();
 
-        let invokers = self.dir.list(service_name.clone());
+        let invokers = self.dir.list(Arc::new(inv.clone()));
 
         Box::pin(async move {
             let mut current_req = req;
@@ -172,55 +175,36 @@ impl Invoker<http::Request<ClonedBody>> for FailoverCluster {
 
 #[derive(Debug, Default)]
 pub struct MockDirectory {
-    // router_chain: RouterChain,
+    router_chain: RouterChain,
 }
 
 impl MockDirectory {
-    pub fn new() -> MockDirectory {
-        // let router_chain = get_global_router_manager().read().unwrap().get_router_chain(invocation);
-        Self {
-            // router_chain
-        }
+    pub fn new(service_name: String) -> MockDirectory {
+        let router_chain = get_global_router_manager()
+            .read()
+            .unwrap()
+            .get_router_chain(service_name);
+        Self { router_chain }
     }
 }
 
 impl Directory for MockDirectory {
-    fn list(&self, service_name: String) -> Vec<BoxInvoker> {
-        // tracing::info!("MockDirectory: {}", meta);
+    fn list(&self, inv: Arc<RpcInvocation>) -> Vec<BoxInvoker> {
         let u = Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap();
-        vec![Box::new(TripleInvoker::new(u))]
-        // self.router_chain.route(u, invo);
+        let mut urls = vec![u];
+        // tracing::info!("MockDirectory: {}", meta);
+        urls = self.router_chain.route(urls, inv);
+        let mut result = Vec::new();
+        for url in urls {
+            result.push(Box::new(TripleInvoker::new(url)) as BoxInvoker);
+        }
+        result
     }
 }
 
-
-// #[derive(Debug, Default)]
-// pub struct RouterChain {
-//     router: HashMap<String, BoxRouter>,
-//     invokers: Arc<Vec<BoxInvoker>>,
-// }
-
-// impl RouterChain {
-//     pub fn route(&mut self, url: Url, invo: Arc<RpcInvocation>) -> Arc<Vec<BoxInvoker>> {
-//         let r = self.router.get("mock").unwrap();
-//         r.route(self.invokers.clone(), url, invo)
-//     }
-// }
-
-// pub trait Router: Debug {
-//     fn route(
-//         &self,
-//         invokers: Arc<Vec<BoxInvoker>>,
-//         url: Url,
-//         invo: Arc<RpcInvocation>,
-//     ) -> Arc<Vec<BoxInvoker>>;
-// }
-
-// pub type BoxRouter = Box<dyn Router + Sync + Send>;
-
 #[cfg(test)]
 pub mod tests {
-    use std::task::Poll;
+    use std::{sync::Arc, task::Poll};
 
     use bytes::{Buf, BufMut, BytesMut};
     use dubbo_base::Url;
@@ -250,8 +234,11 @@ pub mod tests {
     struct MockDirectory;
 
     impl Directory for MockDirectory {
-        fn list(&self, service_name: String) -> Vec<crate::protocol::BoxInvoker> {
-            println!("get invoker list for {}", service_name);
+        fn list(&self, inv: Arc<RpcInvocation>) -> Vec<crate::protocol::BoxInvoker> {
+            println!(
+                "get invoker list for {}",
+                inv.get_target_service_unique_name()
+            );
 
             vec![
                 Box::new(MockInvoker(1)),
diff --git a/dubbo/src/cluster/router/condition/condition_router.rs b/dubbo/src/cluster/router/condition/condition_router.rs
index f39cd1c..73aca00 100644
--- a/dubbo/src/cluster/router/condition/condition_router.rs
+++ b/dubbo/src/cluster/router/condition/condition_router.rs
@@ -17,19 +17,18 @@ pub struct ConditionRouter {
 }
 
 impl Router for ConditionRouter {
-    fn route(&self, invokers: Vec<Url>, url: Url, invo: Arc<RpcInvocation>) -> Vec<Url> {
-        let mut invokers_result = invokers.clone();
-        if let Some(routers) = self.application_routers.clone() {
+    fn route(&self, mut invokers: Vec<Url>, url: Url, invo: Arc<RpcInvocation>) -> Vec<Url> {
+        if let Some(routers) = &self.application_routers {
             for router in &routers.read().unwrap().routers {
-                invokers_result = router.route(invokers_result, url.clone(), invo.clone())
+                invokers = router.route(invokers, url.clone(), invo.clone());
             }
         }
-        if let Some(routers) = self.service_routers.clone() {
+        if let Some(routers) = &self.service_routers {
             for router in &routers.read().unwrap().routers {
-                invokers_result = router.route(invokers_result, url.clone(), invo.clone())
+                invokers = router.route(invokers, url.clone(), invo.clone());
             }
         }
-        invokers_result
+        invokers
     }
 }
 
diff --git a/dubbo/src/cluster/router/condition/matcher.rs b/dubbo/src/cluster/router/condition/matcher.rs
index 8c9177e..92bbe2d 100644
--- a/dubbo/src/cluster/router/condition/matcher.rs
+++ b/dubbo/src/cluster/router/condition/matcher.rs
@@ -1,6 +1,5 @@
-use crate::codegen::RpcInvocation;
 use regex::Regex;
-use std::{collections::HashSet, error::Error, option::Option, sync::Arc};
+use std::{collections::HashSet, error::Error, option::Option};
 
 #[derive(Clone, Debug, Default)]
 pub struct ConditionMatcher {
@@ -18,50 +17,25 @@ impl ConditionMatcher {
         }
     }
 
-    pub fn is_match(
-        &self,
-        value: Option<String>,
-        invocation: Arc<RpcInvocation>,
-        _is_when: bool,
-    ) -> Result<bool, Box<dyn Error>> {
+    pub fn is_match(&self, value: Option<String>) -> Result<bool, Box<dyn Error>> {
         match value {
-            None => {
-                // if key does not present in whichever of url, invocation or attachment based on the matcher type, then return false.
-                Ok(false)
-            }
+            None => Ok(false),
             Some(val) => {
-                if !self.matches.is_empty() && self.mismatches.is_empty() {
-                    for match_ in self.matches.iter() {
-                        if self.do_pattern_match(match_, &val, invocation.clone())? {
-                            return Ok(true);
-                        }
-                    }
-                    Ok(false)
-                } else if !self.mismatches.is_empty() && self.matches.is_empty() {
-                    for mismatch in self.mismatches.iter() {
-                        if self.do_pattern_match(mismatch, &val, invocation.clone())? {
-                            return Ok(false);
-                        }
+                for match_ in self.matches.iter() {
+                    if self.do_pattern_match(match_, &val) {
+                        return Ok(true);
                     }
-                    Ok(true)
-                } else if !self.matches.is_empty() && !self.mismatches.is_empty() {
-                    for mismatch in self.mismatches.iter() {
-                        if self.do_pattern_match(mismatch, &val, invocation.clone())? {
-                            return Ok(false);
-                        }
-                    }
-                    for match_ in self.matches.iter() {
-                        if self.do_pattern_match(match_, &val, invocation.clone())? {
-                            return Ok(true);
-                        }
+                }
+                for mismatch in self.mismatches.iter() {
+                    if !self.do_pattern_match(mismatch, &val) {
+                        return Ok(true);
                     }
-                    Ok(false)
-                } else {
-                    Ok(false)
                 }
+                Ok(false)
             }
         }
     }
+
     pub fn get_matches(&mut self) -> &mut HashSet<String> {
         &mut self.matches
     }
@@ -69,16 +43,26 @@ impl ConditionMatcher {
         &mut self.mismatches
     }
 
-    fn do_pattern_match(
-        &self,
-        pattern: &String,
-        value: &String,
-        _invocation: Arc<RpcInvocation>,
-    ) -> Result<bool, Box<dyn Error>> {
-        if pattern.contains("*") {
-            return Ok(star_matcher(pattern, value));
+    fn do_pattern_match(&self, pattern: &str, value: &str) -> bool {
+        if pattern.contains('*') {
+            return star_matcher(pattern, value);
+        }
+
+        if pattern.contains('~') {
+            let parts: Vec<&str> = pattern.split('~').collect();
+
+            if parts.len() == 2 {
+                if let (Ok(left), Ok(right), Ok(val)) = (
+                    parts[0].parse::<i32>(),
+                    parts[1].parse::<i32>(),
+                    value.parse::<i32>(),
+                ) {
+                    return range_matcher(val, left, right);
+                }
+            }
+            return false;
         }
-        Ok(pattern.eq(value))
+        pattern == value
     }
 }
 
@@ -89,6 +73,6 @@ pub fn star_matcher(pattern: &str, input: &str) -> bool {
     regex.is_match(input)
 }
 
-pub fn _range_matcher(val: i32, min: i32, max: i32) -> bool {
+pub fn range_matcher(val: i32, min: i32, max: i32) -> bool {
     min <= val && val <= max
 }
diff --git a/dubbo/src/cluster/router/condition/single_router.rs b/dubbo/src/cluster/router/condition/single_router.rs
index e290b15..5f06aa8 100644
--- a/dubbo/src/cluster/router/condition/single_router.rs
+++ b/dubbo/src/cluster/router/condition/single_router.rs
@@ -169,20 +169,16 @@ impl ConditionSingleRouter {
 
     pub fn match_when(&self, url: Url, invocation: Arc<RpcInvocation>) -> bool {
         if self.when_condition.is_empty() {
-            true
-        } else {
-            false
-        };
-        self.do_match(url, &self.when_condition, invocation, true)
+            return true;
+        }
+        self.do_match(url, &self.when_condition, invocation)
     }
 
     pub fn match_then(&self, url: Url, invocation: Arc<RpcInvocation>) -> bool {
-        if self.when_condition.is_empty() {
-            true
-        } else {
-            false
-        };
-        self.do_match(url, &self.then_condition, invocation, false)
+        if self.then_condition.is_empty() {
+            return false;
+        }
+        self.do_match(url, &self.then_condition, invocation)
     }
 
     pub fn do_match(
@@ -190,25 +186,19 @@ impl ConditionSingleRouter {
         url: Url,
         conditions: &HashMap<String, Arc<RwLock<ConditionMatcher>>>,
         invocation: Arc<RpcInvocation>,
-        is_when: bool,
     ) -> bool {
         let sample: HashMap<String, String> = to_original_map(url);
-        for (key, condition_matcher) in conditions {
+        conditions.iter().all(|(key, condition_matcher)| {
             let matcher = condition_matcher.read().unwrap();
             let value = get_value(key, &sample, invocation.clone());
-            match matcher.is_match(value, invocation.clone(), is_when) {
-                Ok(result) => {
-                    if !result {
-                        return false;
-                    }
-                }
+            match matcher.is_match(value) {
+                Ok(result) => result,
                 Err(error) => {
                     info!("Error occurred: {:?}", error);
-                    return false;
+                    false
                 }
             }
-        }
-        true
+        })
     }
 }
 
diff --git a/dubbo/src/cluster/router/manager/condition_manager.rs b/dubbo/src/cluster/router/manager/condition_manager.rs
index 4207b79..7ad5e1b 100644
--- a/dubbo/src/cluster/router/manager/condition_manager.rs
+++ b/dubbo/src/cluster/router/manager/condition_manager.rs
@@ -16,28 +16,30 @@ pub struct ConditionRouterManager {
 }
 
 impl ConditionRouterManager {
-    pub fn get_router(&self, service_name: String) -> Option<ConditionRouter> {
-        let routers_service = self.routers_service.get(&service_name);
-        match routers_service {
-            Some(routers_service) => {
-                if self.routers_application.read().unwrap().is_null() {
-                    return Some(ConditionRouter::new(Some(routers_service.clone()), None));
-                }
-                Some(ConditionRouter::new(
+    pub fn get_router(&self, service_name: &String) -> Option<ConditionRouter> {
+        let routers_application_is_null = self.routers_application.read().unwrap().is_null();
+        self.routers_service
+            .get(service_name)
+            .map(|routers_service| {
+                ConditionRouter::new(
                     Some(routers_service.clone()),
-                    Some(self.routers_application.clone()),
-                ))
-            }
-            None => {
-                if self.routers_application.read().unwrap().is_null() {
-                    return None;
+                    if routers_application_is_null {
+                        None
+                    } else {
+                        Some(self.routers_application.clone())
+                    },
+                )
+            })
+            .or_else(|| {
+                if routers_application_is_null {
+                    None
+                } else {
+                    Some(ConditionRouter::new(
+                        None,
+                        Some(self.routers_application.clone()),
+                    ))
                 }
-                Some(ConditionRouter::new(
-                    None,
-                    Some(self.routers_application.clone()),
-                ))
-            }
-        }
+            })
     }
 
     pub fn update(&mut self, config: ConditionRouterConfig) {
@@ -45,31 +47,26 @@ impl ConditionRouterManager {
         let scope = config.scope;
         let key = config.key;
         let enable = config.enabled;
-        let mut routers = Vec::new();
-        for condition in config.conditions {
-            routers.push(ConditionSingleRouter::new(condition, force, enable));
-        }
+
+        let routers = config
+            .conditions
+            .into_iter()
+            .map(|condition| ConditionSingleRouter::new(condition, force, enable))
+            .collect::<Vec<_>>();
+
         match scope.as_str() {
             "application" => {
                 self.routers_application.write().unwrap().routers = routers;
             }
             "service" => {
-                if let Some(x) = self.routers_service.get(&key) {
-                    x.write().unwrap().routers = routers
-                } else {
-                    self.routers_service.insert(
-                        key,
-                        Arc::new(RwLock::new(ConditionSingleRouters::new(routers))),
-                    );
-                }
+                self.routers_service
+                    .entry(key)
+                    .or_insert_with(|| Arc::new(RwLock::new(ConditionSingleRouters::new(vec![]))))
+                    .write()
+                    .unwrap()
+                    .routers = routers;
             }
             _ => {}
         }
     }
-
-    pub fn _parse_rules(&mut self, configs: Vec<ConditionRouterConfig>) {
-        for config in configs {
-            self.update(config)
-        }
-    }
 }
diff --git a/dubbo/src/cluster/router/manager/router_manager.rs b/dubbo/src/cluster/router/manager/router_manager.rs
index a2a6658..e963181 100644
--- a/dubbo/src/cluster/router/manager/router_manager.rs
+++ b/dubbo/src/cluster/router/manager/router_manager.rs
@@ -1,10 +1,7 @@
-use crate::{
-    cluster::router::{
-        manager::{condition_manager::ConditionRouterManager, tag_manager::TagRouterManager},
-        nacos_config_center::nacos_client::NacosClient,
-        router_chain::RouterChain,
-    },
-    invocation::{Invocation, RpcInvocation},
+use crate::cluster::router::{
+    manager::{condition_manager::ConditionRouterManager, tag_manager::TagRouterManager},
+    nacos_config_center::nacos_client::NacosClient,
+    router_chain::RouterChain,
 };
 use dubbo_base::Url;
 use dubbo_config::{
@@ -19,7 +16,8 @@ use std::{
 };
 
 pub static GLOBAL_ROUTER_MANAGER: OnceCell<Arc<RwLock<RouterManager>>> = OnceCell::new();
-
+const TAG: &str = "tag";
+const CONDITION: &str = "condition";
 pub struct RouterManager {
     pub condition_router_manager: ConditionRouterManager,
     pub tag_router_manager: TagRouterManager,
@@ -28,30 +26,28 @@ pub struct RouterManager {
 }
 
 impl RouterManager {
-    pub fn get_router_chain(&self, invocation: Arc<RpcInvocation>) -> RouterChain {
-        let service = invocation.get_target_service_unique_name().clone();
-        let condition_router = self.condition_router_manager.get_router(service.clone());
-        let tag_router = self.tag_router_manager.get_router();
+    pub fn get_router_chain(&self, service: String) -> RouterChain {
         let mut chain = RouterChain::new();
-        match self.consumer.get(service.as_str()) {
-            None => {}
-            Some(url) => {
-                chain.set_condition_router(condition_router);
-                chain.set_tag_router(tag_router);
-                chain.self_url = url.clone();
+        if let Some(url) = self.consumer.get(service.as_str()) {
+            if let Some(tag_router) = self.tag_router_manager.get_router(&service) {
+                chain.add_router(TAG.to_string(), Box::new(tag_router));
+            }
+            if let Some(condition_router) = self.condition_router_manager.get_router(&service) {
+                chain.add_router(CONDITION.to_string(), Box::new(condition_router));
             }
+            chain.self_url = url.clone();
         }
         chain
     }
 
     pub fn notify(&mut self, event: RouterConfigChangeEvent) {
         match event.router_kind.as_str() {
-            "condition" => {
+            CONDITION => {
                 let config: ConditionRouterConfig =
                     serde_yaml::from_str(event.content.as_str()).unwrap();
                 self.condition_router_manager.update(config)
             }
-            "tag" => {
+            TAG => {
                 let config: TagRouterConfig = serde_yaml::from_str(event.content.as_str()).unwrap();
                 self.tag_router_manager.update(config)
             }
@@ -66,76 +62,71 @@ impl RouterManager {
         self.init_router_managers_for_nacos();
     }
 
-    pub fn init_router_managers_for_nacos(&mut self) {
-        let config = self
+    fn init_router_managers_for_nacos(&mut self) {
+        if let Some(tag_config) = self
             .nacos
             .as_ref()
-            .unwrap()
-            .get_tag_config("application".to_string());
-        match config {
-            None => {}
-            Some(tag_config) => {
-                self.tag_router_manager.init();
-                self.tag_router_manager.update(tag_config)
-            }
+            .and_then(|n| n.get_config("application", TAG, TAG))
+        {
+            self.tag_router_manager.update(tag_config);
         }
+
+        if let Some(condition_app_config) = self
+            .nacos
+            .as_ref()
+            .and_then(|n| n.get_config("application", CONDITION, TAG))
+        {
+            self.condition_router_manager.update(condition_app_config);
+        }
+
         for (service_name, _) in &self.consumer {
-            let config = self
+            if let Some(condition_config) = self
                 .nacos
                 .as_ref()
-                .unwrap()
-                .get_condition_config(service_name.clone());
-            match config {
-                None => {}
-                Some(condition_config) => self.condition_router_manager.update(condition_config),
+                .and_then(|n| n.get_config(service_name, CONDITION, CONDITION))
+            {
+                self.condition_router_manager.update(condition_config);
             }
         }
     }
 
     pub fn init(&mut self) {
         let config = get_global_config().routers.clone();
+        self.init_consumer_configs();
+        if let Some(nacos_config) = &config.nacos {
+            self.init_nacos(nacos_config.clone());
+        } else {
+            trace!("Nacos not configured, using local YAML configuration for routing");
+            if let Some(condition_configs) = &config.conditions {
+                for condition_config in condition_configs {
+                    self.condition_router_manager
+                        .update(condition_config.clone());
+                }
+            } else {
+                info!("Unconfigured Condition Router")
+            }
+            if let Some(tag_config) = &config.tags {
+                self.tag_router_manager.update(tag_config.clone());
+            } else {
+                info!("Unconfigured Tag Router")
+            }
+        }
+    }
+
+    fn init_consumer_configs(&mut self) {
         let consumer_configs = get_global_config()
             .routers
             .consumer
             .clone()
-            .unwrap_or(Vec::new());
+            .unwrap_or_else(Vec::new);
+
         for consumer_config in consumer_configs {
-            self.consumer.insert(
-                consumer_config.service.clone(),
-                Url::from_url(
-                    format!("{}/{}", consumer_config.url, consumer_config.service).as_str(),
-                )
-                .expect("consumer配置出错!Url生成错误"),
-            );
-        }
-        match &config.nacos {
-            None => {
-                trace!("Nacos not configured, using local YAML configuration for routing");
-                let condition = config.conditions.clone();
-                match condition {
-                    None => {
-                        info!("Unconfigured Condition Router")
-                    }
-                    Some(cons) => {
-                        for con in cons {
-                            self.condition_router_manager.update(con)
-                        }
-                    }
-                }
-                let tag = config.tags.clone();
-                match tag {
-                    None => {
-                        info!("Unconfigured Tag Router")
-                    }
-                    Some(ta) => {
-                        self.tag_router_manager.init();
-                        self.tag_router_manager.update(ta)
-                    }
-                }
-            }
-            Some(config) => {
-                self.init_nacos(config.clone());
-            }
+            let service_url = Url::from_url(
+                format!("{}/{}", consumer_config.url, consumer_config.service).as_str(),
+            )
+            .expect("Consumer config error");
+
+            self.consumer.insert(consumer_config.service, service_url);
         }
     }
 }
diff --git a/dubbo/src/cluster/router/manager/tag_manager.rs b/dubbo/src/cluster/router/manager/tag_manager.rs
index ce30c92..8dc2499 100644
--- a/dubbo/src/cluster/router/manager/tag_manager.rs
+++ b/dubbo/src/cluster/router/manager/tag_manager.rs
@@ -1,27 +1,20 @@
-use crate::cluster::router::tag::tag_router::TagRouter;
+use crate::cluster::router::tag::tag_router::{TagRouter, TagRouterInner};
 use dubbo_config::router::TagRouterConfig;
 use std::sync::{Arc, RwLock};
 
 #[derive(Debug, Clone, Default)]
 pub struct TagRouterManager {
-    pub tag_router: Option<Arc<RwLock<TagRouter>>>,
+    pub tag_router: Arc<RwLock<TagRouterInner>>,
 }
 
 impl TagRouterManager {
-    pub fn init(&mut self) {
-        self.tag_router = Some(Arc::new(RwLock::new(TagRouter::default())))
-    }
-
-    pub fn get_router(&self) -> Option<Arc<RwLock<TagRouter>>> {
-        self.tag_router.clone()
+    pub fn get_router(&self, _service_name: &String) -> Option<TagRouter> {
+        Some(TagRouter {
+            inner: self.tag_router.clone(),
+        })
     }
 
     pub fn update(&mut self, config: TagRouterConfig) {
-        self.tag_router
-            .as_ref()
-            .unwrap()
-            .write()
-            .unwrap()
-            .parse_config(config)
+        self.tag_router.write().unwrap().parse_config(config);
     }
 }
diff --git a/dubbo/src/cluster/router/mod.rs b/dubbo/src/cluster/router/mod.rs
index 84ceae1..17c9aec 100644
--- a/dubbo/src/cluster/router/mod.rs
+++ b/dubbo/src/cluster/router/mod.rs
@@ -9,17 +9,17 @@ use crate::invocation::RpcInvocation;
 use dubbo_base::Url;
 use std::{fmt::Debug, sync::Arc};
 
-pub trait Router: Debug + Clone {
-    fn route(&self, invokers: Vec<Url>, url: Url, invo: Arc<RpcInvocation>) -> Vec<Url>;
+pub trait Router: Debug {
+    fn route(&self, invokers: Vec<Url>, url: Url, invocation: Arc<RpcInvocation>) -> Vec<Url>;
 }
 
-// pub type BoxRouter = Box<dyn Router + Sync + Send>;
+pub type BoxRouter = Box<dyn Router + Sync + Send>;
 
 #[derive(Debug, Default, Clone)]
 pub struct MockRouter {}
 
 impl Router for MockRouter {
-    fn route(&self, invokers: Vec<Url>, _url: Url, _invo: Arc<RpcInvocation>) -> Vec<Url> {
+    fn route(&self, invokers: Vec<Url>, _url: Url, _invocation: Arc<RpcInvocation>) -> Vec<Url> {
         invokers
     }
 }
diff --git a/dubbo/src/cluster/router/nacos_config_center/nacos_client.rs b/dubbo/src/cluster/router/nacos_config_center/nacos_client.rs
index 1600e9b..ce72641 100644
--- a/dubbo/src/cluster/router/nacos_config_center/nacos_client.rs
+++ b/dubbo/src/cluster/router/nacos_config_center/nacos_client.rs
@@ -1,7 +1,7 @@
 use crate::cluster::router::manager::router_manager::{
     get_global_router_manager, RouterConfigChangeEvent,
 };
-use dubbo_config::router::{ConditionRouterConfig, NacosConfig, TagRouterConfig};
+use dubbo_config::router::NacosConfig;
 use dubbo_logger::{tracing, tracing::info};
 use nacos_sdk::api::{
     config::{ConfigChangeListener, ConfigResponse, ConfigService, ConfigServiceBuilder},
@@ -24,94 +24,86 @@ impl NacosClient {
         let server_addr = config.addr;
         let namespace = config.namespace;
         let app = config.app;
-        match config.enable_auth {
-            None => {
-                info!("disable nacos auth!");
-                info!("nacos init,addr:{}", server_addr);
-                let client = Arc::new(RwLock::new(
-                    ConfigServiceBuilder::new(
-                        ClientProps::new()
-                            .server_addr(server_addr)
-                            .namespace(namespace)
-                            .app_name(app),
-                    )
-                    .build()
-                    .expect("NacosClient build failed!Please check NacosConfig"),
-                ));
-                Self { client }
-            }
-            Some(auth) => {
-                info!("enable nacos auth!");
-                info!("nacos init,addr:{}", server_addr);
-                let client = Arc::new(RwLock::new(
-                    ConfigServiceBuilder::new(
-                        ClientProps::new()
-                            .server_addr(server_addr)
-                            .namespace(namespace)
-                            .app_name(app)
-                            .auth_username(auth.auth_username)
-                            .auth_password(auth.auth_password),
-                    )
-                    // .enable_auth_plugin_http()
-                    .build()
-                    .expect("NacosClient build failed!Please check NacosConfig"),
-                ));
-                return Self { client };
-            }
+        let enable_auth = config.enable_auth;
+
+        let mut props = ClientProps::new()
+            .server_addr(server_addr)
+            .namespace(namespace)
+            .app_name(app);
+
+        if enable_auth.is_some() {
+            info!("enable nacos auth!");
+        } else {
+            info!("disable nacos auth!");
         }
-    }
-    pub fn get_condition_config(&self, data_id: String) -> Option<ConditionRouterConfig> {
-        let config_resp = self
-            .client
-            .read()
-            .unwrap()
-            .get_config(data_id.clone(), "condition".to_string());
-        return match config_resp {
-            Ok(config_resp) => {
-                self.add_listener(data_id.clone(), "condition".to_string());
-                let string = config_resp.content();
-                Some(serde_yaml::from_str(string).unwrap())
-            }
-            Err(_err) => None,
-        };
+
+        if let Some(auth) = enable_auth {
+            props = props
+                .auth_username(auth.auth_username)
+                .auth_password(auth.auth_password);
+        }
+
+        let client = Arc::new(RwLock::new(
+            ConfigServiceBuilder::new(props)
+                .build()
+                .expect("NacosClient build failed! Please check NacosConfig"),
+        ));
+
+        Self { client }
     }
 
-    pub fn get_tag_config(&self, data_id: String) -> Option<TagRouterConfig> {
+    pub fn get_config<T>(&self, data_id: &str, group: &str, config_type: &str) -> Option<T>
+    where
+        T: serde::de::DeserializeOwned,
+    {
         let config_resp = self
             .client
             .read()
             .unwrap()
-            .get_config(data_id.clone(), "tag".to_string());
-        return match config_resp {
+            .get_config(data_id.to_string(), group.to_string());
+
+        match config_resp {
             Ok(config_resp) => {
-                self.add_listener(data_id.clone(), "tag".to_string());
+                self.add_listener(data_id, group);
                 let string = config_resp.content();
                 let result = serde_yaml::from_str(string);
+
                 match result {
                     Ok(config) => {
-                        info!("success to get TagRouter config and parse success");
+                        info!(
+                            "success to get {}Router config and parse success",
+                            config_type
+                        );
                         Some(config)
                     }
-                    _ => {
-                        info!("failed to parse TagRouter rule");
+                    Err(_) => {
+                        info!("failed to parse {}Router rule", config_type);
                         None
                     }
                 }
             }
-            Err(_err) => None,
-        };
+            Err(_) => None,
+        }
     }
-    pub fn add_listener(&self, data_id: String, group: String) {
-        let res_listener = self
+
+    pub fn add_listener(&self, data_id: &str, group: &str) {
+        if let Err(err) = self
             .client
             .write()
-            .expect("failed to create nacos config listener")
-            .add_listener(data_id, group, Arc::new(ConfigChangeListenerImpl {}));
-        match res_listener {
-            Ok(_) => {
-                info!("listening the config success");
-            }
-            Err(err) => tracing::error!("listen config error {:?}", err),
+            .map_err(|e| format!("failed to create nacos config listener: {}", e))
+            .and_then(|client| {
+                client
+                    .add_listener(
+                        data_id.to_string(),
+                        group.to_string(),
+                        Arc::new(ConfigChangeListenerImpl {}),
+                    )
+                    .map_err(|e| format!("failed to add nacos config listener: {}", e))
+            })
+        {
+            tracing::error!("{}", err);
+        } else {
+            info!("listening the config success");
         }
     }
 }
@@ -120,13 +112,15 @@ impl ConfigChangeListener for ConfigChangeListenerImpl {
     fn notify(&self, config_resp: ConfigResponse) {
         let content_type = config_resp.content_type();
         let event = RouterConfigChangeEvent {
-            service_name: config_resp.data_id().clone(),
-            router_kind: config_resp.group().clone(),
-            content: config_resp.content().clone(),
+            service_name: config_resp.data_id().to_string(),
+            router_kind: config_resp.group().to_string(),
+            content: config_resp.content().to_string(),
         };
+
         if content_type == "yaml" {
             get_global_router_manager().write().unwrap().notify(event);
         }
-        info!("notify config={:?}", config_resp);
+
+        info!("notify config: {:?}", config_resp);
     }
 }
diff --git a/dubbo/src/cluster/router/router_chain.rs b/dubbo/src/cluster/router/router_chain.rs
index 930f0f2..42d5826 100644
--- a/dubbo/src/cluster/router/router_chain.rs
+++ b/dubbo/src/cluster/router/router_chain.rs
@@ -1,52 +1,30 @@
-use crate::{
-    cluster::router::{
-        condition::condition_router::ConditionRouter, tag::tag_router::TagRouter, Router,
-    },
-    invocation::RpcInvocation,
-};
+use crate::{cluster::router::BoxRouter, invocation::RpcInvocation};
 use dubbo_base::Url;
-use std::sync::{Arc, RwLock};
+use std::{collections::HashMap, sync::Arc};
 
-#[derive(Debug, Default, Clone)]
+#[derive(Debug, Default)]
 pub struct RouterChain {
-    pub condition_router: Option<ConditionRouter>,
-    pub tag_router: Option<Arc<RwLock<TagRouter>>>,
+    pub routers: HashMap<String, BoxRouter>,
     pub self_url: Url,
 }
 
 impl RouterChain {
     pub fn new() -> Self {
         RouterChain {
-            condition_router: None,
-            tag_router: None,
+            routers: HashMap::new(),
             self_url: Url::new(),
         }
     }
-    pub fn set_condition_router(&mut self, router: Option<ConditionRouter>) {
-        self.condition_router = router;
-    }
-    pub fn set_tag_router(&mut self, router: Option<Arc<RwLock<TagRouter>>>) {
-        self.tag_router = router;
-    }
-    pub fn route(&self, invokers: Vec<Url>, invocation: Arc<RpcInvocation>) -> Vec<Url> {
-        let mut result = invokers.clone();
-        match &self.tag_router {
-            None => {}
-            Some(router) => {
-                result =
-                    router
-                        .read()
-                        .unwrap()
-                        .route(result, self.self_url.clone(), invocation.clone())
-            }
-        }
-        match &self.condition_router {
-            Some(router) => {
-                result = router.route(result, self.self_url.clone(), invocation.clone())
-            }
-            None => {}
+
+    pub fn route(&self, mut invokers: Vec<Url>, invocation: Arc<RpcInvocation>) -> Vec<Url> {
+        for (_, value) in self.routers.iter() {
+            invokers = value.route(invokers, self.self_url.clone(), invocation.clone())
         }
-        result
+        invokers
+    }
+
+    pub fn add_router(&mut self, key: String, router: BoxRouter) {
+        self.routers.insert(key, router);
     }
 }
 
@@ -54,13 +32,16 @@ impl RouterChain {
 fn test() {
     use crate::cluster::router::manager::router_manager::get_global_router_manager;
 
-    let u1 = Url::from_url("triple://127.0.0.1:8888/org.apache.dubbo.sample.tri.Greeter").unwrap();
-    let u2 = Url::from_url("triple://127.0.0.1:8889/org.apache.dubbo.sample.tri.Greeter").unwrap();
-    let u3 = Url::from_url("triple://127.0.0.1:8800/org.apache.dubbo.sample.tri.Greeter").unwrap();
-    let u4 = Url::from_url("triple://127.0.2.1:880/org.apache.dubbo.sample.tri.Greeter").unwrap();
-    let u5 = Url::from_url("triple://127.0.1.1:8888/org.apache.dubbo.sample.tri.Greeter").unwrap();
-    let invos = vec![u1, u2, u3, u4, u5];
-    let invo = Arc::new(
+    let u1 = Url::from_url("tri://127.0.0.1:8888/org.apache.dubbo.sample.tri.Greeter").unwrap();
+    let u2 = Url::from_url("tri://127.0.0.1:8889/org.apache.dubbo.sample.tri.Greeter").unwrap();
+    let u3 = Url::from_url("tri://127.0.0.1:8800/org.apache.dubbo.sample.tri.Greeter").unwrap();
+    let u4 = Url::from_url("tri://127.0.2.1:8880/org.apache.dubbo.sample.tri.Greeter").unwrap();
+    let u5 = Url::from_url("tri://127.0.1.1:8882/org.apache.dubbo.sample.tri.Greeter").unwrap();
+    let u6 = Url::from_url("tri://213.0.1.1:8888/org.apache.dubbo.sample.tri.Greeter").unwrap();
+    let u7 = Url::from_url("tri://169.0.1.1:8887/org.apache.dubbo.sample.tri.Greeter").unwrap();
+    let invs = vec![u1, u2, u3, u4, u5, u6, u7];
+    let len = invs.len().clone();
+    let inv = Arc::new(
         RpcInvocation::default()
             .with_method_name("greet".to_string())
             .with_service_unique_name("org.apache.dubbo.sample.tri.Greeter".to_string()),
@@ -68,7 +49,8 @@ fn test() {
     let x = get_global_router_manager()
         .read()
         .unwrap()
-        .get_router_chain(invo.clone());
-    let result = x.route(invos, invo.clone());
+        .get_router_chain(inv.get_target_service_unique_name());
+    let result = x.route(invs, inv.clone());
+    println!("total:{},result:{}", len, result.len().clone());
     dbg!(result);
 }
diff --git a/dubbo/src/cluster/router/tag/tag_router.rs b/dubbo/src/cluster/router/tag/tag_router.rs
index d1a962e..7a83ea5 100644
--- a/dubbo/src/cluster/router/tag/tag_router.rs
+++ b/dubbo/src/cluster/router/tag/tag_router.rs
@@ -4,16 +4,30 @@ use crate::{
 };
 use dubbo_base::Url;
 use dubbo_config::router::TagRouterConfig;
-use std::{collections::HashMap, fmt::Debug, sync::Arc};
+use std::{
+    collections::HashMap,
+    fmt::Debug,
+    sync::{Arc, RwLock},
+};
 
 #[derive(Debug, Clone, Default)]
-pub struct TagRouter {
+pub struct TagRouterInner {
     pub tag_rules: HashMap<String, HashMap<String, String>>,
     pub force: bool,
     pub enabled: bool,
 }
 
-impl TagRouter {
+#[derive(Debug, Clone, Default)]
+pub struct TagRouter {
+    pub(crate) inner: Arc<RwLock<TagRouterInner>>,
+}
+impl Router for TagRouter {
+    fn route(&self, invokers: Vec<Url>, url: Url, invocation: Arc<RpcInvocation>) -> Vec<Url> {
+        return self.inner.read().unwrap().route(invokers, url, invocation);
+    }
+}
+
+impl TagRouterInner {
     pub fn parse_config(&mut self, config: TagRouterConfig) {
         self.tag_rules = HashMap::new();
         self.force = config.force;
@@ -43,26 +57,28 @@ impl TagRouter {
         }
         tag_result
     }
-}
 
-impl Router for TagRouter {
-    fn route(&self, invokers: Vec<Url>, url: Url, _invocation: Arc<RpcInvocation>) -> Vec<Url> {
+    pub fn route(&self, invokers: Vec<Url>, url: Url, _invocation: Arc<RpcInvocation>) -> Vec<Url> {
         if !self.enabled {
             return invokers;
         };
         let self_param = to_original_map(url);
         let invocation_tag = self.match_tag(self_param);
         let mut invokers_result = Vec::new();
+        let mut invokers_no_tag = Vec::new();
         for invoker in &invokers {
             let invoker_param = to_original_map(invoker.clone());
             let invoker_tag = self.match_tag(invoker_param);
+            if invoker_tag == None {
+                invokers_no_tag.push(invoker.clone());
+            }
             if invoker_tag == invocation_tag {
-                invokers_result.push(invoker.clone())
+                invokers_result.push(invoker.clone());
             }
         }
         if invokers_result.is_empty() {
             if !self.force {
-                return invokers;
+                return invokers_no_tag;
             }
         }
         invokers_result
diff --git a/dubbo/src/invocation.rs b/dubbo/src/invocation.rs
index d3eb8ad..5750337 100644
--- a/dubbo/src/invocation.rs
+++ b/dubbo/src/invocation.rs
@@ -196,7 +196,7 @@ pub trait Invocation {
     fn get_method_name(&self) -> String;
 }
 
-#[derive(Default,Clone)]
+#[derive(Default, Clone)]
 pub struct RpcInvocation {
     target_service_unique_name: String,
     method_name: String,
@@ -224,4 +224,4 @@ impl Invocation for RpcInvocation {
     fn get_method_name(&self) -> String {
         self.method_name.clone()
     }
-}
\ No newline at end of file
+}
diff --git a/dubbo/src/protocol/mod.rs b/dubbo/src/protocol/mod.rs
index 5008055..c941ae5 100644
--- a/dubbo/src/protocol/mod.rs
+++ b/dubbo/src/protocol/mod.rs
@@ -21,12 +21,11 @@ use std::{
 };
 
 use async_trait::async_trait;
-use aws_smithy_http::body::SdkBody;
 use tower_service::Service;
 
 use dubbo_base::Url;
 
-use crate::triple::client::replay::{ClonedBytesStream, ClonedBody};
+use crate::triple::client::replay::ClonedBody;
 
 pub mod server_desc;
 pub mod triple;
diff --git a/dubbo/src/protocol/triple/triple_invoker.rs b/dubbo/src/protocol/triple/triple_invoker.rs
index af17445..9f9c769 100644
--- a/dubbo/src/protocol/triple/triple_invoker.rs
+++ b/dubbo/src/protocol/triple/triple_invoker.rs
@@ -24,7 +24,10 @@ use tower_service::Service;
 
 use crate::{
     protocol::Invoker,
-    triple::{client::{builder::ClientBoxService, replay::ClonedBody}, transport::connection::Connection},
+    triple::{
+        client::{builder::ClientBoxService, replay::ClonedBody},
+        transport::connection::Connection,
+    },
     utils::boxed_clone::BoxCloneService,
 };
 
diff --git a/dubbo/src/triple/client/builder.rs b/dubbo/src/triple/client/builder.rs
index 32b26a5..a9756f7 100644
--- a/dubbo/src/triple/client/builder.rs
+++ b/dubbo/src/triple/client/builder.rs
@@ -19,7 +19,7 @@ use std::sync::Arc;
 
 use crate::{
     cluster::{directory::StaticDirectory, Cluster, Directory, MockCluster, MockDirectory},
-    codegen::{RegistryDirectory, RpcInvocation, TripleInvoker},
+    codegen::{RegistryDirectory, TripleInvoker},
     protocol::BoxInvoker,
     utils::boxed_clone::BoxCloneService,
 };
@@ -101,14 +101,14 @@ impl ClientBuilder {
         Self { direct, ..self }
     }
 
-    pub fn build(self) -> Option<BoxInvoker> {
+    pub fn build(self, service_name: String) -> Option<BoxInvoker> {
         if self.direct {
             return Some(Box::new(TripleInvoker::new(
                 Url::from_url(&self.host).unwrap(),
             )));
         }
 
-        let cluster = MockCluster::default().join(Box::new(MockDirectory::new()));
+        let cluster = MockCluster::default().join(Box::new(MockDirectory::new(service_name)));
 
         return Some(cluster);
     }
diff --git a/dubbo/src/triple/client/mod.rs b/dubbo/src/triple/client/mod.rs
index 97be85e..013d9e3 100644
--- a/dubbo/src/triple/client/mod.rs
+++ b/dubbo/src/triple/client/mod.rs
@@ -16,6 +16,6 @@
  */
 
 pub mod builder;
-pub mod triple;
 pub mod replay;
+pub mod triple;
 pub use triple::TripleClient;
diff --git a/dubbo/src/triple/client/replay.rs b/dubbo/src/triple/client/replay.rs
index 4ba8b93..195d750 100644
--- a/dubbo/src/triple/client/replay.rs
+++ b/dubbo/src/triple/client/replay.rs
@@ -253,7 +253,7 @@ impl Body for ClonedBody {
 
     fn poll_trailers(
         self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
+        _cx: &mut Context<'_>,
     ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
         Poll::Ready(Ok(None))
     }
diff --git a/dubbo/src/triple/client/triple.rs b/dubbo/src/triple/client/triple.rs
index 2227341..ace7aaf 100644
--- a/dubbo/src/triple/client/triple.rs
+++ b/dubbo/src/triple/client/triple.rs
@@ -21,12 +21,11 @@ use futures_util::{future, stream, StreamExt, TryStreamExt};
 
 use http::HeaderValue;
 
-use super::builder::ClientBuilder;
-use super::replay::ClonedBody;
+use super::{builder::ClientBuilder, replay::ClonedBody};
 use crate::codegen::RpcInvocation;
 
 use crate::{
-    invocation::{IntoStreamingRequest, Metadata, Request, Response},
+    invocation::{IntoStreamingRequest, Invocation, Metadata, Request, Response},
     triple::{codec::Codec, compression::CompressionEncoding, decode::Decoding, encode::encode},
 };
 
@@ -153,7 +152,7 @@ impl TripleClient {
             .builder
             .clone()
             .unwrap()
-            .build()
+            .build(invocation.get_target_service_unique_name())
             .unwrap();
 
         let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap();
@@ -212,14 +211,13 @@ impl TripleClient {
         )
         .into_stream();
 
-
         let body = ClonedBody::new(en);
 
         let mut conn = self
             .builder
             .clone()
             .unwrap()
-            .build()
+            .build(invocation.get_target_service_unique_name())
             .unwrap();
 
         let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap();
@@ -267,7 +265,7 @@ impl TripleClient {
             .builder
             .clone()
             .unwrap()
-            .build()
+            .build(invocation.get_target_service_unique_name())
             .unwrap();
 
         let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap();
@@ -333,7 +331,7 @@ impl TripleClient {
             .builder
             .clone()
             .unwrap()
-            .build()
+            .build(invocation.get_target_service_unique_name())
             .unwrap();
 
         let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap();
diff --git a/examples/echo/src/generated/grpc.examples.echo.rs b/examples/echo/src/generated/grpc.examples.echo.rs
index e756d5c..0701ec3 100644
--- a/examples/echo/src/generated/grpc.examples.echo.rs
+++ b/examples/echo/src/generated/grpc.examples.echo.rs
@@ -36,16 +36,12 @@ pub mod echo_client {
             &mut self,
             request: Request<super::EchoRequest>,
         ) -> Result<Response<super::EchoResponse>, dubbo::status::Status> {
-            let codec = dubbo::codegen::ProstCodec::<
-                super::EchoRequest,
-                super::EchoResponse,
-            >::default();
+            let codec =
+                dubbo::codegen::ProstCodec::<super::EchoRequest, super::EchoResponse>::default();
             let invocation = RpcInvocation::default()
                 .with_service_unique_name(String::from("grpc.examples.echo.Echo"))
                 .with_method_name(String::from("UnaryEcho"));
-            let path = http::uri::PathAndQuery::from_static(
-                "/grpc.examples.echo.Echo/UnaryEcho",
-            );
+            let path = http::uri::PathAndQuery::from_static("/grpc.examples.echo.Echo/UnaryEcho");
             self.inner.unary(request, codec, path, invocation).await
         }
         /// ServerStreamingEcho is server side streaming.
@@ -53,51 +49,51 @@ pub mod echo_client {
             &mut self,
             request: Request<super::EchoRequest>,
         ) -> Result<Response<Decoding<super::EchoResponse>>, dubbo::status::Status> {
-            let codec = dubbo::codegen::ProstCodec::<
-                super::EchoRequest,
-                super::EchoResponse,
-            >::default();
+            let codec =
+                dubbo::codegen::ProstCodec::<super::EchoRequest, super::EchoResponse>::default();
             let invocation = RpcInvocation::default()
                 .with_service_unique_name(String::from("grpc.examples.echo.Echo"))
                 .with_method_name(String::from("ServerStreamingEcho"));
             let path = http::uri::PathAndQuery::from_static(
                 "/grpc.examples.echo.Echo/ServerStreamingEcho",
             );
-            self.inner.server_streaming(request, codec, path, invocation).await
+            self.inner
+                .server_streaming(request, codec, path, invocation)
+                .await
         }
         /// ClientStreamingEcho is client side streaming.
         pub async fn client_streaming_echo(
             &mut self,
             request: impl IntoStreamingRequest<Message = super::EchoRequest>,
         ) -> Result<Response<super::EchoResponse>, dubbo::status::Status> {
-            let codec = dubbo::codegen::ProstCodec::<
-                super::EchoRequest,
-                super::EchoResponse,
-            >::default();
+            let codec =
+                dubbo::codegen::ProstCodec::<super::EchoRequest, super::EchoResponse>::default();
             let invocation = RpcInvocation::default()
                 .with_service_unique_name(String::from("grpc.examples.echo.Echo"))
                 .with_method_name(String::from("ClientStreamingEcho"));
             let path = http::uri::PathAndQuery::from_static(
                 "/grpc.examples.echo.Echo/ClientStreamingEcho",
             );
-            self.inner.client_streaming(request, codec, path, invocation).await
+            self.inner
+                .client_streaming(request, codec, path, invocation)
+                .await
         }
         /// BidirectionalStreamingEcho is bidi streaming.
         pub async fn bidirectional_streaming_echo(
             &mut self,
             request: impl IntoStreamingRequest<Message = super::EchoRequest>,
         ) -> Result<Response<Decoding<super::EchoResponse>>, dubbo::status::Status> {
-            let codec = dubbo::codegen::ProstCodec::<
-                super::EchoRequest,
-                super::EchoResponse,
-            >::default();
+            let codec =
+                dubbo::codegen::ProstCodec::<super::EchoRequest, super::EchoResponse>::default();
             let invocation = RpcInvocation::default()
                 .with_service_unique_name(String::from("grpc.examples.echo.Echo"))
                 .with_method_name(String::from("BidirectionalStreamingEcho"));
             let path = http::uri::PathAndQuery::from_static(
                 "/grpc.examples.echo.Echo/BidirectionalStreamingEcho",
             );
-            self.inner.bidi_streaming(request, codec, path, invocation).await
+            self.inner
+                .bidi_streaming(request, codec, path, invocation)
+                .await
         }
     }
 }
@@ -114,9 +110,7 @@ pub mod echo_server {
             request: Request<super::EchoRequest>,
         ) -> Result<Response<super::EchoResponse>, dubbo::status::Status>;
         ///Server streaming response type for the ServerStreamingEcho method.
-        type ServerStreamingEchoStream: futures_util::Stream<
-                Item = Result<super::EchoResponse, dubbo::status::Status>,
-            >
+        type ServerStreamingEchoStream: futures_util::Stream<Item = Result<super::EchoResponse, dubbo::status::Status>>
             + Send
             + 'static;
         /// ServerStreamingEcho is server side streaming.
@@ -130,19 +124,14 @@ pub mod echo_server {
             request: Request<Decoding<super::EchoRequest>>,
         ) -> Result<Response<super::EchoResponse>, dubbo::status::Status>;
         ///Server streaming response type for the BidirectionalStreamingEcho method.
-        type BidirectionalStreamingEchoStream: futures_util::Stream<
-                Item = Result<super::EchoResponse, dubbo::status::Status>,
-            >
+        type BidirectionalStreamingEchoStream: futures_util::Stream<Item = Result<super::EchoResponse, dubbo::status::Status>>
             + Send
             + 'static;
         /// BidirectionalStreamingEcho is bidi streaming.
         async fn bidirectional_streaming_echo(
             &self,
             request: Request<Decoding<super::EchoRequest>>,
-        ) -> Result<
-            Response<Self::BidirectionalStreamingEchoStream>,
-            dubbo::status::Status,
-        >;
+        ) -> Result<Response<Self::BidirectionalStreamingEchoStream>, dubbo::status::Status>;
     }
     /// Echo is the echo service.
     #[derive(Debug)]
@@ -172,10 +161,7 @@ pub mod echo_server {
         type Response = http::Response<BoxBody>;
         type Error = std::convert::Infallible;
         type Future = BoxFuture<Self::Response, Self::Error>;
-        fn poll_ready(
-            &mut self,
-            _cx: &mut Context<'_>,
-        ) -> Poll<Result<(), Self::Error>> {
+        fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
             Poll::Ready(Ok(()))
         }
         fn call(&mut self, req: http::Request<B>) -> Self::Future {
@@ -188,26 +174,18 @@ pub mod echo_server {
                     }
                     impl<T: Echo> UnarySvc<super::EchoRequest> for UnaryEchoServer<T> {
                         type Response = super::EchoResponse;
-                        type Future = BoxFuture<
-                            Response<Self::Response>,
-                            dubbo::status::Status,
-                        >;
-                        fn call(
-                            &mut self,
-                            request: Request<super::EchoRequest>,
-                        ) -> Self::Future {
+                        type Future = BoxFuture<Response<Self::Response>, dubbo::status::Status>;
+                        fn call(&mut self, request: Request<super::EchoRequest>) -> Self::Future {
                             let inner = self.inner.0.clone();
                             let fut = async move { inner.unary_echo(request).await };
                             Box::pin(fut)
                         }
                     }
                     let fut = async move {
-                        let mut server = TripleServer::new(
-                            dubbo::codegen::ProstCodec::<
-                                super::EchoResponse,
-                                super::EchoRequest,
-                            >::default(),
-                        );
+                        let mut server = TripleServer::new(dubbo::codegen::ProstCodec::<
+                            super::EchoResponse,
+                            super::EchoRequest,
+                        >::default());
                         let res = server.unary(UnaryEchoServer { inner }, req).await;
                         Ok(res)
                     };
@@ -218,32 +196,22 @@ pub mod echo_server {
                     struct ServerStreamingEchoServer<T: Echo> {
                         inner: _Inner<T>,
                     }
-                    impl<T: Echo> ServerStreamingSvc<super::EchoRequest>
-                    for ServerStreamingEchoServer<T> {
+                    impl<T: Echo> ServerStreamingSvc<super::EchoRequest> for ServerStreamingEchoServer<T> {
                         type Response = super::EchoResponse;
                         type ResponseStream = T::ServerStreamingEchoStream;
-                        type Future = BoxFuture<
-                            Response<Self::ResponseStream>,
-                            dubbo::status::Status,
-                        >;
-                        fn call(
-                            &mut self,
-                            request: Request<super::EchoRequest>,
-                        ) -> Self::Future {
+                        type Future =
+                            BoxFuture<Response<Self::ResponseStream>, dubbo::status::Status>;
+                        fn call(&mut self, request: Request<super::EchoRequest>) -> Self::Future {
                             let inner = self.inner.0.clone();
-                            let fut = async move {
-                                inner.server_streaming_echo(request).await
-                            };
+                            let fut = async move { inner.server_streaming_echo(request).await };
                             Box::pin(fut)
                         }
                     }
                     let fut = async move {
-                        let mut server = TripleServer::new(
-                            dubbo::codegen::ProstCodec::<
-                                super::EchoResponse,
-                                super::EchoRequest,
-                            >::default(),
-                        );
+                        let mut server = TripleServer::new(dubbo::codegen::ProstCodec::<
+                            super::EchoResponse,
+                            super::EchoRequest,
+                        >::default());
                         let res = server
                             .server_streaming(ServerStreamingEchoServer { inner }, req)
                             .await;
@@ -256,31 +224,23 @@ pub mod echo_server {
                     struct ClientStreamingEchoServer<T: Echo> {
                         inner: _Inner<T>,
                     }
-                    impl<T: Echo> ClientStreamingSvc<super::EchoRequest>
-                    for ClientStreamingEchoServer<T> {
+                    impl<T: Echo> ClientStreamingSvc<super::EchoRequest> for ClientStreamingEchoServer<T> {
                         type Response = super::EchoResponse;
-                        type Future = BoxFuture<
-                            Response<Self::Response>,
-                            dubbo::status::Status,
-                        >;
+                        type Future = BoxFuture<Response<Self::Response>, dubbo::status::Status>;
                         fn call(
                             &mut self,
                             request: Request<Decoding<super::EchoRequest>>,
                         ) -> Self::Future {
                             let inner = self.inner.0.clone();
-                            let fut = async move {
-                                inner.client_streaming_echo(request).await
-                            };
+                            let fut = async move { inner.client_streaming_echo(request).await };
                             Box::pin(fut)
                         }
                     }
                     let fut = async move {
-                        let mut server = TripleServer::new(
-                            dubbo::codegen::ProstCodec::<
-                                super::EchoResponse,
-                                super::EchoRequest,
-                            >::default(),
-                        );
+                        let mut server = TripleServer::new(dubbo::codegen::ProstCodec::<
+                            super::EchoResponse,
+                            super::EchoRequest,
+                        >::default());
                         let res = server
                             .client_streaming(ClientStreamingEchoServer { inner }, req)
                             .await;
@@ -293,56 +253,41 @@ pub mod echo_server {
                     struct BidirectionalStreamingEchoServer<T: Echo> {
                         inner: _Inner<T>,
                     }
-                    impl<T: Echo> StreamingSvc<super::EchoRequest>
-                    for BidirectionalStreamingEchoServer<T> {
+                    impl<T: Echo> StreamingSvc<super::EchoRequest> for BidirectionalStreamingEchoServer<T> {
                         type Response = super::EchoResponse;
                         type ResponseStream = T::BidirectionalStreamingEchoStream;
-                        type Future = BoxFuture<
-                            Response<Self::ResponseStream>,
-                            dubbo::status::Status,
-                        >;
+                        type Future =
+                            BoxFuture<Response<Self::ResponseStream>, dubbo::status::Status>;
                         fn call(
                             &mut self,
                             request: Request<Decoding<super::EchoRequest>>,
                         ) -> Self::Future {
                             let inner = self.inner.0.clone();
-                            let fut = async move {
-                                inner.bidirectional_streaming_echo(request).await
-                            };
+                            let fut =
+                                async move { inner.bidirectional_streaming_echo(request).await };
                             Box::pin(fut)
                         }
                     }
                     let fut = async move {
-                        let mut server = TripleServer::new(
-                            dubbo::codegen::ProstCodec::<
-                                super::EchoResponse,
-                                super::EchoRequest,
-                            >::default(),
-                        );
+                        let mut server = TripleServer::new(dubbo::codegen::ProstCodec::<
+                            super::EchoResponse,
+                            super::EchoRequest,
+                        >::default());
                         let res = server
-                            .bidi_streaming(
-                                BidirectionalStreamingEchoServer {
-                                    inner,
-                                },
-                                req,
-                            )
+                            .bidi_streaming(BidirectionalStreamingEchoServer { inner }, req)
                             .await;
                         Ok(res)
                     };
                     Box::pin(fut)
                 }
-                _ => {
-                    Box::pin(async move {
-                        Ok(
-                            http::Response::builder()
-                                .status(200)
-                                .header("grpc-status", "12")
-                                .header("content-type", "application/grpc")
-                                .body(empty_body())
-                                .unwrap(),
-                        )
-                    })
-                }
+                _ => Box::pin(async move {
+                    Ok(http::Response::builder()
+                        .status(200)
+                        .header("grpc-status", "12")
+                        .header("content-type", "application/grpc")
+                        .body(empty_body())
+                        .unwrap())
+                }),
             }
         }
     }