You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2022/07/06 09:40:59 UTC
[dubbo-rust] 23/26: feat(examples) : Adding jsonrpc-basic example .
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch poc-transport
in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git
commit 457941d7cfa857e332d8ad6041b3df161a45d93e
Author: even <st...@icloud.com>
AuthorDate: Fri Jun 17 15:03:20 2022 +0800
feat(examples) : Adding jsonrpc-basic example .
---
dubbo-rust-examples/Cargo.toml | 25 ++++
.../src/jsonrpc-basic/addservice.rs | 156 +++++++++++++++++++++
dubbo-rust-examples/src/jsonrpc-basic/client.rs | 37 +++++
dubbo-rust-examples/src/jsonrpc-basic/server.rs | 54 +++++++
4 files changed, 272 insertions(+)
diff --git a/dubbo-rust-examples/Cargo.toml b/dubbo-rust-examples/Cargo.toml
index 5013bb9..3078752 100644
--- a/dubbo-rust-examples/Cargo.toml
+++ b/dubbo-rust-examples/Cargo.toml
@@ -6,3 +6,28 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
+async-trait = { version = "0.1.56" }
+serde = { version = "1.0.137", features = ["derive"] }
+serde_json = { version = "1.0.81" }
+tower = { version = "0.4.12" }
+env_logger = "0.9.0"
+log = "0.4.17"
+tokio = { version = "1.19.2", features = ["full"] }
+hyper = { version = "0.14.19", features = ["tcp","client","http1"] }
+
+dubbo-rust-protocol = { path = "../dubbo-rust-protocol" }
+
+
+
+
+#jsonrpc-basic
+
+[[bin]]
+name = "jsonrpc-basic-client"
+path = "src/jsonrpc-basic/client.rs"
+
+
+[[bin]]
+name = "jsonrpc-basic-server"
+path = "src/jsonrpc-basic/server.rs"
+
diff --git a/dubbo-rust-examples/src/jsonrpc-basic/addservice.rs b/dubbo-rust-examples/src/jsonrpc-basic/addservice.rs
new file mode 100644
index 0000000..91f554b
--- /dev/null
+++ b/dubbo-rust-examples/src/jsonrpc-basic/addservice.rs
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// This file should be auto-generated by the dubbo-rust-build crate (WIP); for now it is hand-written.
+
+#![allow(dead_code)]
+
+use async_trait::async_trait;
+use serde::{Deserialize, Serialize};
+use std::{future::Future, pin::Pin};
+
+pub type StdError = Box<(dyn std::error::Error + Send + Sync + 'static)>;
+pub type BoxFuture<R, E> = Pin<Box<(dyn Future<Output = Result<R, E>> + Send + 'static)>>;
+
+/// Request payload of the `add` method; (de)serialized with serde.
+#[derive(Debug, Deserialize, Serialize)]
+pub struct AddReq {
+ /// Numbers handed to [`AddService::add`].
+ pub numbers: Vec<i32>,
+}
+
+/// Success value returned by [`AddService::add`].
+pub type AddResp = i32;
+
+/// Service contract of this example: a single asynchronous `add` method.
+#[async_trait]
+pub trait AddService {
+ /// Handles one [`AddReq`] and yields an [`AddResp`], or a [`StdError`] on failure.
+ async fn add(&mut self, req: AddReq) -> Result<AddResp, StdError>;
+}
+
+pub mod add_service {
+ use std::task::Poll;
+
+ use dubbo_rust_protocol::{
+ jsonrpc::{Request, Response},
+ NamedService,
+ };
+
+ use super::{AddReq, AddService, BoxFuture, StdError};
+
+ /// Server-side adapter that wraps an [`AddService`] implementation `T`.
+ #[derive(Clone)]
+ pub struct AddServer<T: AddService + Clone> {
+ // The wrapped service implementation.
+ inner: T,
+ }
+
+    impl<T: AddService + Clone> AddServer<T> {
+        /// Wraps `service` in a new `AddServer`.
+        pub fn new(service: T) -> Self {
+            let inner = service;
+            AddServer { inner }
+        }
+    }
+
+ /// Gives every `AddServer<T>` the service name `"AddService"`.
+ // NOTE(review): presumably the JSON-RPC server uses this name to identify the service;
+ // confirm against `dubbo_rust_protocol::NamedService`.
+ impl<T: AddService + Clone> NamedService for AddServer<T> {
+ const SERVICE_NAME: &'static str = "AddService";
+ }
+
+    /// Routes a JSON-RPC [`Request`] to the wrapped [`AddService`] by method name.
+    ///
+    /// Only `"add"` is dispatched. A request whose method is missing or unknown is answered
+    /// with a JSON-RPC error response carrying code `-32601` ("Method not found").
+    impl<T> tower::Service<Request> for AddServer<T>
+    where
+        T: AddService + Clone + Send + 'static,
+    {
+        type Response = Response;
+
+        type Error = StdError;
+
+        type Future = BoxFuture<Self::Response, Self::Error>;
+
+        /// Always reports ready; this adapter tracks no readiness state.
+        fn poll_ready(
+            &mut self,
+            _: &mut std::task::Context<'_>,
+        ) -> std::task::Poll<Result<(), Self::Error>> {
+            Poll::Ready(Ok(()))
+        }
+
+        /// Builds the future that serves `req`.
+        ///
+        /// # Errors
+        ///
+        /// The returned future resolves to `Err` when the params cannot be deserialized into
+        /// [`AddReq`], when the inner service fails, or when the result cannot be turned into
+        /// a [`Response`]. An unknown or missing method is NOT an `Err`: it resolves to `Ok`
+        /// with an error response.
+        fn call(&mut self, req: Request) -> Self::Future {
+            // A single match covers both "no method" and "unknown method", so no
+            // `is_none()` / `unwrap()` pair is needed.
+            match req.header.method.as_deref() {
+                Some("add") => {
+                    // The future must be `'static`, so it owns its own copy of the service.
+                    // Cloning here (not up front) avoids a clone for requests that are rejected.
+                    let mut inner_service = self.inner.clone();
+                    Box::pin(async move {
+                        // `req` is still needed to build the response, hence the clone of params.
+                        let params: AddReq = serde_json::from_value(req.params.clone())?;
+                        let result = inner_service.add(params).await?;
+                        Ok(Response::from_request(&req, result)?)
+                    })
+                }
+                _ => Box::pin(async move {
+                    let error = dubbo_rust_protocol::jsonrpc::Error {
+                        code: -32601,
+                        message: "Method not found".to_string(),
+                    };
+
+                    Ok(Response::from_request_error(&req, error))
+                }),
+            }
+        }
+    }
+}
+
+pub mod add_client {
+ use std::net::SocketAddr;
+
+ use super::{AddReq, AddResp, StdError};
+ use dubbo_rust_protocol::jsonrpc::Request as JsonRpcRequest;
+ use dubbo_rust_protocol::jsonrpc::Response as JsonRpcResponse;
+ use hyper::body::HttpBody;
+ use hyper::client::HttpConnector;
+
+ /// HTTP client for the `add` JSON-RPC method.
+ pub struct AddClient {
+ // Address the requests are sent to.
+ addr: SocketAddr,
+ // hyper client used to issue the HTTP requests.
+ http_client: hyper::Client<HttpConnector>,
+ }
+
+    impl AddClient {
+        /// Creates a client that sends its requests to `addr`.
+        ///
+        /// This currently never fails; the `Result` return type is kept so existing callers
+        /// keep compiling.
+        pub fn new(addr: &SocketAddr) -> Result<Self, StdError> {
+            Ok(Self {
+                // `SocketAddr` is `Copy`, so a dereference is enough.
+                addr: *addr,
+                http_client: hyper::Client::new(),
+            })
+        }
+
+        /// Calls the remote `add` method with `req` and returns the decoded result.
+        ///
+        /// The request is sent as an HTTP `POST` to `http://<addr>` with the serialized
+        /// JSON-RPC request as body.
+        ///
+        /// # Errors
+        ///
+        /// Returns an error when the request cannot be built or serialized, when the HTTP
+        /// exchange fails, when the response body is empty (`"body empty"`), or when the body
+        /// cannot be decoded into the expected response.
+        pub async fn add(&mut self, req: AddReq) -> Result<AddResp, StdError> {
+            let req = JsonRpcRequest::new("add", req)?;
+            let req_str = req.to_string()?;
+            let http_request = hyper::Request::builder()
+                .method("POST")
+                .uri(format!("http://{}", self.addr))
+                .body(hyper::Body::from(req_str))
+                .expect("request builder");
+
+            let mut resp = self.http_client.request(http_request).await?;
+
+            // `data()` yields one body chunk per call. Reading it only once would truncate
+            // any response delivered in several chunks, so drain until the body is exhausted.
+            let mut body: Vec<u8> = Vec::new();
+            while let Some(chunk) = resp.data().await {
+                body.extend_from_slice(&chunk?);
+            }
+            if body.is_empty() {
+                return Err("body empty".into());
+            }
+
+            let jsonrpc_resp = JsonRpcResponse::from_slice(&body)?;
+
+            Ok(jsonrpc_resp.get_body()?)
+        }
+    }
+}
diff --git a/dubbo-rust-examples/src/jsonrpc-basic/client.rs b/dubbo-rust-examples/src/jsonrpc-basic/client.rs
new file mode 100644
index 0000000..ef402ba
--- /dev/null
+++ b/dubbo-rust-examples/src/jsonrpc-basic/client.rs
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+mod addservice;
+
+use std::{net::SocketAddr, str::FromStr};
+
+use addservice::{add_client::AddClient, AddReq};
+
+/// Sends one `add` request for `[1, 2, 21]` to `127.0.0.1:40021` and prints the response.
+///
+/// Panics if the address cannot be parsed, the client cannot be created, or the call fails.
+#[tokio::main]
+async fn main() {
+    let server_addr = SocketAddr::from_str("127.0.0.1:40021").unwrap();
+    let mut add_client = AddClient::new(&server_addr).unwrap();
+
+    let sum = add_client
+        .add(AddReq {
+            numbers: vec![1, 2, 21],
+        })
+        .await
+        .unwrap();
+
+    println!("resp : {:?}", sum);
+}
diff --git a/dubbo-rust-examples/src/jsonrpc-basic/server.rs b/dubbo-rust-examples/src/jsonrpc-basic/server.rs
new file mode 100644
index 0000000..79bd7ec
--- /dev/null
+++ b/dubbo-rust-examples/src/jsonrpc-basic/server.rs
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use dubbo_rust_protocol::jsonrpc::server::{JsonRpcServer, JsonRpcService};
+use log::info;
+use std::{net::SocketAddr, str::FromStr};
+
+mod addservice;
+
+use addservice::{add_service::AddServer, AddReq, AddResp, AddService, StdError};
+
+/// Field-less [`AddService`] implementation served by this example.
+#[derive(Clone)]
+struct MyAdd;
+
+#[async_trait::async_trait]
+impl AddService for MyAdd {
+    /// Logs the request and returns the sum of `req.numbers` (`0` for an empty list).
+    ///
+    /// # Errors
+    ///
+    /// Returns an error when the sum does not fit into an `i32`. The numbers come from the
+    /// client, so overflow must neither panic (debug builds) nor wrap silently (release builds).
+    async fn add(&mut self, req: AddReq) -> Result<AddResp, StdError> {
+        info!("get request {:?}", req);
+        let total = req
+            .numbers
+            .iter()
+            .try_fold(0, |acc: AddResp, &n| acc.checked_add(n))
+            .ok_or("sum of numbers overflows i32")?;
+        Ok(total)
+    }
+}
+
+/// Starts the example JSON-RPC server for [`MyAdd`] on `0.0.0.0:40021` and awaits it.
+///
+/// Panics if the address cannot be parsed or the server future resolves to an error.
+#[tokio::main]
+async fn main() {
+    // Log records at `Info` level and above.
+    env_logger::builder()
+        .filter_level(log::LevelFilter::Info)
+        .init();
+
+    let listen_addr = SocketAddr::from_str("0.0.0.0:40021").unwrap();
+    let runtime = tokio::runtime::Handle::current();
+    let add_service = JsonRpcService::new(AddServer::new(MyAdd));
+    let server = JsonRpcServer::new(&listen_addr, runtime, add_service);
+
+    info!("Server start at {}", listen_addr);
+
+    server.await.unwrap();
+}