You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2022/07/06 09:40:59 UTC

[dubbo-rust] 23/26: feat(examples) : Adding jsonrpc-basic example .

This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch poc-transport
in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git

commit 457941d7cfa857e332d8ad6041b3df161a45d93e
Author: even <st...@icloud.com>
AuthorDate: Fri Jun 17 15:03:20 2022 +0800

    feat(examples) : Adding jsonrpc-basic example .
---
 dubbo-rust-examples/Cargo.toml                     |  25 ++++
 .../src/jsonrpc-basic/addservice.rs                | 156 +++++++++++++++++++++
 dubbo-rust-examples/src/jsonrpc-basic/client.rs    |  37 +++++
 dubbo-rust-examples/src/jsonrpc-basic/server.rs    |  54 +++++++
 4 files changed, 272 insertions(+)

diff --git a/dubbo-rust-examples/Cargo.toml b/dubbo-rust-examples/Cargo.toml
index 5013bb9..3078752 100644
--- a/dubbo-rust-examples/Cargo.toml
+++ b/dubbo-rust-examples/Cargo.toml
@@ -6,3 +6,28 @@ edition = "2021"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
+async-trait = { version = "0.1.56" }
+serde = { version = "1.0.137", features = ["derive"] }
+serde_json = { version = "1.0.81" }
+tower = { version = "0.4.12" }
+env_logger = "0.9.0"
+log = "0.4.17"
+tokio = { version = "1.19.2", features = ["full"] }
+hyper = { version = "0.14.19", features = ["tcp","client","http1"] }
+
+dubbo-rust-protocol = { path = "../dubbo-rust-protocol" }
+
+
+
+
+#jsonrpc-basic
+
+[[bin]]
+name = "jsonrpc-basic-client"
+path = "src/jsonrpc-basic/client.rs"
+
+
+[[bin]]
+name = "jsonrpc-basic-server"
+path = "src/jsonrpc-basic/server.rs"
+
diff --git a/dubbo-rust-examples/src/jsonrpc-basic/addservice.rs b/dubbo-rust-examples/src/jsonrpc-basic/addservice.rs
new file mode 100644
index 0000000..91f554b
--- /dev/null
+++ b/dubbo-rust-examples/src/jsonrpc-basic/addservice.rs
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// This file should be auto-generated by the dubbo-rust-build crate (WIP); for now it is hand-written.
+
+#![allow(dead_code)]
+
+use async_trait::async_trait;
+use serde::{Deserialize, Serialize};
+use std::{future::Future, pin::Pin};
+
+/// Boxed, thread-safe (`Send + Sync`) error type used as the error half of every `Result`
+/// in this file.
+pub type StdError = Box<(dyn std::error::Error + Send + Sync + 'static)>;
+/// Pinned, heap-allocated, `Send` future that resolves to `Result<R, E>`.
+pub type BoxFuture<R, E> = Pin<Box<(dyn Future<Output = Result<R, E>> + Send + 'static)>>;
+
+/// Parameters of the `add` call; (de)serialized with serde.
+#[derive(Debug, Deserialize, Serialize)]
+pub struct AddReq {
+    /// The integers to be added together.
+    pub numbers: Vec<i32>,
+}
+
+/// Result type of the `add` call.
+pub type AddResp = i32;
+
+/// Service interface implemented by the server-side handler.
+///
+/// `#[async_trait]` is what allows the `async fn` in the trait definition.
+#[async_trait]
+pub trait AddService {
+    /// Handles one `add` request and returns its result, or an error.
+    async fn add(&mut self, req: AddReq) -> Result<AddResp, StdError>;
+}
+
+/// Server-side glue: adapts an `AddService` implementation to a `tower::Service` that
+/// consumes JSON-RPC `Request`s and produces JSON-RPC `Response`s.
+pub mod add_service {
+    use std::task::{Context, Poll};
+
+    use dubbo_rust_protocol::{
+        jsonrpc::{Request, Response},
+        NamedService,
+    };
+
+    use super::{AddReq, AddService, BoxFuture, StdError};
+
+    /// Wrapper around an `AddService` implementation `T` that implements
+    /// `tower::Service<Request>`.
+    #[derive(Clone)]
+    pub struct AddServer<T: AddService + Clone> {
+        inner: T,
+    }
+
+    impl<T: AddService + Clone> AddServer<T> {
+        /// Wraps `service` in a new `AddServer`.
+        pub fn new(service: T) -> Self {
+            Self { inner: service }
+        }
+    }
+
+    impl<T: AddService + Clone> NamedService for AddServer<T> {
+        /// Service name constant required by `NamedService`.
+        const SERVICE_NAME: &'static str = "AddService";
+    }
+
+    /// Builds a future that resolves to a JSON-RPC error response for `req` with code
+    /// `-32601` and message "Method not found".
+    fn method_not_found(req: Request) -> BoxFuture<Response, StdError> {
+        Box::pin(async move {
+            let error = dubbo_rust_protocol::jsonrpc::Error {
+                code: -32601,
+                message: "Method not found".to_string(),
+            };
+
+            Ok(Response::from_request_error(&req, error))
+        })
+    }
+
+    impl<T> tower::Service<Request> for AddServer<T>
+    where
+        T: AddService + Clone + Send + 'static,
+    {
+        type Response = Response;
+
+        type Error = StdError;
+
+        type Future = BoxFuture<Self::Response, Self::Error>;
+
+        /// Always reports readiness; this adapter applies no back-pressure of its own.
+        fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+            Poll::Ready(Ok(()))
+        }
+
+        /// Dispatches `req` by its method name.
+        ///
+        /// * `"add"` deserializes the params into an `AddReq`, calls the wrapped
+        ///   service, and wraps the result in a response built from the request.
+        /// * A missing or unknown method name yields a "Method not found" error response.
+        fn call(&mut self, req: Request) -> Self::Future {
+            match req.header.method.as_ref().map(|name| name.as_str()) {
+                Some("add") => {
+                    // The returned future is `'static`, so it needs its own copy of the
+                    // handler. Clone only on this path, not for unknown methods.
+                    let mut inner_service = self.inner.clone();
+                    Box::pin(async move {
+                        // NOTE(review): a params deserialization failure is propagated as
+                        // `Err(StdError)` rather than as a JSON-RPC error response; confirm
+                        // that the caller of this service handles `Err` as intended.
+                        let params: AddReq = serde_json::from_value(req.params.clone())?;
+                        let result = inner_service.add(params).await?;
+                        Ok(Response::from_request(&req, result)?)
+                    })
+                }
+                _ => method_not_found(req),
+            }
+        }
+    }
+}
+
+/// Client-side glue: sends `add` calls as JSON-RPC requests over HTTP using `hyper`.
+pub mod add_client {
+    use std::net::SocketAddr;
+
+    use super::{AddReq, AddResp, StdError};
+    use dubbo_rust_protocol::jsonrpc::Request as JsonRpcRequest;
+    use dubbo_rust_protocol::jsonrpc::Response as JsonRpcResponse;
+    use hyper::client::HttpConnector;
+
+    /// JSON-RPC client for the `add` method, bound to a single server address.
+    pub struct AddClient {
+        addr: SocketAddr,
+        http_client: hyper::Client<HttpConnector>,
+    }
+
+    impl AddClient {
+        /// Creates a client that sends its requests to `addr`.
+        ///
+        /// # Errors
+        /// The current implementation always returns `Ok`; the `Result` return type is
+        /// kept so the signature stays unchanged.
+        pub fn new(addr: &SocketAddr) -> Result<Self, StdError> {
+            let client = hyper::Client::new();
+            Ok(Self {
+                // `SocketAddr` is `Copy`, so a dereference is enough.
+                addr: *addr,
+                http_client: client,
+            })
+        }
+
+        /// Sends `req` as a JSON-RPC `add` call via HTTP `POST` and returns the decoded result.
+        ///
+        /// # Errors
+        /// Returns an error if the request cannot be built or sent, if reading the
+        /// response body fails, if the body is empty (`"body empty"`), or if the response
+        /// cannot be parsed or its body extracted.
+        pub async fn add(&mut self, req: AddReq) -> Result<AddResp, StdError> {
+            let req = JsonRpcRequest::new("add", req)?;
+            let req_str = req.to_string()?;
+            let http_request = hyper::Request::builder()
+                .method("POST")
+                .uri(format!("http://{}", self.addr))
+                .body(hyper::Body::from(req_str))?;
+
+            let resp = self.http_client.request(http_request).await?;
+
+            // Collect the complete body. Reading a single data frame would truncate any
+            // response that arrives in more than one frame and break JSON parsing.
+            let body = hyper::body::to_bytes(resp.into_body()).await?;
+            if body.is_empty() {
+                return Err("body empty".into());
+            }
+            // The exact parameter type of `from_slice` is not visible here, so the
+            // original `&Vec<u8>` argument form is kept.
+            let jsonrpc_resp = JsonRpcResponse::from_slice(&body.to_vec())?;
+
+            Ok(jsonrpc_resp.get_body()?)
+        }
+    }
+}
diff --git a/dubbo-rust-examples/src/jsonrpc-basic/client.rs b/dubbo-rust-examples/src/jsonrpc-basic/client.rs
new file mode 100644
index 0000000..ef402ba
--- /dev/null
+++ b/dubbo-rust-examples/src/jsonrpc-basic/client.rs
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+mod addservice;
+
+use std::{net::SocketAddr, str::FromStr};
+
+use addservice::{add_client::AddClient, AddReq};
+
+/// Example client: sends one `add` request to the local example server and prints the reply.
+#[tokio::main]
+async fn main() {
+    // Address the example server is expected to be reachable at.
+    let server_addr = SocketAddr::from_str("127.0.0.1:40021").unwrap();
+    let mut add_client = AddClient::new(&server_addr).unwrap();
+
+    let sum = add_client
+        .add(AddReq {
+            numbers: vec![1, 2, 21],
+        })
+        .await
+        .unwrap();
+
+    println!("resp : {:?}", sum);
+}
diff --git a/dubbo-rust-examples/src/jsonrpc-basic/server.rs b/dubbo-rust-examples/src/jsonrpc-basic/server.rs
new file mode 100644
index 0000000..79bd7ec
--- /dev/null
+++ b/dubbo-rust-examples/src/jsonrpc-basic/server.rs
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use dubbo_rust_protocol::jsonrpc::server::{JsonRpcServer, JsonRpcService};
+use log::info;
+use std::{net::SocketAddr, str::FromStr};
+
+mod addservice;
+
+use addservice::{add_service::AddServer, AddReq, AddResp, AddService, StdError};
+
+/// Field-less `AddService` implementation served by this example.
+#[derive(Clone)]
+struct MyAdd;
+
+#[async_trait::async_trait]
+impl AddService for MyAdd {
+    /// Logs the request and returns the sum of `req.numbers`.
+    ///
+    /// # Errors
+    /// Returns an error when the total does not fit in an `i32`, instead of panicking
+    /// (debug builds) or silently wrapping (release builds).
+    async fn add(&mut self, req: AddReq) -> Result<AddResp, StdError> {
+        info!("get request {:?}", req);
+        // Sum in `i64` so intermediate values cannot overflow for any realistic input
+        // length, then narrow back to the response type with a checked conversion.
+        let total: i64 = req.numbers.iter().map(|&n| i64::from(n)).sum();
+        AddResp::try_from(total).map_err(|_| StdError::from("sum of numbers does not fit in i32"))
+    }
+}
+
+/// Example server: serves `MyAdd` over JSON-RPC on port 40021 until the server future ends.
+#[tokio::main]
+async fn main() {
+    // Configure logging with an `Info` level filter.
+    let mut log_builder = env_logger::builder();
+    log_builder.filter_level(log::LevelFilter::Info);
+    log_builder.init();
+
+    let listen_addr = SocketAddr::from_str("0.0.0.0:40021").unwrap();
+    let runtime_handle = tokio::runtime::Handle::current();
+
+    // Wrap the handler in the tower adapter, then in the JSON-RPC service layer.
+    let add_service = JsonRpcService::new(AddServer::new(MyAdd));
+    let server = JsonRpcServer::new(&listen_addr, runtime_handle, add_service);
+
+    info!("Server start at {}", listen_addr);
+
+    server.await.unwrap();
+}