You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by ya...@apache.org on 2023/10/07 15:20:20 UTC
[dubbo-rust] branch feat/cluster updated: Feat/cluster Optimized the Router module (#160)
This is an automated email from the ASF dual-hosted git repository.
yangyang pushed a commit to branch feat/cluster
in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git
The following commit(s) were added to refs/heads/feat/cluster by this push:
new a957c54 Feat/cluster Optimized the Router module (#160)
a957c54 is described below
commit a957c54d1d58a02a4e60b95b1f49f42e8cb4e992
Author: Urara <95...@users.noreply.github.com>
AuthorDate: Sat Oct 7 23:20:15 2023 +0800
Feat/cluster Optimized the Router module (#160)
* perf: Optimized the logic of the routing module.
Refactored route logic decision-making, eliminating unnecessary cloning
and improving performance.
* perf: Optimized the logic of the routing module.
Refactored route logic decision-making, eliminating unnecessary cloning
and improving performance.
* perf: Removed unnecessary configurations.
* perf: Removed unnecessary configurations.
* perf: Optimized the Router module
Optimized the Router module
Added Router Chain to MockDirectory
---
application.yaml | 32 +---
config/src/router.rs | 6 +-
dubbo/src/cluster/directory.rs | 9 +-
dubbo/src/cluster/mod.rs | 67 ++++----
.../cluster/router/condition/condition_router.rs | 13 +-
dubbo/src/cluster/router/condition/matcher.rs | 80 ++++-----
.../src/cluster/router/condition/single_router.rs | 34 ++--
.../cluster/router/manager/condition_manager.rs | 73 ++++-----
dubbo/src/cluster/router/manager/router_manager.rs | 137 ++++++++--------
dubbo/src/cluster/router/manager/tag_manager.rs | 21 +--
dubbo/src/cluster/router/mod.rs | 8 +-
.../router/nacos_config_center/nacos_client.rs | 138 ++++++++--------
dubbo/src/cluster/router/router_chain.rs | 72 ++++-----
dubbo/src/cluster/router/tag/tag_router.rs | 32 +++-
dubbo/src/invocation.rs | 4 +-
dubbo/src/protocol/mod.rs | 3 +-
dubbo/src/protocol/triple/triple_invoker.rs | 5 +-
dubbo/src/triple/client/builder.rs | 6 +-
dubbo/src/triple/client/mod.rs | 2 +-
dubbo/src/triple/client/replay.rs | 2 +-
dubbo/src/triple/client/triple.rs | 14 +-
examples/echo/src/generated/grpc.examples.echo.rs | 179 +++++++--------------
22 files changed, 396 insertions(+), 541 deletions(-)
diff --git a/application.yaml b/application.yaml
index 902b0ef..bec29a6 100644
--- a/application.yaml
+++ b/application.yaml
@@ -25,33 +25,5 @@ dubbo:
routers:
consumer:
- service: "org.apache.dubbo.sample.tri.Greeter"
- url: triple://localhost:20000
- protocol: triple
- nacos:
- addr: "127.0.0.1:8848"
- namespace: ""
- app: ""
- conditions:
- - scope: "service"
- force: false
- runtime: true
- enabled: true
- key: "org.apache.dubbo.sample.tri.Greeter"
- conditions:
- - method=greet => port=8889
- - scope: "service"
- force: true
- runtime: true
- enabled: true
- key: "user.UserService"
- conditions:
- - method=get_s => port=2003
- tags:
- force: true
- enabled: true
- key: shop-detail
- tags:
- - name: gray
- matches:
- - key: env
- value: gray
\ No newline at end of file
+ url: tri://127.0.0.1:20000
+ protocol: triple
\ No newline at end of file
diff --git a/config/src/router.rs b/config/src/router.rs
index 98aa14a..b45bd47 100644
--- a/config/src/router.rs
+++ b/config/src/router.rs
@@ -2,9 +2,10 @@ use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Default)]
pub struct ConditionRouterConfig {
+ #[serde(rename = "configVersion")]
+ pub config_version: String,
pub scope: String,
pub force: bool,
- pub runtime: bool,
pub enabled: bool,
pub key: String,
pub conditions: Vec<String>,
@@ -12,6 +13,8 @@ pub struct ConditionRouterConfig {
#[derive(Serialize, Deserialize, Default, Debug, Clone, PartialEq)]
pub struct TagRouterConfig {
+ #[serde(rename = "configVersion")]
+ pub config_version: String,
pub force: bool,
pub enabled: bool,
pub key: String,
@@ -28,6 +31,7 @@ pub struct ConsumerConfig {
#[derive(Serialize, Deserialize, Default, Debug, Clone, PartialEq)]
pub struct Tag {
pub name: String,
+ #[serde(rename = "match")]
pub matches: Vec<TagMatchRule>,
}
diff --git a/dubbo/src/cluster/directory.rs b/dubbo/src/cluster/directory.rs
index 547878b..66a3c43 100644
--- a/dubbo/src/cluster/directory.rs
+++ b/dubbo/src/cluster/directory.rs
@@ -39,7 +39,7 @@ use tower::{
ready_cache::ReadyCache,
};
-use crate::cluster::Directory;
+use crate::{cluster::Directory, codegen::RpcInvocation, invocation::Invocation};
/// Directory.
///
@@ -68,12 +68,12 @@ impl StaticDirectory {
}
impl Directory for StaticDirectory {
- fn list(&self, service_name: String) -> Vec<BoxInvoker> {
+ fn list(&self, inv: Arc<RpcInvocation>) -> Vec<BoxInvoker> {
let url = Url::from_url(&format!(
"tri://{}:{}/{}",
self.uri.host().unwrap(),
self.uri.port().unwrap(),
- service_name,
+ inv.get_target_service_unique_name(),
))
.unwrap();
let invoker = Box::new(TripleInvoker::new(url));
@@ -97,7 +97,8 @@ impl RegistryDirectory {
}
impl Directory for RegistryDirectory {
- fn list(&self, service_name: String) -> Vec<BoxInvoker> {
+ fn list(&self, inv: Arc<RpcInvocation>) -> Vec<BoxInvoker> {
+ let service_name = inv.get_target_service_unique_name();
let url = Url::from_url(&format!(
"triple://{}:{}/{}",
"127.0.0.1", "8888", service_name
diff --git a/dubbo/src/cluster/mod.rs b/dubbo/src/cluster/mod.rs
index 7411a67..964150d 100644
--- a/dubbo/src/cluster/mod.rs
+++ b/dubbo/src/cluster/mod.rs
@@ -23,6 +23,9 @@ use tower::{ready_cache::ReadyCache, ServiceExt};
use tower_service::Service;
use crate::{
+ cluster::router::{
+ manager::router_manager::get_global_router_manager, router_chain::RouterChain,
+ },
codegen::RpcInvocation,
invocation::Invocation,
protocol::{triple::triple_invoker::TripleInvoker, BoxInvoker, Invoker},
@@ -35,7 +38,7 @@ pub mod loadbalance;
pub mod router;
pub trait Directory: Debug {
- fn list(&self, service_name: String) -> Vec<BoxInvoker>;
+ fn list(&self, inv: Arc<RpcInvocation>) -> Vec<BoxInvoker>;
}
type BoxDirectory = Box<dyn Directory + Send + Sync>;
@@ -134,7 +137,7 @@ impl Service<http::Request<ClonedBody>> for FailoverCluster {
let inv = inv.unwrap();
let service_name = inv.get_target_service_unique_name();
- let invokers = self.dir.list(service_name.clone());
+ let invokers = self.dir.list(Arc::new(inv.clone()));
Box::pin(async move {
let mut current_req = req;
@@ -172,55 +175,36 @@ impl Invoker<http::Request<ClonedBody>> for FailoverCluster {
#[derive(Debug, Default)]
pub struct MockDirectory {
- // router_chain: RouterChain,
+ router_chain: RouterChain,
}
impl MockDirectory {
- pub fn new() -> MockDirectory {
- // let router_chain = get_global_router_manager().read().unwrap().get_router_chain(invocation);
- Self {
- // router_chain
- }
+ pub fn new(service_name: String) -> MockDirectory {
+ let router_chain = get_global_router_manager()
+ .read()
+ .unwrap()
+ .get_router_chain(service_name);
+ Self { router_chain }
}
}
impl Directory for MockDirectory {
- fn list(&self, service_name: String) -> Vec<BoxInvoker> {
- // tracing::info!("MockDirectory: {}", meta);
+ fn list(&self, inv: Arc<RpcInvocation>) -> Vec<BoxInvoker> {
let u = Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap();
- vec![Box::new(TripleInvoker::new(u))]
- // self.router_chain.route(u, invo);
+ let mut urls = vec![u];
+ // tracing::info!("MockDirectory: {}", meta);
+ urls = self.router_chain.route(urls, inv);
+ let mut result = Vec::new();
+ for url in urls {
+ result.push(Box::new(TripleInvoker::new(url)) as BoxInvoker);
+ }
+ result
}
}
-
-// #[derive(Debug, Default)]
-// pub struct RouterChain {
-// router: HashMap<String, BoxRouter>,
-// invokers: Arc<Vec<BoxInvoker>>,
-// }
-
-// impl RouterChain {
-// pub fn route(&mut self, url: Url, invo: Arc<RpcInvocation>) -> Arc<Vec<BoxInvoker>> {
-// let r = self.router.get("mock").unwrap();
-// r.route(self.invokers.clone(), url, invo)
-// }
-// }
-
-// pub trait Router: Debug {
-// fn route(
-// &self,
-// invokers: Arc<Vec<BoxInvoker>>,
-// url: Url,
-// invo: Arc<RpcInvocation>,
-// ) -> Arc<Vec<BoxInvoker>>;
-// }
-
-// pub type BoxRouter = Box<dyn Router + Sync + Send>;
-
#[cfg(test)]
pub mod tests {
- use std::task::Poll;
+ use std::{sync::Arc, task::Poll};
use bytes::{Buf, BufMut, BytesMut};
use dubbo_base::Url;
@@ -250,8 +234,11 @@ pub mod tests {
struct MockDirectory;
impl Directory for MockDirectory {
- fn list(&self, service_name: String) -> Vec<crate::protocol::BoxInvoker> {
- println!("get invoker list for {}", service_name);
+ fn list(&self, inv: Arc<RpcInvocation>) -> Vec<crate::protocol::BoxInvoker> {
+ println!(
+ "get invoker list for {}",
+ inv.get_target_service_unique_name()
+ );
vec![
Box::new(MockInvoker(1)),
diff --git a/dubbo/src/cluster/router/condition/condition_router.rs b/dubbo/src/cluster/router/condition/condition_router.rs
index f39cd1c..73aca00 100644
--- a/dubbo/src/cluster/router/condition/condition_router.rs
+++ b/dubbo/src/cluster/router/condition/condition_router.rs
@@ -17,19 +17,18 @@ pub struct ConditionRouter {
}
impl Router for ConditionRouter {
- fn route(&self, invokers: Vec<Url>, url: Url, invo: Arc<RpcInvocation>) -> Vec<Url> {
- let mut invokers_result = invokers.clone();
- if let Some(routers) = self.application_routers.clone() {
+ fn route(&self, mut invokers: Vec<Url>, url: Url, invo: Arc<RpcInvocation>) -> Vec<Url> {
+ if let Some(routers) = &self.application_routers {
for router in &routers.read().unwrap().routers {
- invokers_result = router.route(invokers_result, url.clone(), invo.clone())
+ invokers = router.route(invokers, url.clone(), invo.clone());
}
}
- if let Some(routers) = self.service_routers.clone() {
+ if let Some(routers) = &self.service_routers {
for router in &routers.read().unwrap().routers {
- invokers_result = router.route(invokers_result, url.clone(), invo.clone())
+ invokers = router.route(invokers, url.clone(), invo.clone());
}
}
- invokers_result
+ invokers
}
}
diff --git a/dubbo/src/cluster/router/condition/matcher.rs b/dubbo/src/cluster/router/condition/matcher.rs
index 8c9177e..92bbe2d 100644
--- a/dubbo/src/cluster/router/condition/matcher.rs
+++ b/dubbo/src/cluster/router/condition/matcher.rs
@@ -1,6 +1,5 @@
-use crate::codegen::RpcInvocation;
use regex::Regex;
-use std::{collections::HashSet, error::Error, option::Option, sync::Arc};
+use std::{collections::HashSet, error::Error, option::Option};
#[derive(Clone, Debug, Default)]
pub struct ConditionMatcher {
@@ -18,50 +17,25 @@ impl ConditionMatcher {
}
}
- pub fn is_match(
- &self,
- value: Option<String>,
- invocation: Arc<RpcInvocation>,
- _is_when: bool,
- ) -> Result<bool, Box<dyn Error>> {
+ pub fn is_match(&self, value: Option<String>) -> Result<bool, Box<dyn Error>> {
match value {
- None => {
- // if key does not present in whichever of url, invocation or attachment based on the matcher type, then return false.
- Ok(false)
- }
+ None => Ok(false),
Some(val) => {
- if !self.matches.is_empty() && self.mismatches.is_empty() {
- for match_ in self.matches.iter() {
- if self.do_pattern_match(match_, &val, invocation.clone())? {
- return Ok(true);
- }
- }
- Ok(false)
- } else if !self.mismatches.is_empty() && self.matches.is_empty() {
- for mismatch in self.mismatches.iter() {
- if self.do_pattern_match(mismatch, &val, invocation.clone())? {
- return Ok(false);
- }
+ for match_ in self.matches.iter() {
+ if self.do_pattern_match(match_, &val) {
+ return Ok(true);
}
- Ok(true)
- } else if !self.matches.is_empty() && !self.mismatches.is_empty() {
- for mismatch in self.mismatches.iter() {
- if self.do_pattern_match(mismatch, &val, invocation.clone())? {
- return Ok(false);
- }
- }
- for match_ in self.matches.iter() {
- if self.do_pattern_match(match_, &val, invocation.clone())? {
- return Ok(true);
- }
+ }
+ for mismatch in self.mismatches.iter() {
+ if !self.do_pattern_match(mismatch, &val) {
+ return Ok(true);
}
- Ok(false)
- } else {
- Ok(false)
}
+ Ok(false)
}
}
}
+
pub fn get_matches(&mut self) -> &mut HashSet<String> {
&mut self.matches
}
@@ -69,16 +43,26 @@ impl ConditionMatcher {
&mut self.mismatches
}
- fn do_pattern_match(
- &self,
- pattern: &String,
- value: &String,
- _invocation: Arc<RpcInvocation>,
- ) -> Result<bool, Box<dyn Error>> {
- if pattern.contains("*") {
- return Ok(star_matcher(pattern, value));
+ fn do_pattern_match(&self, pattern: &str, value: &str) -> bool {
+ if pattern.contains('*') {
+ return star_matcher(pattern, value);
+ }
+
+ if pattern.contains('~') {
+ let parts: Vec<&str> = pattern.split('~').collect();
+
+ if parts.len() == 2 {
+ if let (Ok(left), Ok(right), Ok(val)) = (
+ parts[0].parse::<i32>(),
+ parts[1].parse::<i32>(),
+ value.parse::<i32>(),
+ ) {
+ return range_matcher(val, left, right);
+ }
+ }
+ return false;
}
- Ok(pattern.eq(value))
+ pattern == value
}
}
@@ -89,6 +73,6 @@ pub fn star_matcher(pattern: &str, input: &str) -> bool {
regex.is_match(input)
}
-pub fn _range_matcher(val: i32, min: i32, max: i32) -> bool {
+pub fn range_matcher(val: i32, min: i32, max: i32) -> bool {
min <= val && val <= max
}
diff --git a/dubbo/src/cluster/router/condition/single_router.rs b/dubbo/src/cluster/router/condition/single_router.rs
index e290b15..5f06aa8 100644
--- a/dubbo/src/cluster/router/condition/single_router.rs
+++ b/dubbo/src/cluster/router/condition/single_router.rs
@@ -169,20 +169,16 @@ impl ConditionSingleRouter {
pub fn match_when(&self, url: Url, invocation: Arc<RpcInvocation>) -> bool {
if self.when_condition.is_empty() {
- true
- } else {
- false
- };
- self.do_match(url, &self.when_condition, invocation, true)
+ return true;
+ }
+ self.do_match(url, &self.when_condition, invocation)
}
pub fn match_then(&self, url: Url, invocation: Arc<RpcInvocation>) -> bool {
- if self.when_condition.is_empty() {
- true
- } else {
- false
- };
- self.do_match(url, &self.then_condition, invocation, false)
+ if self.then_condition.is_empty() {
+ return false;
+ }
+ self.do_match(url, &self.then_condition, invocation)
}
pub fn do_match(
@@ -190,25 +186,19 @@ impl ConditionSingleRouter {
url: Url,
conditions: &HashMap<String, Arc<RwLock<ConditionMatcher>>>,
invocation: Arc<RpcInvocation>,
- is_when: bool,
) -> bool {
let sample: HashMap<String, String> = to_original_map(url);
- for (key, condition_matcher) in conditions {
+ conditions.iter().all(|(key, condition_matcher)| {
let matcher = condition_matcher.read().unwrap();
let value = get_value(key, &sample, invocation.clone());
- match matcher.is_match(value, invocation.clone(), is_when) {
- Ok(result) => {
- if !result {
- return false;
- }
- }
+ match matcher.is_match(value) {
+ Ok(result) => result,
Err(error) => {
info!("Error occurred: {:?}", error);
- return false;
+ false
}
}
- }
- true
+ })
}
}
diff --git a/dubbo/src/cluster/router/manager/condition_manager.rs b/dubbo/src/cluster/router/manager/condition_manager.rs
index 4207b79..7ad5e1b 100644
--- a/dubbo/src/cluster/router/manager/condition_manager.rs
+++ b/dubbo/src/cluster/router/manager/condition_manager.rs
@@ -16,28 +16,30 @@ pub struct ConditionRouterManager {
}
impl ConditionRouterManager {
- pub fn get_router(&self, service_name: String) -> Option<ConditionRouter> {
- let routers_service = self.routers_service.get(&service_name);
- match routers_service {
- Some(routers_service) => {
- if self.routers_application.read().unwrap().is_null() {
- return Some(ConditionRouter::new(Some(routers_service.clone()), None));
- }
- Some(ConditionRouter::new(
+ pub fn get_router(&self, service_name: &String) -> Option<ConditionRouter> {
+ let routers_application_is_null = self.routers_application.read().unwrap().is_null();
+ self.routers_service
+ .get(service_name)
+ .map(|routers_service| {
+ ConditionRouter::new(
Some(routers_service.clone()),
- Some(self.routers_application.clone()),
- ))
- }
- None => {
- if self.routers_application.read().unwrap().is_null() {
- return None;
+ if routers_application_is_null {
+ None
+ } else {
+ Some(self.routers_application.clone())
+ },
+ )
+ })
+ .or_else(|| {
+ if routers_application_is_null {
+ None
+ } else {
+ Some(ConditionRouter::new(
+ None,
+ Some(self.routers_application.clone()),
+ ))
}
- Some(ConditionRouter::new(
- None,
- Some(self.routers_application.clone()),
- ))
- }
- }
+ })
}
pub fn update(&mut self, config: ConditionRouterConfig) {
@@ -45,31 +47,26 @@ impl ConditionRouterManager {
let scope = config.scope;
let key = config.key;
let enable = config.enabled;
- let mut routers = Vec::new();
- for condition in config.conditions {
- routers.push(ConditionSingleRouter::new(condition, force, enable));
- }
+
+ let routers = config
+ .conditions
+ .into_iter()
+ .map(|condition| ConditionSingleRouter::new(condition, force, enable))
+ .collect::<Vec<_>>();
+
match scope.as_str() {
"application" => {
self.routers_application.write().unwrap().routers = routers;
}
"service" => {
- if let Some(x) = self.routers_service.get(&key) {
- x.write().unwrap().routers = routers
- } else {
- self.routers_service.insert(
- key,
- Arc::new(RwLock::new(ConditionSingleRouters::new(routers))),
- );
- }
+ self.routers_service
+ .entry(key)
+ .or_insert_with(|| Arc::new(RwLock::new(ConditionSingleRouters::new(vec![]))))
+ .write()
+ .unwrap()
+ .routers = routers;
}
_ => {}
}
}
-
- pub fn _parse_rules(&mut self, configs: Vec<ConditionRouterConfig>) {
- for config in configs {
- self.update(config)
- }
- }
}
diff --git a/dubbo/src/cluster/router/manager/router_manager.rs b/dubbo/src/cluster/router/manager/router_manager.rs
index a2a6658..e963181 100644
--- a/dubbo/src/cluster/router/manager/router_manager.rs
+++ b/dubbo/src/cluster/router/manager/router_manager.rs
@@ -1,10 +1,7 @@
-use crate::{
- cluster::router::{
- manager::{condition_manager::ConditionRouterManager, tag_manager::TagRouterManager},
- nacos_config_center::nacos_client::NacosClient,
- router_chain::RouterChain,
- },
- invocation::{Invocation, RpcInvocation},
+use crate::cluster::router::{
+ manager::{condition_manager::ConditionRouterManager, tag_manager::TagRouterManager},
+ nacos_config_center::nacos_client::NacosClient,
+ router_chain::RouterChain,
};
use dubbo_base::Url;
use dubbo_config::{
@@ -19,7 +16,8 @@ use std::{
};
pub static GLOBAL_ROUTER_MANAGER: OnceCell<Arc<RwLock<RouterManager>>> = OnceCell::new();
-
+const TAG: &str = "tag";
+const CONDITION: &str = "condition";
pub struct RouterManager {
pub condition_router_manager: ConditionRouterManager,
pub tag_router_manager: TagRouterManager,
@@ -28,30 +26,28 @@ pub struct RouterManager {
}
impl RouterManager {
- pub fn get_router_chain(&self, invocation: Arc<RpcInvocation>) -> RouterChain {
- let service = invocation.get_target_service_unique_name().clone();
- let condition_router = self.condition_router_manager.get_router(service.clone());
- let tag_router = self.tag_router_manager.get_router();
+ pub fn get_router_chain(&self, service: String) -> RouterChain {
let mut chain = RouterChain::new();
- match self.consumer.get(service.as_str()) {
- None => {}
- Some(url) => {
- chain.set_condition_router(condition_router);
- chain.set_tag_router(tag_router);
- chain.self_url = url.clone();
+ if let Some(url) = self.consumer.get(service.as_str()) {
+ if let Some(tag_router) = self.tag_router_manager.get_router(&service) {
+ chain.add_router(TAG.to_string(), Box::new(tag_router));
+ }
+ if let Some(condition_router) = self.condition_router_manager.get_router(&service) {
+ chain.add_router(CONDITION.to_string(), Box::new(condition_router));
}
+ chain.self_url = url.clone();
}
chain
}
pub fn notify(&mut self, event: RouterConfigChangeEvent) {
match event.router_kind.as_str() {
- "condition" => {
+ CONDITION => {
let config: ConditionRouterConfig =
serde_yaml::from_str(event.content.as_str()).unwrap();
self.condition_router_manager.update(config)
}
- "tag" => {
+ TAG => {
let config: TagRouterConfig = serde_yaml::from_str(event.content.as_str()).unwrap();
self.tag_router_manager.update(config)
}
@@ -66,76 +62,71 @@ impl RouterManager {
self.init_router_managers_for_nacos();
}
- pub fn init_router_managers_for_nacos(&mut self) {
- let config = self
+ fn init_router_managers_for_nacos(&mut self) {
+ if let Some(tag_config) = self
.nacos
.as_ref()
- .unwrap()
- .get_tag_config("application".to_string());
- match config {
- None => {}
- Some(tag_config) => {
- self.tag_router_manager.init();
- self.tag_router_manager.update(tag_config)
- }
+ .and_then(|n| n.get_config("application", TAG, TAG))
+ {
+ self.tag_router_manager.update(tag_config);
}
+
+ if let Some(condition_app_config) = self
+ .nacos
+ .as_ref()
+ .and_then(|n| n.get_config("application", CONDITION, TAG))
+ {
+ self.condition_router_manager.update(condition_app_config);
+ }
+
for (service_name, _) in &self.consumer {
- let config = self
+ if let Some(condition_config) = self
.nacos
.as_ref()
- .unwrap()
- .get_condition_config(service_name.clone());
- match config {
- None => {}
- Some(condition_config) => self.condition_router_manager.update(condition_config),
+ .and_then(|n| n.get_config(service_name, CONDITION, CONDITION))
+ {
+ self.condition_router_manager.update(condition_config);
}
}
}
pub fn init(&mut self) {
let config = get_global_config().routers.clone();
+ self.init_consumer_configs();
+ if let Some(nacos_config) = &config.nacos {
+ self.init_nacos(nacos_config.clone());
+ } else {
+ trace!("Nacos not configured, using local YAML configuration for routing");
+ if let Some(condition_configs) = &config.conditions {
+ for condition_config in condition_configs {
+ self.condition_router_manager
+ .update(condition_config.clone());
+ }
+ } else {
+ info!("Unconfigured Condition Router")
+ }
+ if let Some(tag_config) = &config.tags {
+ self.tag_router_manager.update(tag_config.clone());
+ } else {
+ info!("Unconfigured Tag Router")
+ }
+ }
+ }
+
+ fn init_consumer_configs(&mut self) {
let consumer_configs = get_global_config()
.routers
.consumer
.clone()
- .unwrap_or(Vec::new());
+ .unwrap_or_else(Vec::new);
+
for consumer_config in consumer_configs {
- self.consumer.insert(
- consumer_config.service.clone(),
- Url::from_url(
- format!("{}/{}", consumer_config.url, consumer_config.service).as_str(),
- )
- .expect("consumer配置出错!Url生成错误"),
- );
- }
- match &config.nacos {
- None => {
- trace!("Nacos not configured, using local YAML configuration for routing");
- let condition = config.conditions.clone();
- match condition {
- None => {
- info!("Unconfigured Condition Router")
- }
- Some(cons) => {
- for con in cons {
- self.condition_router_manager.update(con)
- }
- }
- }
- let tag = config.tags.clone();
- match tag {
- None => {
- info!("Unconfigured Tag Router")
- }
- Some(ta) => {
- self.tag_router_manager.init();
- self.tag_router_manager.update(ta)
- }
- }
- }
- Some(config) => {
- self.init_nacos(config.clone());
- }
+ let service_url = Url::from_url(
+ format!("{}/{}", consumer_config.url, consumer_config.service).as_str(),
+ )
+ .expect("Consumer config error");
+
+ self.consumer.insert(consumer_config.service, service_url);
}
}
}
diff --git a/dubbo/src/cluster/router/manager/tag_manager.rs b/dubbo/src/cluster/router/manager/tag_manager.rs
index ce30c92..8dc2499 100644
--- a/dubbo/src/cluster/router/manager/tag_manager.rs
+++ b/dubbo/src/cluster/router/manager/tag_manager.rs
@@ -1,27 +1,20 @@
-use crate::cluster::router::tag::tag_router::TagRouter;
+use crate::cluster::router::tag::tag_router::{TagRouter, TagRouterInner};
use dubbo_config::router::TagRouterConfig;
use std::sync::{Arc, RwLock};
#[derive(Debug, Clone, Default)]
pub struct TagRouterManager {
- pub tag_router: Option<Arc<RwLock<TagRouter>>>,
+ pub tag_router: Arc<RwLock<TagRouterInner>>,
}
impl TagRouterManager {
- pub fn init(&mut self) {
- self.tag_router = Some(Arc::new(RwLock::new(TagRouter::default())))
- }
-
- pub fn get_router(&self) -> Option<Arc<RwLock<TagRouter>>> {
- self.tag_router.clone()
+ pub fn get_router(&self, _service_name: &String) -> Option<TagRouter> {
+ Some(TagRouter {
+ inner: self.tag_router.clone(),
+ })
}
pub fn update(&mut self, config: TagRouterConfig) {
- self.tag_router
- .as_ref()
- .unwrap()
- .write()
- .unwrap()
- .parse_config(config)
+ self.tag_router.write().unwrap().parse_config(config);
}
}
diff --git a/dubbo/src/cluster/router/mod.rs b/dubbo/src/cluster/router/mod.rs
index 84ceae1..17c9aec 100644
--- a/dubbo/src/cluster/router/mod.rs
+++ b/dubbo/src/cluster/router/mod.rs
@@ -9,17 +9,17 @@ use crate::invocation::RpcInvocation;
use dubbo_base::Url;
use std::{fmt::Debug, sync::Arc};
-pub trait Router: Debug + Clone {
- fn route(&self, invokers: Vec<Url>, url: Url, invo: Arc<RpcInvocation>) -> Vec<Url>;
+pub trait Router: Debug {
+ fn route(&self, invokers: Vec<Url>, url: Url, invocation: Arc<RpcInvocation>) -> Vec<Url>;
}
-// pub type BoxRouter = Box<dyn Router + Sync + Send>;
+pub type BoxRouter = Box<dyn Router + Sync + Send>;
#[derive(Debug, Default, Clone)]
pub struct MockRouter {}
impl Router for MockRouter {
- fn route(&self, invokers: Vec<Url>, _url: Url, _invo: Arc<RpcInvocation>) -> Vec<Url> {
+ fn route(&self, invokers: Vec<Url>, _url: Url, _invocation: Arc<RpcInvocation>) -> Vec<Url> {
invokers
}
}
diff --git a/dubbo/src/cluster/router/nacos_config_center/nacos_client.rs b/dubbo/src/cluster/router/nacos_config_center/nacos_client.rs
index 1600e9b..ce72641 100644
--- a/dubbo/src/cluster/router/nacos_config_center/nacos_client.rs
+++ b/dubbo/src/cluster/router/nacos_config_center/nacos_client.rs
@@ -1,7 +1,7 @@
use crate::cluster::router::manager::router_manager::{
get_global_router_manager, RouterConfigChangeEvent,
};
-use dubbo_config::router::{ConditionRouterConfig, NacosConfig, TagRouterConfig};
+use dubbo_config::router::NacosConfig;
use dubbo_logger::{tracing, tracing::info};
use nacos_sdk::api::{
config::{ConfigChangeListener, ConfigResponse, ConfigService, ConfigServiceBuilder},
@@ -24,94 +24,86 @@ impl NacosClient {
let server_addr = config.addr;
let namespace = config.namespace;
let app = config.app;
- match config.enable_auth {
- None => {
- info!("disable nacos auth!");
- info!("nacos init,addr:{}", server_addr);
- let client = Arc::new(RwLock::new(
- ConfigServiceBuilder::new(
- ClientProps::new()
- .server_addr(server_addr)
- .namespace(namespace)
- .app_name(app),
- )
- .build()
- .expect("NacosClient build failed!Please check NacosConfig"),
- ));
- Self { client }
- }
- Some(auth) => {
- info!("enable nacos auth!");
- info!("nacos init,addr:{}", server_addr);
- let client = Arc::new(RwLock::new(
- ConfigServiceBuilder::new(
- ClientProps::new()
- .server_addr(server_addr)
- .namespace(namespace)
- .app_name(app)
- .auth_username(auth.auth_username)
- .auth_password(auth.auth_password),
- )
- // .enable_auth_plugin_http()
- .build()
- .expect("NacosClient build failed!Please check NacosConfig"),
- ));
- return Self { client };
- }
+ let enable_auth = config.enable_auth;
+
+ let mut props = ClientProps::new()
+ .server_addr(server_addr)
+ .namespace(namespace)
+ .app_name(app);
+
+ if enable_auth.is_some() {
+ info!("enable nacos auth!");
+ } else {
+ info!("disable nacos auth!");
}
- }
- pub fn get_condition_config(&self, data_id: String) -> Option<ConditionRouterConfig> {
- let config_resp = self
- .client
- .read()
- .unwrap()
- .get_config(data_id.clone(), "condition".to_string());
- return match config_resp {
- Ok(config_resp) => {
- self.add_listener(data_id.clone(), "condition".to_string());
- let string = config_resp.content();
- Some(serde_yaml::from_str(string).unwrap())
- }
- Err(_err) => None,
- };
+
+ if let Some(auth) = enable_auth {
+ props = props
+ .auth_username(auth.auth_username)
+ .auth_password(auth.auth_password);
+ }
+
+ let client = Arc::new(RwLock::new(
+ ConfigServiceBuilder::new(props)
+ .build()
+ .expect("NacosClient build failed! Please check NacosConfig"),
+ ));
+
+ Self { client }
}
- pub fn get_tag_config(&self, data_id: String) -> Option<TagRouterConfig> {
+ pub fn get_config<T>(&self, data_id: &str, group: &str, config_type: &str) -> Option<T>
+ where
+ T: serde::de::DeserializeOwned,
+ {
let config_resp = self
.client
.read()
.unwrap()
- .get_config(data_id.clone(), "tag".to_string());
- return match config_resp {
+ .get_config(data_id.to_string(), group.to_string());
+
+ match config_resp {
Ok(config_resp) => {
- self.add_listener(data_id.clone(), "tag".to_string());
+ self.add_listener(data_id, group);
let string = config_resp.content();
let result = serde_yaml::from_str(string);
+
match result {
Ok(config) => {
- info!("success to get TagRouter config and parse success");
+ info!(
+ "success to get {}Router config and parse success",
+ config_type
+ );
Some(config)
}
- _ => {
- info!("failed to parse TagRouter rule");
+ Err(_) => {
+ info!("failed to parse {}Router rule", config_type);
None
}
}
}
- Err(_err) => None,
- };
+ Err(_) => None,
+ }
}
- pub fn add_listener(&self, data_id: String, group: String) {
- let res_listener = self
+
+ pub fn add_listener(&self, data_id: &str, group: &str) {
+ if let Err(err) = self
.client
.write()
- .expect("failed to create nacos config listener")
- .add_listener(data_id, group, Arc::new(ConfigChangeListenerImpl {}));
- match res_listener {
- Ok(_) => {
- info!("listening the config success");
- }
- Err(err) => tracing::error!("listen config error {:?}", err),
+ .map_err(|e| format!("failed to create nacos config listener: {}", e))
+ .and_then(|client| {
+ client
+ .add_listener(
+ data_id.to_string(),
+ group.to_string(),
+ Arc::new(ConfigChangeListenerImpl {}),
+ )
+ .map_err(|e| format!("failed to add nacos config listener: {}", e))
+ })
+ {
+ tracing::error!("{}", err);
+ } else {
+ info!("listening the config success");
}
}
}
@@ -120,13 +112,15 @@ impl ConfigChangeListener for ConfigChangeListenerImpl {
fn notify(&self, config_resp: ConfigResponse) {
let content_type = config_resp.content_type();
let event = RouterConfigChangeEvent {
- service_name: config_resp.data_id().clone(),
- router_kind: config_resp.group().clone(),
- content: config_resp.content().clone(),
+ service_name: config_resp.data_id().to_string(),
+ router_kind: config_resp.group().to_string(),
+ content: config_resp.content().to_string(),
};
+
if content_type == "yaml" {
get_global_router_manager().write().unwrap().notify(event);
}
- info!("notify config={:?}", config_resp);
+
+ info!("notify config: {:?}", config_resp);
}
}
diff --git a/dubbo/src/cluster/router/router_chain.rs b/dubbo/src/cluster/router/router_chain.rs
index 930f0f2..42d5826 100644
--- a/dubbo/src/cluster/router/router_chain.rs
+++ b/dubbo/src/cluster/router/router_chain.rs
@@ -1,52 +1,30 @@
-use crate::{
- cluster::router::{
- condition::condition_router::ConditionRouter, tag::tag_router::TagRouter, Router,
- },
- invocation::RpcInvocation,
-};
+use crate::{cluster::router::BoxRouter, invocation::RpcInvocation};
use dubbo_base::Url;
-use std::sync::{Arc, RwLock};
+use std::{collections::HashMap, sync::Arc};
-#[derive(Debug, Default, Clone)]
+#[derive(Debug, Default)]
pub struct RouterChain {
- pub condition_router: Option<ConditionRouter>,
- pub tag_router: Option<Arc<RwLock<TagRouter>>>,
+ pub routers: HashMap<String, BoxRouter>,
pub self_url: Url,
}
impl RouterChain {
pub fn new() -> Self {
RouterChain {
- condition_router: None,
- tag_router: None,
+ routers: HashMap::new(),
self_url: Url::new(),
}
}
- pub fn set_condition_router(&mut self, router: Option<ConditionRouter>) {
- self.condition_router = router;
- }
- pub fn set_tag_router(&mut self, router: Option<Arc<RwLock<TagRouter>>>) {
- self.tag_router = router;
- }
- pub fn route(&self, invokers: Vec<Url>, invocation: Arc<RpcInvocation>) -> Vec<Url> {
- let mut result = invokers.clone();
- match &self.tag_router {
- None => {}
- Some(router) => {
- result =
- router
- .read()
- .unwrap()
- .route(result, self.self_url.clone(), invocation.clone())
- }
- }
- match &self.condition_router {
- Some(router) => {
- result = router.route(result, self.self_url.clone(), invocation.clone())
- }
- None => {}
+
+ pub fn route(&self, mut invokers: Vec<Url>, invocation: Arc<RpcInvocation>) -> Vec<Url> {
+ for (_, value) in self.routers.iter() {
+ invokers = value.route(invokers, self.self_url.clone(), invocation.clone())
}
- result
+ invokers
+ }
+
+ pub fn add_router(&mut self, key: String, router: BoxRouter) {
+ self.routers.insert(key, router);
}
}
@@ -54,13 +32,16 @@ impl RouterChain {
fn test() {
use crate::cluster::router::manager::router_manager::get_global_router_manager;
- let u1 = Url::from_url("triple://127.0.0.1:8888/org.apache.dubbo.sample.tri.Greeter").unwrap();
- let u2 = Url::from_url("triple://127.0.0.1:8889/org.apache.dubbo.sample.tri.Greeter").unwrap();
- let u3 = Url::from_url("triple://127.0.0.1:8800/org.apache.dubbo.sample.tri.Greeter").unwrap();
- let u4 = Url::from_url("triple://127.0.2.1:880/org.apache.dubbo.sample.tri.Greeter").unwrap();
- let u5 = Url::from_url("triple://127.0.1.1:8888/org.apache.dubbo.sample.tri.Greeter").unwrap();
- let invos = vec![u1, u2, u3, u4, u5];
- let invo = Arc::new(
+ let u1 = Url::from_url("tri://127.0.0.1:8888/org.apache.dubbo.sample.tri.Greeter").unwrap();
+ let u2 = Url::from_url("tri://127.0.0.1:8889/org.apache.dubbo.sample.tri.Greeter").unwrap();
+ let u3 = Url::from_url("tri://127.0.0.1:8800/org.apache.dubbo.sample.tri.Greeter").unwrap();
+ let u4 = Url::from_url("tri://127.0.2.1:8880/org.apache.dubbo.sample.tri.Greeter").unwrap();
+ let u5 = Url::from_url("tri://127.0.1.1:8882/org.apache.dubbo.sample.tri.Greeter").unwrap();
+ let u6 = Url::from_url("tri://213.0.1.1:8888/org.apache.dubbo.sample.tri.Greeter").unwrap();
+ let u7 = Url::from_url("tri://169.0.1.1:8887/org.apache.dubbo.sample.tri.Greeter").unwrap();
+ let invs = vec![u1, u2, u3, u4, u5, u6, u7];
+ let len = invs.len().clone();
+ let inv = Arc::new(
RpcInvocation::default()
.with_method_name("greet".to_string())
.with_service_unique_name("org.apache.dubbo.sample.tri.Greeter".to_string()),
@@ -68,7 +49,8 @@ fn test() {
let x = get_global_router_manager()
.read()
.unwrap()
- .get_router_chain(invo.clone());
- let result = x.route(invos, invo.clone());
+ .get_router_chain(inv.get_target_service_unique_name());
+ let result = x.route(invs, inv.clone());
+ println!("total:{},result:{}", len, result.len().clone());
dbg!(result);
}
diff --git a/dubbo/src/cluster/router/tag/tag_router.rs b/dubbo/src/cluster/router/tag/tag_router.rs
index d1a962e..7a83ea5 100644
--- a/dubbo/src/cluster/router/tag/tag_router.rs
+++ b/dubbo/src/cluster/router/tag/tag_router.rs
@@ -4,16 +4,30 @@ use crate::{
};
use dubbo_base::Url;
use dubbo_config::router::TagRouterConfig;
-use std::{collections::HashMap, fmt::Debug, sync::Arc};
+use std::{
+ collections::HashMap,
+ fmt::Debug,
+ sync::{Arc, RwLock},
+};
#[derive(Debug, Clone, Default)]
-pub struct TagRouter {
+pub struct TagRouterInner {
pub tag_rules: HashMap<String, HashMap<String, String>>,
pub force: bool,
pub enabled: bool,
}
-impl TagRouter {
+#[derive(Debug, Clone, Default)]
+pub struct TagRouter {
+ pub(crate) inner: Arc<RwLock<TagRouterInner>>,
+}
+impl Router for TagRouter {
+ fn route(&self, invokers: Vec<Url>, url: Url, invocation: Arc<RpcInvocation>) -> Vec<Url> {
+ return self.inner.read().unwrap().route(invokers, url, invocation);
+ }
+}
+
+impl TagRouterInner {
pub fn parse_config(&mut self, config: TagRouterConfig) {
self.tag_rules = HashMap::new();
self.force = config.force;
@@ -43,26 +57,28 @@ impl TagRouter {
}
tag_result
}
-}
-impl Router for TagRouter {
- fn route(&self, invokers: Vec<Url>, url: Url, _invocation: Arc<RpcInvocation>) -> Vec<Url> {
+ pub fn route(&self, invokers: Vec<Url>, url: Url, _invocation: Arc<RpcInvocation>) -> Vec<Url> {
if !self.enabled {
return invokers;
};
let self_param = to_original_map(url);
let invocation_tag = self.match_tag(self_param);
let mut invokers_result = Vec::new();
+ let mut invokers_no_tag = Vec::new();
for invoker in &invokers {
let invoker_param = to_original_map(invoker.clone());
let invoker_tag = self.match_tag(invoker_param);
+ if invoker_tag == None {
+ invokers_no_tag.push(invoker.clone());
+ }
if invoker_tag == invocation_tag {
- invokers_result.push(invoker.clone())
+ invokers_result.push(invoker.clone());
}
}
if invokers_result.is_empty() {
if !self.force {
- return invokers;
+ return invokers_no_tag;
}
}
invokers_result
diff --git a/dubbo/src/invocation.rs b/dubbo/src/invocation.rs
index d3eb8ad..5750337 100644
--- a/dubbo/src/invocation.rs
+++ b/dubbo/src/invocation.rs
@@ -196,7 +196,7 @@ pub trait Invocation {
fn get_method_name(&self) -> String;
}
-#[derive(Default,Clone)]
+#[derive(Default, Clone)]
pub struct RpcInvocation {
target_service_unique_name: String,
method_name: String,
@@ -224,4 +224,4 @@ impl Invocation for RpcInvocation {
fn get_method_name(&self) -> String {
self.method_name.clone()
}
-}
\ No newline at end of file
+}
diff --git a/dubbo/src/protocol/mod.rs b/dubbo/src/protocol/mod.rs
index 5008055..c941ae5 100644
--- a/dubbo/src/protocol/mod.rs
+++ b/dubbo/src/protocol/mod.rs
@@ -21,12 +21,11 @@ use std::{
};
use async_trait::async_trait;
-use aws_smithy_http::body::SdkBody;
use tower_service::Service;
use dubbo_base::Url;
-use crate::triple::client::replay::{ClonedBytesStream, ClonedBody};
+use crate::triple::client::replay::ClonedBody;
pub mod server_desc;
pub mod triple;
diff --git a/dubbo/src/protocol/triple/triple_invoker.rs b/dubbo/src/protocol/triple/triple_invoker.rs
index af17445..9f9c769 100644
--- a/dubbo/src/protocol/triple/triple_invoker.rs
+++ b/dubbo/src/protocol/triple/triple_invoker.rs
@@ -24,7 +24,10 @@ use tower_service::Service;
use crate::{
protocol::Invoker,
- triple::{client::{builder::ClientBoxService, replay::ClonedBody}, transport::connection::Connection},
+ triple::{
+ client::{builder::ClientBoxService, replay::ClonedBody},
+ transport::connection::Connection,
+ },
utils::boxed_clone::BoxCloneService,
};
diff --git a/dubbo/src/triple/client/builder.rs b/dubbo/src/triple/client/builder.rs
index 32b26a5..a9756f7 100644
--- a/dubbo/src/triple/client/builder.rs
+++ b/dubbo/src/triple/client/builder.rs
@@ -19,7 +19,7 @@ use std::sync::Arc;
use crate::{
cluster::{directory::StaticDirectory, Cluster, Directory, MockCluster, MockDirectory},
- codegen::{RegistryDirectory, RpcInvocation, TripleInvoker},
+ codegen::{RegistryDirectory, TripleInvoker},
protocol::BoxInvoker,
utils::boxed_clone::BoxCloneService,
};
@@ -101,14 +101,14 @@ impl ClientBuilder {
Self { direct, ..self }
}
- pub fn build(self) -> Option<BoxInvoker> {
+ pub fn build(self, service_name: String) -> Option<BoxInvoker> {
if self.direct {
return Some(Box::new(TripleInvoker::new(
Url::from_url(&self.host).unwrap(),
)));
}
- let cluster = MockCluster::default().join(Box::new(MockDirectory::new()));
+ let cluster = MockCluster::default().join(Box::new(MockDirectory::new(service_name)));
return Some(cluster);
}
diff --git a/dubbo/src/triple/client/mod.rs b/dubbo/src/triple/client/mod.rs
index 97be85e..013d9e3 100644
--- a/dubbo/src/triple/client/mod.rs
+++ b/dubbo/src/triple/client/mod.rs
@@ -16,6 +16,6 @@
*/
pub mod builder;
-pub mod triple;
pub mod replay;
+pub mod triple;
pub use triple::TripleClient;
diff --git a/dubbo/src/triple/client/replay.rs b/dubbo/src/triple/client/replay.rs
index 4ba8b93..195d750 100644
--- a/dubbo/src/triple/client/replay.rs
+++ b/dubbo/src/triple/client/replay.rs
@@ -253,7 +253,7 @@ impl Body for ClonedBody {
fn poll_trailers(
self: Pin<&mut Self>,
- cx: &mut Context<'_>,
+ _cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
Poll::Ready(Ok(None))
}
diff --git a/dubbo/src/triple/client/triple.rs b/dubbo/src/triple/client/triple.rs
index 2227341..ace7aaf 100644
--- a/dubbo/src/triple/client/triple.rs
+++ b/dubbo/src/triple/client/triple.rs
@@ -21,12 +21,11 @@ use futures_util::{future, stream, StreamExt, TryStreamExt};
use http::HeaderValue;
-use super::builder::ClientBuilder;
-use super::replay::ClonedBody;
+use super::{builder::ClientBuilder, replay::ClonedBody};
use crate::codegen::RpcInvocation;
use crate::{
- invocation::{IntoStreamingRequest, Metadata, Request, Response},
+ invocation::{IntoStreamingRequest, Invocation, Metadata, Request, Response},
triple::{codec::Codec, compression::CompressionEncoding, decode::Decoding, encode::encode},
};
@@ -153,7 +152,7 @@ impl TripleClient {
.builder
.clone()
.unwrap()
- .build()
+ .build(invocation.get_target_service_unique_name())
.unwrap();
let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap();
@@ -212,14 +211,13 @@ impl TripleClient {
)
.into_stream();
-
let body = ClonedBody::new(en);
let mut conn = self
.builder
.clone()
.unwrap()
- .build()
+ .build(invocation.get_target_service_unique_name())
.unwrap();
let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap();
@@ -267,7 +265,7 @@ impl TripleClient {
.builder
.clone()
.unwrap()
- .build()
+ .build(invocation.get_target_service_unique_name())
.unwrap();
let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap();
@@ -333,7 +331,7 @@ impl TripleClient {
.builder
.clone()
.unwrap()
- .build()
+ .build(invocation.get_target_service_unique_name())
.unwrap();
let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap();
diff --git a/examples/echo/src/generated/grpc.examples.echo.rs b/examples/echo/src/generated/grpc.examples.echo.rs
index e756d5c..0701ec3 100644
--- a/examples/echo/src/generated/grpc.examples.echo.rs
+++ b/examples/echo/src/generated/grpc.examples.echo.rs
@@ -36,16 +36,12 @@ pub mod echo_client {
&mut self,
request: Request<super::EchoRequest>,
) -> Result<Response<super::EchoResponse>, dubbo::status::Status> {
- let codec = dubbo::codegen::ProstCodec::<
- super::EchoRequest,
- super::EchoResponse,
- >::default();
+ let codec =
+ dubbo::codegen::ProstCodec::<super::EchoRequest, super::EchoResponse>::default();
let invocation = RpcInvocation::default()
.with_service_unique_name(String::from("grpc.examples.echo.Echo"))
.with_method_name(String::from("UnaryEcho"));
- let path = http::uri::PathAndQuery::from_static(
- "/grpc.examples.echo.Echo/UnaryEcho",
- );
+ let path = http::uri::PathAndQuery::from_static("/grpc.examples.echo.Echo/UnaryEcho");
self.inner.unary(request, codec, path, invocation).await
}
/// ServerStreamingEcho is server side streaming.
@@ -53,51 +49,51 @@ pub mod echo_client {
&mut self,
request: Request<super::EchoRequest>,
) -> Result<Response<Decoding<super::EchoResponse>>, dubbo::status::Status> {
- let codec = dubbo::codegen::ProstCodec::<
- super::EchoRequest,
- super::EchoResponse,
- >::default();
+ let codec =
+ dubbo::codegen::ProstCodec::<super::EchoRequest, super::EchoResponse>::default();
let invocation = RpcInvocation::default()
.with_service_unique_name(String::from("grpc.examples.echo.Echo"))
.with_method_name(String::from("ServerStreamingEcho"));
let path = http::uri::PathAndQuery::from_static(
"/grpc.examples.echo.Echo/ServerStreamingEcho",
);
- self.inner.server_streaming(request, codec, path, invocation).await
+ self.inner
+ .server_streaming(request, codec, path, invocation)
+ .await
}
/// ClientStreamingEcho is client side streaming.
pub async fn client_streaming_echo(
&mut self,
request: impl IntoStreamingRequest<Message = super::EchoRequest>,
) -> Result<Response<super::EchoResponse>, dubbo::status::Status> {
- let codec = dubbo::codegen::ProstCodec::<
- super::EchoRequest,
- super::EchoResponse,
- >::default();
+ let codec =
+ dubbo::codegen::ProstCodec::<super::EchoRequest, super::EchoResponse>::default();
let invocation = RpcInvocation::default()
.with_service_unique_name(String::from("grpc.examples.echo.Echo"))
.with_method_name(String::from("ClientStreamingEcho"));
let path = http::uri::PathAndQuery::from_static(
"/grpc.examples.echo.Echo/ClientStreamingEcho",
);
- self.inner.client_streaming(request, codec, path, invocation).await
+ self.inner
+ .client_streaming(request, codec, path, invocation)
+ .await
}
/// BidirectionalStreamingEcho is bidi streaming.
pub async fn bidirectional_streaming_echo(
&mut self,
request: impl IntoStreamingRequest<Message = super::EchoRequest>,
) -> Result<Response<Decoding<super::EchoResponse>>, dubbo::status::Status> {
- let codec = dubbo::codegen::ProstCodec::<
- super::EchoRequest,
- super::EchoResponse,
- >::default();
+ let codec =
+ dubbo::codegen::ProstCodec::<super::EchoRequest, super::EchoResponse>::default();
let invocation = RpcInvocation::default()
.with_service_unique_name(String::from("grpc.examples.echo.Echo"))
.with_method_name(String::from("BidirectionalStreamingEcho"));
let path = http::uri::PathAndQuery::from_static(
"/grpc.examples.echo.Echo/BidirectionalStreamingEcho",
);
- self.inner.bidi_streaming(request, codec, path, invocation).await
+ self.inner
+ .bidi_streaming(request, codec, path, invocation)
+ .await
}
}
}
@@ -114,9 +110,7 @@ pub mod echo_server {
request: Request<super::EchoRequest>,
) -> Result<Response<super::EchoResponse>, dubbo::status::Status>;
///Server streaming response type for the ServerStreamingEcho method.
- type ServerStreamingEchoStream: futures_util::Stream<
- Item = Result<super::EchoResponse, dubbo::status::Status>,
- >
+ type ServerStreamingEchoStream: futures_util::Stream<Item = Result<super::EchoResponse, dubbo::status::Status>>
+ Send
+ 'static;
/// ServerStreamingEcho is server side streaming.
@@ -130,19 +124,14 @@ pub mod echo_server {
request: Request<Decoding<super::EchoRequest>>,
) -> Result<Response<super::EchoResponse>, dubbo::status::Status>;
///Server streaming response type for the BidirectionalStreamingEcho method.
- type BidirectionalStreamingEchoStream: futures_util::Stream<
- Item = Result<super::EchoResponse, dubbo::status::Status>,
- >
+ type BidirectionalStreamingEchoStream: futures_util::Stream<Item = Result<super::EchoResponse, dubbo::status::Status>>
+ Send
+ 'static;
/// BidirectionalStreamingEcho is bidi streaming.
async fn bidirectional_streaming_echo(
&self,
request: Request<Decoding<super::EchoRequest>>,
- ) -> Result<
- Response<Self::BidirectionalStreamingEchoStream>,
- dubbo::status::Status,
- >;
+ ) -> Result<Response<Self::BidirectionalStreamingEchoStream>, dubbo::status::Status>;
}
/// Echo is the echo service.
#[derive(Debug)]
@@ -172,10 +161,7 @@ pub mod echo_server {
type Response = http::Response<BoxBody>;
type Error = std::convert::Infallible;
type Future = BoxFuture<Self::Response, Self::Error>;
- fn poll_ready(
- &mut self,
- _cx: &mut Context<'_>,
- ) -> Poll<Result<(), Self::Error>> {
+ fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: http::Request<B>) -> Self::Future {
@@ -188,26 +174,18 @@ pub mod echo_server {
}
impl<T: Echo> UnarySvc<super::EchoRequest> for UnaryEchoServer<T> {
type Response = super::EchoResponse;
- type Future = BoxFuture<
- Response<Self::Response>,
- dubbo::status::Status,
- >;
- fn call(
- &mut self,
- request: Request<super::EchoRequest>,
- ) -> Self::Future {
+ type Future = BoxFuture<Response<Self::Response>, dubbo::status::Status>;
+ fn call(&mut self, request: Request<super::EchoRequest>) -> Self::Future {
let inner = self.inner.0.clone();
let fut = async move { inner.unary_echo(request).await };
Box::pin(fut)
}
}
let fut = async move {
- let mut server = TripleServer::new(
- dubbo::codegen::ProstCodec::<
- super::EchoResponse,
- super::EchoRequest,
- >::default(),
- );
+ let mut server = TripleServer::new(dubbo::codegen::ProstCodec::<
+ super::EchoResponse,
+ super::EchoRequest,
+ >::default());
let res = server.unary(UnaryEchoServer { inner }, req).await;
Ok(res)
};
@@ -218,32 +196,22 @@ pub mod echo_server {
struct ServerStreamingEchoServer<T: Echo> {
inner: _Inner<T>,
}
- impl<T: Echo> ServerStreamingSvc<super::EchoRequest>
- for ServerStreamingEchoServer<T> {
+ impl<T: Echo> ServerStreamingSvc<super::EchoRequest> for ServerStreamingEchoServer<T> {
type Response = super::EchoResponse;
type ResponseStream = T::ServerStreamingEchoStream;
- type Future = BoxFuture<
- Response<Self::ResponseStream>,
- dubbo::status::Status,
- >;
- fn call(
- &mut self,
- request: Request<super::EchoRequest>,
- ) -> Self::Future {
+ type Future =
+ BoxFuture<Response<Self::ResponseStream>, dubbo::status::Status>;
+ fn call(&mut self, request: Request<super::EchoRequest>) -> Self::Future {
let inner = self.inner.0.clone();
- let fut = async move {
- inner.server_streaming_echo(request).await
- };
+ let fut = async move { inner.server_streaming_echo(request).await };
Box::pin(fut)
}
}
let fut = async move {
- let mut server = TripleServer::new(
- dubbo::codegen::ProstCodec::<
- super::EchoResponse,
- super::EchoRequest,
- >::default(),
- );
+ let mut server = TripleServer::new(dubbo::codegen::ProstCodec::<
+ super::EchoResponse,
+ super::EchoRequest,
+ >::default());
let res = server
.server_streaming(ServerStreamingEchoServer { inner }, req)
.await;
@@ -256,31 +224,23 @@ pub mod echo_server {
struct ClientStreamingEchoServer<T: Echo> {
inner: _Inner<T>,
}
- impl<T: Echo> ClientStreamingSvc<super::EchoRequest>
- for ClientStreamingEchoServer<T> {
+ impl<T: Echo> ClientStreamingSvc<super::EchoRequest> for ClientStreamingEchoServer<T> {
type Response = super::EchoResponse;
- type Future = BoxFuture<
- Response<Self::Response>,
- dubbo::status::Status,
- >;
+ type Future = BoxFuture<Response<Self::Response>, dubbo::status::Status>;
fn call(
&mut self,
request: Request<Decoding<super::EchoRequest>>,
) -> Self::Future {
let inner = self.inner.0.clone();
- let fut = async move {
- inner.client_streaming_echo(request).await
- };
+ let fut = async move { inner.client_streaming_echo(request).await };
Box::pin(fut)
}
}
let fut = async move {
- let mut server = TripleServer::new(
- dubbo::codegen::ProstCodec::<
- super::EchoResponse,
- super::EchoRequest,
- >::default(),
- );
+ let mut server = TripleServer::new(dubbo::codegen::ProstCodec::<
+ super::EchoResponse,
+ super::EchoRequest,
+ >::default());
let res = server
.client_streaming(ClientStreamingEchoServer { inner }, req)
.await;
@@ -293,56 +253,41 @@ pub mod echo_server {
struct BidirectionalStreamingEchoServer<T: Echo> {
inner: _Inner<T>,
}
- impl<T: Echo> StreamingSvc<super::EchoRequest>
- for BidirectionalStreamingEchoServer<T> {
+ impl<T: Echo> StreamingSvc<super::EchoRequest> for BidirectionalStreamingEchoServer<T> {
type Response = super::EchoResponse;
type ResponseStream = T::BidirectionalStreamingEchoStream;
- type Future = BoxFuture<
- Response<Self::ResponseStream>,
- dubbo::status::Status,
- >;
+ type Future =
+ BoxFuture<Response<Self::ResponseStream>, dubbo::status::Status>;
fn call(
&mut self,
request: Request<Decoding<super::EchoRequest>>,
) -> Self::Future {
let inner = self.inner.0.clone();
- let fut = async move {
- inner.bidirectional_streaming_echo(request).await
- };
+ let fut =
+ async move { inner.bidirectional_streaming_echo(request).await };
Box::pin(fut)
}
}
let fut = async move {
- let mut server = TripleServer::new(
- dubbo::codegen::ProstCodec::<
- super::EchoResponse,
- super::EchoRequest,
- >::default(),
- );
+ let mut server = TripleServer::new(dubbo::codegen::ProstCodec::<
+ super::EchoResponse,
+ super::EchoRequest,
+ >::default());
let res = server
- .bidi_streaming(
- BidirectionalStreamingEchoServer {
- inner,
- },
- req,
- )
+ .bidi_streaming(BidirectionalStreamingEchoServer { inner }, req)
.await;
Ok(res)
};
Box::pin(fut)
}
- _ => {
- Box::pin(async move {
- Ok(
- http::Response::builder()
- .status(200)
- .header("grpc-status", "12")
- .header("content-type", "application/grpc")
- .body(empty_body())
- .unwrap(),
- )
- })
- }
+ _ => Box::pin(async move {
+ Ok(http::Response::builder()
+ .status(200)
+ .header("grpc-status", "12")
+ .header("content-type", "application/grpc")
+ .body(empty_body())
+ .unwrap())
+ }),
}
}
}