You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2022/07/06 09:41:01 UTC

[dubbo-rust] 25/26: feat(protocol): Adding service router support.

This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch poc-transport
in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git

commit 69f64979a77338f52610e4cee8f296438d61b1ce
Author: even <st...@icloud.com>
AuthorDate: Fri Jun 24 11:06:31 2022 +0800

    feat(protocol): Adding service router support.
---
 dubbo-rust-protocol/Cargo.toml            |   2 +-
 dubbo-rust-protocol/src/jsonrpc/server.rs | 110 +++++++++++++++++++++++++-----
 2 files changed, 94 insertions(+), 18 deletions(-)

diff --git a/dubbo-rust-protocol/Cargo.toml b/dubbo-rust-protocol/Cargo.toml
index 6e108ea..3505f97 100644
--- a/dubbo-rust-protocol/Cargo.toml
+++ b/dubbo-rust-protocol/Cargo.toml
@@ -11,7 +11,7 @@ serde_json = { version = "1.0.81" }
 pin-project-lite  = { version = "0.2.9" }
 hyper = { version = "0.14.19", features = ["server","http1","tcp","http2","client"] }
 tokio = "1.19.2"
-tower = "0.4.12"
+tower = { version = "0.4.12", features = ["util"] }
 futures = "0.3.21"
 log = "0.4.17"
 http = "0.2.8"
\ No newline at end of file
diff --git a/dubbo-rust-protocol/src/jsonrpc/server.rs b/dubbo-rust-protocol/src/jsonrpc/server.rs
index ed87953..4e6e51f 100644
--- a/dubbo-rust-protocol/src/jsonrpc/server.rs
+++ b/dubbo-rust-protocol/src/jsonrpc/server.rs
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 
+use std::collections::HashMap;
 use std::future::Future;
 use std::{net::SocketAddr, pin::Pin, task::Poll};
 
@@ -31,6 +32,8 @@ use tokio::io::{AsyncRead, AsyncWrite};
 
 use super::Request as JsonRpcRequest;
 use super::Response as JsonRpcResponse;
+use crate::NamedService;
+use tower::util::BoxCloneService;
 
 fn wrap_future<F, R, E>(fut: F) -> SrvFut<R, E>
 where
@@ -139,12 +142,21 @@ where
 ////////////////////////////////////
 
 #[derive(Clone)]
-pub struct JsonRpcService<S> {
-    service: S,
+pub struct JsonRpcService {
+    // service: HashMap<String, Box<S>>,
+    service: HashMap<String, BoxCloneService<JsonRpcRequest, JsonRpcResponse, StdError>>,
 }
 
-impl<S> JsonRpcService<S> {
-    pub fn new(service: S) -> Self
+pub struct JsonRpcServiceBuilder {
+    ident: Option<JsonRpcService>,
+}
+
+impl JsonRpcService {
+    pub fn builder() -> JsonRpcServiceBuilder {
+        JsonRpcServiceBuilder { ident: None }
+    }
+
+    pub fn new<S>(service: S) -> Self
     where
         S: tower::Service<
             JsonRpcRequest,
@@ -152,21 +164,75 @@ impl<S> JsonRpcService<S> {
             Error = StdError,
             Future = SrvFut<JsonRpcResponse, StdError>,
         >,
-        S: Clone + Send + 'static,
+        S: Clone + Send + Sync + 'static,
+        S: NamedService,
     {
-        Self { service: service }
+        let mut mm = HashMap::new();
+
+        mm.insert(S::SERVICE_NAME.to_string(), BoxCloneService::new(service));
+
+        Self { service: mm }
+    }
+
+    pub fn add_service<S>(&mut self, service: S) -> Result<(), StdError>
+    where
+        S: tower::Service<
+            JsonRpcRequest,
+            Response = JsonRpcResponse,
+            Error = StdError,
+            Future = SrvFut<JsonRpcResponse, StdError>,
+        >,
+        S: Clone + Send + Sync + 'static,
+        S: NamedService,
+    {
+        if self.service.contains_key(S::SERVICE_NAME) {
+            return Err(format!("dupplicate service name {}", S::SERVICE_NAME).into());
+        }
+
+        self.service
+            .insert(S::SERVICE_NAME.to_string(), BoxCloneService::new(service));
+        Ok(())
     }
 }
 
-impl<S> tower::Service<HttpRequest<Body>> for JsonRpcService<S>
-where
-    S: tower::Service<
-        JsonRpcRequest,
-        Response = JsonRpcResponse,
-        Error = StdError,
-        Future = SrvFut<JsonRpcResponse, StdError>,
-    >,
-    S: Clone + Send + 'static,
+impl JsonRpcServiceBuilder {
+    pub fn add_service<S>(&mut self, service: S) -> Result<(), StdError>
+    where
+        S: tower::Service<
+            JsonRpcRequest,
+            Response = JsonRpcResponse,
+            Error = StdError,
+            Future = SrvFut<JsonRpcResponse, StdError>,
+        >,
+        S: Clone + Send + Sync + 'static,
+        S: NamedService,
+    {
+        if self.ident.is_none() {
+            self.ident.replace(JsonRpcService::new(service));
+            return Ok(());
+        }
+
+        self.ident.as_mut().unwrap().add_service(service)
+    }
+
+    pub fn build(mut self) -> Result<JsonRpcService, StdError> {
+        if self.ident.is_none() {
+            return Err("nothing build".into());
+        }
+
+        Ok(self.ident.take().unwrap())
+    }
+}
+
+impl tower::Service<HttpRequest<Body>> for JsonRpcService
+// where
+// S: tower::Service<
+//     JsonRpcRequest,
+//     Response = JsonRpcResponse,
+//     Error = StdError,
+//     Future = SrvFut<JsonRpcResponse, StdError>,
+// >,
+// S: Clone + Send + 'static,
 {
     type Response = HttpResponse<Body>;
 
@@ -179,8 +245,18 @@ where
     }
 
     fn call(&mut self, mut req: HttpRequest<Body>) -> Self::Future {
-        // serde
-        let mut inner_service = self.service.clone();
+        // fetch service
+        let service_name = req
+            .uri()
+            .path()
+            .trim_start_matches("/")
+            .trim_end_matches("/");
+
+        if !self.service.contains_key(service_name) {
+            return wrap_future(async { Ok(HttpResponse::builder().body(Body::empty()).unwrap()) });
+        }
+
+        let mut inner_service = self.service.get(service_name).unwrap().clone();
         wrap_future(async move {
             if let Some(data) = req.data().await {
                 if let Err(ref e) = data {