You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2022/07/06 09:41:01 UTC
[dubbo-rust] 25/26: feat(protocol): Adding service router support.
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch poc-transport
in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git
commit 69f64979a77338f52610e4cee8f296438d61b1ce
Author: even <st...@icloud.com>
AuthorDate: Fri Jun 24 11:06:31 2022 +0800
feat(protocol): Adding service router support.
---
dubbo-rust-protocol/Cargo.toml | 2 +-
dubbo-rust-protocol/src/jsonrpc/server.rs | 110 +++++++++++++++++++++++++-----
2 files changed, 94 insertions(+), 18 deletions(-)
diff --git a/dubbo-rust-protocol/Cargo.toml b/dubbo-rust-protocol/Cargo.toml
index 6e108ea..3505f97 100644
--- a/dubbo-rust-protocol/Cargo.toml
+++ b/dubbo-rust-protocol/Cargo.toml
@@ -11,7 +11,7 @@ serde_json = { version = "1.0.81" }
pin-project-lite = { version = "0.2.9" }
hyper = { version = "0.14.19", features = ["server","http1","tcp","http2","client"] }
tokio = "1.19.2"
-tower = "0.4.12"
+tower = { version = "0.4.12", features = ["util"] }
futures = "0.3.21"
log = "0.4.17"
http = "0.2.8"
\ No newline at end of file
diff --git a/dubbo-rust-protocol/src/jsonrpc/server.rs b/dubbo-rust-protocol/src/jsonrpc/server.rs
index ed87953..4e6e51f 100644
--- a/dubbo-rust-protocol/src/jsonrpc/server.rs
+++ b/dubbo-rust-protocol/src/jsonrpc/server.rs
@@ -15,6 +15,7 @@
* limitations under the License.
*/
+use std::collections::HashMap;
use std::future::Future;
use std::{net::SocketAddr, pin::Pin, task::Poll};
@@ -31,6 +32,8 @@ use tokio::io::{AsyncRead, AsyncWrite};
use super::Request as JsonRpcRequest;
use super::Response as JsonRpcResponse;
+use crate::NamedService;
+use tower::util::BoxCloneService;
fn wrap_future<F, R, E>(fut: F) -> SrvFut<R, E>
where
@@ -139,12 +142,21 @@ where
////////////////////////////////////
#[derive(Clone)]
-pub struct JsonRpcService<S> {
- service: S,
+pub struct JsonRpcService {
+ // service: HashMap<String, Box<S>>,
+ service: HashMap<String, BoxCloneService<JsonRpcRequest, JsonRpcResponse, StdError>>,
}
-impl<S> JsonRpcService<S> {
- pub fn new(service: S) -> Self
+pub struct JsonRpcServiceBuilder {
+ ident: Option<JsonRpcService>,
+}
+
+impl JsonRpcService {
+ pub fn builder() -> JsonRpcServiceBuilder {
+ JsonRpcServiceBuilder { ident: None }
+ }
+
+ pub fn new<S>(service: S) -> Self
where
S: tower::Service<
JsonRpcRequest,
@@ -152,21 +164,75 @@ impl<S> JsonRpcService<S> {
Error = StdError,
Future = SrvFut<JsonRpcResponse, StdError>,
>,
- S: Clone + Send + 'static,
+ S: Clone + Send + Sync + 'static,
+ S: NamedService,
{
- Self { service: service }
+ let mut mm = HashMap::new();
+
+ mm.insert(S::SERVICE_NAME.to_string(), BoxCloneService::new(service));
+
+ Self { service: mm }
+ }
+
+ pub fn add_service<S>(&mut self, service: S) -> Result<(), StdError>
+ where
+ S: tower::Service<
+ JsonRpcRequest,
+ Response = JsonRpcResponse,
+ Error = StdError,
+ Future = SrvFut<JsonRpcResponse, StdError>,
+ >,
+ S: Clone + Send + Sync + 'static,
+ S: NamedService,
+ {
+ if self.service.contains_key(S::SERVICE_NAME) {
+ return Err(format!("dupplicate service name {}", S::SERVICE_NAME).into());
+ }
+
+ self.service
+ .insert(S::SERVICE_NAME.to_string(), BoxCloneService::new(service));
+ Ok(())
}
}
-impl<S> tower::Service<HttpRequest<Body>> for JsonRpcService<S>
-where
- S: tower::Service<
- JsonRpcRequest,
- Response = JsonRpcResponse,
- Error = StdError,
- Future = SrvFut<JsonRpcResponse, StdError>,
- >,
- S: Clone + Send + 'static,
+impl JsonRpcServiceBuilder {
+ pub fn add_service<S>(&mut self, service: S) -> Result<(), StdError>
+ where
+ S: tower::Service<
+ JsonRpcRequest,
+ Response = JsonRpcResponse,
+ Error = StdError,
+ Future = SrvFut<JsonRpcResponse, StdError>,
+ >,
+ S: Clone + Send + Sync + 'static,
+ S: NamedService,
+ {
+ if self.ident.is_none() {
+ self.ident.replace(JsonRpcService::new(service));
+ return Ok(());
+ }
+
+ self.ident.as_mut().unwrap().add_service(service)
+ }
+
+ pub fn build(mut self) -> Result<JsonRpcService, StdError> {
+ if self.ident.is_none() {
+ return Err("nothing build".into());
+ }
+
+ Ok(self.ident.take().unwrap())
+ }
+}
+
+impl tower::Service<HttpRequest<Body>> for JsonRpcService
+// where
+// S: tower::Service<
+// JsonRpcRequest,
+// Response = JsonRpcResponse,
+// Error = StdError,
+// Future = SrvFut<JsonRpcResponse, StdError>,
+// >,
+// S: Clone + Send + 'static,
{
type Response = HttpResponse<Body>;
@@ -179,8 +245,18 @@ where
}
fn call(&mut self, mut req: HttpRequest<Body>) -> Self::Future {
- // serde
- let mut inner_service = self.service.clone();
+ // fetch service
+ let service_name = req
+ .uri()
+ .path()
+ .trim_start_matches("/")
+ .trim_end_matches("/");
+
+ if !self.service.contains_key(service_name) {
+ return wrap_future(async { Ok(HttpResponse::builder().body(Body::empty()).unwrap()) });
+ }
+
+ let mut inner_service = self.service.get(service_name).unwrap().clone();
wrap_future(async move {
if let Some(data) = req.data().await {
if let Err(ref e) = data {