You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2022/07/06 09:40:58 UTC

[dubbo-rust] 22/26: feat(protocol) : Adding jsonrpc protocol .

This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch poc-transport
in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git

commit 209757d13b4a2cf4e43b38b7555d6be3e49fe4a5
Author: even <st...@icloud.com>
AuthorDate: Fri Jun 17 15:02:25 2022 +0800

    feat(protocol) : Adding jsonrpc protocol .
---
 dubbo-rust-protocol/Cargo.toml            |  17 +++
 dubbo-rust-protocol/src/jsonrpc/client.rs |  16 +++
 dubbo-rust-protocol/src/jsonrpc/mod.rs    | 165 +++++++++++++++++++++++
 dubbo-rust-protocol/src/jsonrpc/server.rs | 214 ++++++++++++++++++++++++++++++
 dubbo-rust-protocol/src/lib.rs            |   5 +
 5 files changed, 417 insertions(+)

diff --git a/dubbo-rust-protocol/Cargo.toml b/dubbo-rust-protocol/Cargo.toml
new file mode 100644
index 0000000..6e108ea
--- /dev/null
+++ b/dubbo-rust-protocol/Cargo.toml
@@ -0,0 +1,17 @@
+[package]
+name = "dubbo-rust-protocol"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+serde = { version = "1.0.137", features = ["derive"] }
+serde_json = { version = "1.0.81" }
+pin-project-lite  = { version = "0.2.9" }
+hyper = { version = "0.14.19", features = ["server","http1","tcp","http2","client"] }
+tokio = "1.19.2"
+tower = "0.4.12"
+futures = "0.3.21"
+log = "0.4.17"
+http = "0.2.8"
\ No newline at end of file
diff --git a/dubbo-rust-protocol/src/jsonrpc/client.rs b/dubbo-rust-protocol/src/jsonrpc/client.rs
new file mode 100644
index 0000000..2944f98
--- /dev/null
+++ b/dubbo-rust-protocol/src/jsonrpc/client.rs
@@ -0,0 +1,16 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
diff --git a/dubbo-rust-protocol/src/jsonrpc/mod.rs b/dubbo-rust-protocol/src/jsonrpc/mod.rs
new file mode 100644
index 0000000..fab67d6
--- /dev/null
+++ b/dubbo-rust-protocol/src/jsonrpc/mod.rs
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+pub mod client;
+pub mod server;
+
+use serde::{de::DeserializeOwned, Deserialize, Serialize};
+use serde_json::json;
+
+#[derive(Debug, Clone)]
+pub struct Header {
+    pub jsonrpc: Option<String>,
+    pub id: Option<String>,
+    pub method: Option<String>,
+}
+
+pub struct Request {
+    pub header: Header,
+    pub params: serde_json::Value,
+}
+impl Request {
+    fn from_slice(slice: Vec<u8>) -> Result<Self, serde_json::Error> {
+        let pre: serde_json::map::Map<String, serde_json::Value> = serde_json::from_slice(&slice)?;
+
+        let header = Header {
+            jsonrpc: Self::get_json_value(pre.get("jsonrpc"))?,
+            id: Self::get_json_value(pre.get("id"))?,
+            method: Self::get_json_value(pre.get("method"))?,
+        };
+
+        let params = serde_json::from_value(pre.get("params").unwrap_or(&json!({})).clone())?;
+
+        Ok(Self { header, params })
+    }
+
+    fn get_json_value<T: DeserializeOwned>(
+        value: Option<&serde_json::Value>,
+    ) -> Result<Option<T>, serde_json::Error> {
+        match value {
+            None => Ok(None),
+            Some(v) => {
+                let ret: T = serde_json::from_value(v.clone())?;
+
+                Ok(Some(ret))
+            }
+        }
+    }
+
+    pub fn new<T: Serialize>(method_name: &str, req: T) -> Result<Self, serde_json::Error> {
+        Ok(Self {
+            header: Header {
+                jsonrpc: Some("2.0".to_string()),
+                id: Some("1".to_string()),
+                method: Some(method_name.to_string()),
+            },
+            params: serde_json::to_value(&req)?,
+        })
+    }
+
+    pub fn to_string(&self) -> Result<String, serde_json::Error> {
+        Ok(serde_json::to_string(&json!({
+            "jsonrpc": self.header.jsonrpc,
+            "id": self.header.id,
+            "method": self.header.method,
+            "params": self.params
+        }))?)
+    }
+}
+
+pub struct Response {
+    header: Header,
+    error: Option<Error>,
+    result: Option<serde_json::Value>,
+}
+
+impl Response {
+    pub fn from_request<B: Serialize>(
+        request: &Request,
+        result: B,
+    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync + 'static>> {
+        let p = serde_json::to_value(result)?;
+        Ok(Self {
+            header: request.header.clone(),
+            error: None,
+            result: Some(p),
+        })
+    }
+
+    pub fn from_request_error(request: &Request, error: Error) -> Self {
+        Self {
+            header: request.header.clone(),
+            error: Some(error),
+            result: None,
+        }
+    }
+
+    pub fn to_string(&self) -> Result<String, serde_json::Error> {
+        let ret = json!({
+            "jsonrpc": self.header.jsonrpc,
+            "id": self.header.id,
+            "method": self.header.method,
+            "result": self.result,
+            "error": self.error
+        });
+
+        Ok(serde_json::to_string(&ret)?)
+    }
+
+    pub fn from_slice(slice: &Vec<u8>) -> Result<Self, serde_json::Error> {
+        let pre: serde_json::map::Map<String, serde_json::Value> = serde_json::from_slice(&slice)?;
+
+        let header = Header {
+            jsonrpc: Self::get_json_value(pre.get("jsonrpc"))?,
+            id: Self::get_json_value(pre.get("id"))?,
+            method: Self::get_json_value(pre.get("method"))?,
+        };
+
+        let result = serde_json::from_value(pre.get("result").unwrap_or(&json!({})).clone())?;
+        let error = serde_json::from_value(pre.get("error").unwrap_or(&json!({})).clone())?;
+        Ok(Self {
+            header,
+            result,
+            error,
+        })
+    }
+
+    pub fn get_body<T: DeserializeOwned>(&self) -> Result<T, serde_json::Error> {
+        Ok(serde_json::from_value(
+            self.result.as_ref().unwrap_or(&json!({})).clone(),
+        )?)
+    }
+
+    fn get_json_value<T: DeserializeOwned>(
+        value: Option<&serde_json::Value>,
+    ) -> Result<Option<T>, serde_json::Error> {
+        match value {
+            None => Ok(None),
+            Some(v) => {
+                let ret: T = serde_json::from_value(v.clone())?;
+
+                Ok(Some(ret))
+            }
+        }
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct Error {
+    pub code: i64,
+    pub message: String,
+}
diff --git a/dubbo-rust-protocol/src/jsonrpc/server.rs b/dubbo-rust-protocol/src/jsonrpc/server.rs
new file mode 100644
index 0000000..ed87953
--- /dev/null
+++ b/dubbo-rust-protocol/src/jsonrpc/server.rs
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use std::future::Future;
+use std::{net::SocketAddr, pin::Pin, task::Poll};
+
+use futures::ready;
+use http::{Request as HttpRequest, Response as HttpResponse};
+use hyper::body::HttpBody;
+use hyper::server::accept::Accept;
+use hyper::server::conn::{AddrIncoming, AddrStream};
+use hyper::server::conn::{Connection, Http};
+use hyper::Body;
+use log::trace;
+use pin_project_lite::pin_project;
+use tokio::io::{AsyncRead, AsyncWrite};
+
+use super::Request as JsonRpcRequest;
+use super::Response as JsonRpcResponse;
+
+fn wrap_future<F, R, E>(fut: F) -> SrvFut<R, E>
+where
+    F: Future<Output = Result<R, E>> + Send + 'static,
+{
+    Box::pin(fut)
+}
+
+pin_project! {
+   pub struct JsonRpcServer<S> {
+        #[pin]
+        incoming: AddrIncoming,
+        rt_handle: tokio::runtime::Handle,
+        service: S
+    }
+}
+
+impl<S> JsonRpcServer<S> {
+    pub fn new(addr: &SocketAddr, handle: tokio::runtime::Handle, service: S) -> Self
+    where
+        S: tower::Service<HttpRequest<Body>> + Clone,
+    {
+        let incoming = AddrIncoming::bind(addr).unwrap();
+        Self {
+            incoming: incoming,
+            rt_handle: handle,
+            service,
+        }
+    }
+
+    fn poll_next(
+        self: Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<Option<Result<AddrStream, std::io::Error>>> {
+        let me = self.project();
+        me.incoming.poll_accept(cx)
+    }
+}
+
+type SrvFut<R, E> = Pin<Box<dyn Future<Output = Result<R, E>> + Send + 'static>>;
+
+pin_project! {
+    struct OneConnection<IO,S>
+    where S: tower::Service<HttpRequest<Body>,Response = HttpResponse<Body>,Error = StdError, Future = SrvFut<HttpResponse<Body>,StdError>>
+    {
+        #[pin]
+        connection: Connection<IO,S>
+    }
+}
+
+impl<IO, S> Future for OneConnection<IO, S>
+where
+    S: tower::Service<
+            HttpRequest<Body>,
+            Response = HttpResponse<Body>,
+            Error = StdError,
+            Future = SrvFut<HttpResponse<Body>, StdError>,
+        > + Unpin,
+    IO: AsyncRead + AsyncWrite + Unpin,
+{
+    type Output = Result<(), hyper::Error>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
+        self.project().connection.poll_without_shutdown(cx)
+    }
+}
+
+type StdError = Box<dyn std::error::Error + Send + Sync + 'static>;
+impl<S> Future for JsonRpcServer<S>
+where
+    S: tower::Service<
+        HttpRequest<Body>,
+        Response = HttpResponse<Body>,
+        Error = StdError,
+        Future = SrvFut<HttpResponse<Body>, StdError>,
+    >,
+    S: Clone + Send + 'static + Unpin,
+{
+    type Output = Result<(), StdError>;
+
+    fn poll(
+        mut self: Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Self::Output> {
+        loop {
+            let ret = ready!(self.as_mut().poll_next(cx));
+            match ret {
+                Some(Ok(stream)) => {
+                    trace!("Get conn {}", stream.remote_addr());
+
+                    let connection = Http::new()
+                        .http1_only(true)
+                        .http1_keep_alive(true)
+                        .serve_connection(stream, self.service.clone());
+
+                    let one_conn = OneConnection { connection };
+                    self.rt_handle.spawn(one_conn);
+                }
+                Some(Err(e)) => return Poll::Ready(Err(e.into())),
+                None => return Poll::Ready(Err("option none".into())),
+            }
+        }
+    }
+}
+
+////////////////////////////////////
+
+#[derive(Clone)]
+pub struct JsonRpcService<S> {
+    service: S,
+}
+
+impl<S> JsonRpcService<S> {
+    pub fn new(service: S) -> Self
+    where
+        S: tower::Service<
+            JsonRpcRequest,
+            Response = JsonRpcResponse,
+            Error = StdError,
+            Future = SrvFut<JsonRpcResponse, StdError>,
+        >,
+        S: Clone + Send + 'static,
+    {
+        Self { service: service }
+    }
+}
+
+impl<S> tower::Service<HttpRequest<Body>> for JsonRpcService<S>
+where
+    S: tower::Service<
+        JsonRpcRequest,
+        Response = JsonRpcResponse,
+        Error = StdError,
+        Future = SrvFut<JsonRpcResponse, StdError>,
+    >,
+    S: Clone + Send + 'static,
+{
+    type Response = HttpResponse<Body>;
+
+    type Error = StdError;
+
+    type Future = SrvFut<Self::Response, Self::Error>;
+
+    fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
+        Poll::Ready(Ok(()))
+    }
+
+    fn call(&mut self, mut req: HttpRequest<Body>) -> Self::Future {
+        // serde
+        let mut inner_service = self.service.clone();
+        wrap_future(async move {
+            if let Some(data) = req.data().await {
+                if let Err(ref e) = data {
+                    trace!("Get body error {}", e);
+                }
+                let data = data?;
+
+                let request = JsonRpcRequest::from_slice(data.to_vec());
+
+                if let Err(ref e) = request {
+                    trace!("Serde error {}", e);
+                }
+                let request = request?;
+
+                let fut = inner_service.call(request);
+                let res = fut.await?;
+
+                let response_string = res.to_string()?;
+
+                return Ok(HttpResponse::builder()
+                    .body(response_string.into())
+                    .unwrap());
+            } else {
+                trace!("none");
+            }
+
+            trace!("get req {:?}", req);
+            Ok(HttpResponse::builder().body(Body::empty()).unwrap())
+        })
+    }
+}
diff --git a/dubbo-rust-protocol/src/lib.rs b/dubbo-rust-protocol/src/lib.rs
new file mode 100644
index 0000000..9565a85
--- /dev/null
+++ b/dubbo-rust-protocol/src/lib.rs
@@ -0,0 +1,5 @@
+pub mod jsonrpc;
+
+pub trait NamedService {
+    const SERVICE_NAME: &'static str;
+}