You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2022/07/06 09:40:58 UTC
[dubbo-rust] 22/26: feat(protocol) : Adding jsonrpc protocol .
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch poc-transport
in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git
commit 209757d13b4a2cf4e43b38b7555d6be3e49fe4a5
Author: even <st...@icloud.com>
AuthorDate: Fri Jun 17 15:02:25 2022 +0800
feat(protocol) : Adding jsonrpc protocol .
---
dubbo-rust-protocol/Cargo.toml | 17 +++
dubbo-rust-protocol/src/jsonrpc/client.rs | 16 +++
dubbo-rust-protocol/src/jsonrpc/mod.rs | 165 +++++++++++++++++++++++
dubbo-rust-protocol/src/jsonrpc/server.rs | 214 ++++++++++++++++++++++++++++++
dubbo-rust-protocol/src/lib.rs | 5 +
5 files changed, 417 insertions(+)
diff --git a/dubbo-rust-protocol/Cargo.toml b/dubbo-rust-protocol/Cargo.toml
new file mode 100644
index 0000000..6e108ea
--- /dev/null
+++ b/dubbo-rust-protocol/Cargo.toml
@@ -0,0 +1,17 @@
+[package]
+name = "dubbo-rust-protocol"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+serde = { version = "1.0.137", features = ["derive"] }
+serde_json = { version = "1.0.81" }
+pin-project-lite = { version = "0.2.9" }
+hyper = { version = "0.14.19", features = ["server","http1","tcp","http2","client"] }
+tokio = "1.19.2"
+tower = "0.4.12"
+futures = "0.3.21"
+log = "0.4.17"
+http = "0.2.8"
\ No newline at end of file
diff --git a/dubbo-rust-protocol/src/jsonrpc/client.rs b/dubbo-rust-protocol/src/jsonrpc/client.rs
new file mode 100644
index 0000000..2944f98
--- /dev/null
+++ b/dubbo-rust-protocol/src/jsonrpc/client.rs
@@ -0,0 +1,16 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
diff --git a/dubbo-rust-protocol/src/jsonrpc/mod.rs b/dubbo-rust-protocol/src/jsonrpc/mod.rs
new file mode 100644
index 0000000..fab67d6
--- /dev/null
+++ b/dubbo-rust-protocol/src/jsonrpc/mod.rs
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+pub mod client;
+pub mod server;
+
+use serde::{de::DeserializeOwned, Deserialize, Serialize};
+use serde_json::json;
+
+/// Envelope fields shared by JSON-RPC requests and responses.
+///
+/// Each field is optional; when a message is parsed, a key that is absent from the
+/// JSON object is stored as `None`.
+#[derive(Debug, Clone)]
+pub struct Header {
+ /// Value of the `"jsonrpc"` key (`Request::new` sets it to `"2.0"`).
+ pub jsonrpc: Option<String>,
+ /// Value of the `"id"` key (`Request::new` always sets it to `"1"`).
+ pub id: Option<String>,
+ /// Value of the `"method"` key.
+ pub method: Option<String>,
+}
+
+/// A JSON-RPC request: the envelope `Header` plus the untyped `params` payload.
+pub struct Request {
+ /// Envelope fields (`jsonrpc`, `id`, `method`).
+ pub header: Header,
+ /// Value of the `"params"` key, kept as raw JSON.
+ pub params: serde_json::Value,
+}
+impl Request {
+    /// Parses a request from the raw bytes of a JSON object.
+    ///
+    /// `jsonrpc`, `id` and `method` may be absent, but when present each one must be a
+    /// JSON string. A missing `params` key is replaced by an empty object.
+    ///
+    /// # Errors
+    ///
+    /// Returns the `serde_json` error when `slice` is not a JSON object or when a
+    /// header field cannot be read as a string.
+    fn from_slice(slice: Vec<u8>) -> Result<Self, serde_json::Error> {
+        let fields: serde_json::map::Map<String, serde_json::Value> =
+            serde_json::from_slice(&slice)?;
+
+        let jsonrpc = Self::get_json_value(fields.get("jsonrpc"))?;
+        let id = Self::get_json_value(fields.get("id"))?;
+        let method = Self::get_json_value(fields.get("method"))?;
+        let params = fields.get("params").cloned().unwrap_or_else(|| json!({}));
+
+        Ok(Self {
+            header: Header {
+                jsonrpc,
+                id,
+                method,
+            },
+            params,
+        })
+    }
+
+    /// Deserializes an optional JSON value into `T`, passing `None` through untouched.
+    fn get_json_value<T: DeserializeOwned>(
+        value: Option<&serde_json::Value>,
+    ) -> Result<Option<T>, serde_json::Error> {
+        value
+            .map(|raw| serde_json::from_value(raw.clone()))
+            .transpose()
+    }
+
+    /// Builds a request for `method_name` whose `params` is the JSON form of `req`.
+    ///
+    /// The header always carries `jsonrpc = "2.0"` and `id = "1"`.
+    ///
+    /// # Errors
+    ///
+    /// Returns the `serde_json` error when `req` cannot be serialized.
+    pub fn new<T: Serialize>(method_name: &str, req: T) -> Result<Self, serde_json::Error> {
+        let header = Header {
+            jsonrpc: Some("2.0".to_string()),
+            id: Some("1".to_string()),
+            method: Some(method_name.to_string()),
+        };
+        let params = serde_json::to_value(&req)?;
+
+        Ok(Self { header, params })
+    }
+
+    /// Serializes the request as a JSON object with the keys `jsonrpc`, `id`,
+    /// `method` and `params`; header fields that are `None` are written as `null`.
+    pub fn to_string(&self) -> Result<String, serde_json::Error> {
+        let envelope = json!({
+            "jsonrpc": self.header.jsonrpc,
+            "id": self.header.id,
+            "method": self.header.method,
+            "params": self.params
+        });
+
+        serde_json::to_string(&envelope)
+    }
+}
+
+/// A JSON-RPC response: the echoed request `Header` plus either a result or an error.
+pub struct Response {
+ // Envelope fields; the constructors copy them from the originating request.
+ header: Header,
+ // Set by `from_request_error`, `None` for responses built by `from_request`.
+ error: Option<Error>,
+ // Set by `from_request`, `None` for responses built by `from_request_error`.
+ result: Option<serde_json::Value>,
+}
+
+impl Response {
+    /// Builds a success response for `request`, storing the JSON form of `result`.
+    ///
+    /// The request header is copied into the response unchanged.
+    ///
+    /// # Errors
+    ///
+    /// Returns the serialization error when `result` cannot be converted to JSON.
+    pub fn from_request<B: Serialize>(
+        request: &Request,
+        result: B,
+    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync + 'static>> {
+        let payload = serde_json::to_value(result)?;
+        Ok(Self {
+            header: request.header.clone(),
+            error: None,
+            result: Some(payload),
+        })
+    }
+
+    /// Builds an error response for `request`; the request header is copied unchanged.
+    pub fn from_request_error(request: &Request, error: Error) -> Self {
+        Self {
+            header: request.header.clone(),
+            error: Some(error),
+            result: None,
+        }
+    }
+
+    /// Serializes the response as a JSON object with the keys `jsonrpc`, `id`,
+    /// `method`, `result` and `error`; fields that are `None` are written as `null`.
+    pub fn to_string(&self) -> Result<String, serde_json::Error> {
+        let envelope = json!({
+            "jsonrpc": self.header.jsonrpc,
+            "id": self.header.id,
+            "method": self.header.method,
+            "result": self.result,
+            "error": self.error
+        });
+
+        serde_json::to_string(&envelope)
+    }
+
+    /// Parses a response from the raw bytes of a JSON object.
+    ///
+    /// Header fields may be absent, but when present each must be a JSON string.
+    /// A `result` or `error` member that is missing or `null` becomes `None`.
+    ///
+    /// # Errors
+    ///
+    /// Returns the `serde_json` error when `slice` is not a JSON object, a header
+    /// field is not a string, or `error` is present but lacks `code`/`message`.
+    pub fn from_slice(slice: &Vec<u8>) -> Result<Self, serde_json::Error> {
+        let fields: serde_json::map::Map<String, serde_json::Value> =
+            serde_json::from_slice(slice)?;
+
+        let header = Header {
+            jsonrpc: Self::get_json_value(fields.get("jsonrpc"))?,
+            id: Self::get_json_value(fields.get("id"))?,
+            method: Self::get_json_value(fields.get("method"))?,
+        };
+
+        // A missing member must default to `null`, not `{}`: `{}` is not a valid
+        // `Error` object, so a success reply without an `error` key used to be
+        // rejected with "missing field `code`".
+        let result = serde_json::from_value(
+            fields
+                .get("result")
+                .cloned()
+                .unwrap_or(serde_json::Value::Null),
+        )?;
+        let error = serde_json::from_value(
+            fields
+                .get("error")
+                .cloned()
+                .unwrap_or(serde_json::Value::Null),
+        )?;
+
+        Ok(Self {
+            header,
+            result,
+            error,
+        })
+    }
+
+    /// Deserializes the stored result into `T`.
+    ///
+    /// When there is no result, an empty JSON object is deserialized instead, so `T`
+    /// must accept `{}` for that case to succeed.
+    pub fn get_body<T: DeserializeOwned>(&self) -> Result<T, serde_json::Error> {
+        serde_json::from_value(self.result.clone().unwrap_or_else(|| json!({})))
+    }
+
+    /// Deserializes an optional JSON value into `T`, passing `None` through untouched.
+    fn get_json_value<T: DeserializeOwned>(
+        value: Option<&serde_json::Value>,
+    ) -> Result<Option<T>, serde_json::Error> {
+        value
+            .map(|raw| serde_json::from_value(raw.clone()))
+            .transpose()
+    }
+}
+
+/// Error object carried in the `error` member of a `Response`.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct Error {
+ /// Numeric error code.
+ pub code: i64,
+ /// Human-readable description of the error.
+ pub message: String,
+}
diff --git a/dubbo-rust-protocol/src/jsonrpc/server.rs b/dubbo-rust-protocol/src/jsonrpc/server.rs
new file mode 100644
index 0000000..ed87953
--- /dev/null
+++ b/dubbo-rust-protocol/src/jsonrpc/server.rs
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use std::future::Future;
+use std::{net::SocketAddr, pin::Pin, task::Poll};
+
+use futures::ready;
+use http::{Request as HttpRequest, Response as HttpResponse};
+use hyper::body::HttpBody;
+use hyper::server::accept::Accept;
+use hyper::server::conn::{AddrIncoming, AddrStream};
+use hyper::server::conn::{Connection, Http};
+use hyper::Body;
+use log::trace;
+use pin_project_lite::pin_project;
+use tokio::io::{AsyncRead, AsyncWrite};
+
+use super::Request as JsonRpcRequest;
+use super::Response as JsonRpcResponse;
+
+/// Boxes and pins `fut` so it can be returned as the type-erased `SrvFut<R, E>`.
+fn wrap_future<F, R, E>(fut: F) -> SrvFut<R, E>
+where
+ F: Future<Output = Result<R, E>> + Send + 'static,
+{
+ Box::pin(fut)
+}
+
+// HTTP server front-end: accepts TCP connections and serves each one with a clone
+// of `service`. Plain `//` comments are used here so that no extra attributes are
+// passed into the `pin_project!` macro.
+pin_project! {
+ pub struct JsonRpcServer<S> {
+ // Listening socket; pinned because it is polled through `Accept::poll_accept`.
+ #[pin]
+ incoming: AddrIncoming,
+ // Runtime handle on which every accepted connection is spawned.
+ rt_handle: tokio::runtime::Handle,
+ // Service prototype; cloned once per accepted connection.
+ service: S
+ }
+}
+
+impl<S> JsonRpcServer<S> {
+    /// Binds a listener on `addr` and returns a server that will spawn its
+    /// connections on `handle`, serving each with a clone of `service`.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `addr` cannot be bound. The constructor returns `Self`, so the
+    /// failure cannot be reported as a value; the panic message names the address
+    /// and the underlying error.
+    pub fn new(addr: &SocketAddr, handle: tokio::runtime::Handle, service: S) -> Self
+    where
+        S: tower::Service<HttpRequest<Body>> + Clone,
+    {
+        let incoming = AddrIncoming::bind(addr)
+            .unwrap_or_else(|e| panic!("failed to bind JSON-RPC server to {}: {}", addr, e));
+
+        Self {
+            incoming,
+            rt_handle: handle,
+            service,
+        }
+    }
+
+    /// Polls the listener for the next accepted connection.
+    fn poll_next(
+        self: Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<Option<Result<AddrStream, std::io::Error>>> {
+        self.project().incoming.poll_accept(cx)
+    }
+}
+
+/// Boxed, pinned, `Send` future resolving to `Result<R, E>`; the future type used by the services in this module.
+type SrvFut<R, E> = Pin<Box<dyn Future<Output = Result<R, E>> + Send + 'static>>;
+
+// Wrapper around a single hyper `Connection`, projected with `pin_project!` so the
+// inner connection can be polled while pinned.
+pin_project! {
+ struct OneConnection<IO,S>
+ where S: tower::Service<HttpRequest<Body>,Response = HttpResponse<Body>,Error = StdError, Future = SrvFut<HttpResponse<Body>,StdError>>
+ {
+ // The hyper connection serving requests that arrive on `IO` with service `S`.
+ #[pin]
+ connection: Connection<IO,S>
+ }
+}
+
+/// Drives the wrapped hyper connection; resolves with that connection's result.
+impl<IO, S> Future for OneConnection<IO, S>
+where
+    S: tower::Service<
+            HttpRequest<Body>,
+            Response = HttpResponse<Body>,
+            Error = StdError,
+            Future = SrvFut<HttpResponse<Body>, StdError>,
+        > + Unpin,
+    IO: AsyncRead + AsyncWrite + Unpin,
+{
+    type Output = Result<(), hyper::Error>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
+        // Delegates to hyper's `poll_without_shutdown` on the projected connection.
+        let mut connection = self.project().connection;
+        connection.poll_without_shutdown(cx)
+    }
+}
+
+/// Type-erased, thread-safe error used as the error type of the services in this module.
+type StdError = Box<dyn std::error::Error + Send + Sync + 'static>;
+/// Accept loop: each accepted connection is served over HTTP/1 on the runtime handle
+/// with a clone of the service. The future only completes with an error, either an
+/// accept failure or the listener yielding `None`.
+impl<S> Future for JsonRpcServer<S>
+where
+    S: tower::Service<
+        HttpRequest<Body>,
+        Response = HttpResponse<Body>,
+        Error = StdError,
+        Future = SrvFut<HttpResponse<Body>, StdError>,
+    >,
+    S: Clone + Send + 'static + Unpin,
+{
+    type Output = Result<(), StdError>;
+
+    fn poll(
+        mut self: Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Self::Output> {
+        loop {
+            let stream = match ready!(self.as_mut().poll_next(cx)) {
+                Some(Ok(stream)) => stream,
+                Some(Err(e)) => return Poll::Ready(Err(e.into())),
+                None => return Poll::Ready(Err("option none".into())),
+            };
+
+            let remote_addr = stream.remote_addr();
+            trace!("Get conn {}", remote_addr);
+
+            let connection = Http::new()
+                .http1_only(true)
+                .http1_keep_alive(true)
+                .serve_connection(stream, self.service.clone());
+            let one_conn = OneConnection { connection };
+
+            // The join handle is dropped, so the task must report its own failure;
+            // otherwise a connection error disappears without a trace.
+            self.rt_handle.spawn(async move {
+                if let Err(e) = one_conn.await {
+                    log::warn!("connection {} closed with error: {}", remote_addr, e);
+                }
+            });
+        }
+    }
+}
+
+////////////////////////////////////
+
+/// Adapter that exposes an inner JSON-RPC service as an HTTP service.
+#[derive(Clone)]
+pub struct JsonRpcService<S> {
+ // Inner service; cloned for every HTTP call.
+ service: S,
+}
+
+impl<S> JsonRpcService<S> {
+    /// Wraps `service`, a tower service from JSON-RPC requests to JSON-RPC responses.
+    pub fn new(service: S) -> Self
+    where
+        S: tower::Service<
+            JsonRpcRequest,
+            Response = JsonRpcResponse,
+            Error = StdError,
+            Future = SrvFut<JsonRpcResponse, StdError>,
+        >,
+        S: Clone + Send + 'static,
+    {
+        Self { service }
+    }
+}
+
+/// HTTP entry point: reads the request body, parses it as a JSON-RPC request, calls
+/// the inner service and writes the serialized JSON-RPC response as the HTTP body.
+impl<S> tower::Service<HttpRequest<Body>> for JsonRpcService<S>
+where
+    S: tower::Service<
+        JsonRpcRequest,
+        Response = JsonRpcResponse,
+        Error = StdError,
+        Future = SrvFut<JsonRpcResponse, StdError>,
+    >,
+    S: Clone + Send + 'static,
+{
+    type Response = HttpResponse<Body>;
+
+    type Error = StdError;
+
+    type Future = SrvFut<Self::Response, Self::Error>;
+
+    /// Always reports ready; the inner service's readiness is not polled.
+    fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
+        Poll::Ready(Ok(()))
+    }
+
+    /// Handles one HTTP request.
+    ///
+    /// A request without any body data is answered with an empty HTTP response.
+    /// Body read errors, JSON parse errors, inner service errors and response
+    /// serialization errors are returned as the future's error.
+    fn call(&mut self, mut req: HttpRequest<Body>) -> Self::Future {
+        let mut inner_service = self.service.clone();
+        wrap_future(async move {
+            // Collect every body frame. Reading only the first frame truncates any
+            // request that hyper delivers in several chunks, and the truncated JSON
+            // then fails to parse.
+            // NOTE(review): the body size is not bounded here; confirm whether a
+            // limit is enforced elsewhere.
+            let mut body = Vec::new();
+            while let Some(chunk) = req.data().await {
+                match chunk {
+                    Ok(bytes) => body.extend_from_slice(&bytes),
+                    Err(e) => {
+                        trace!("Get body error {}", e);
+                        return Err(e.into());
+                    }
+                }
+            }
+
+            if body.is_empty() {
+                trace!("none");
+                trace!("get req {:?}", req);
+                return Ok(HttpResponse::builder().body(Body::empty()).unwrap());
+            }
+
+            let request = match JsonRpcRequest::from_slice(body) {
+                Ok(request) => request,
+                Err(e) => {
+                    trace!("Serde error {}", e);
+                    return Err(e.into());
+                }
+            };
+
+            let res = inner_service.call(request).await?;
+            let response_string = res.to_string()?;
+
+            Ok(HttpResponse::builder()
+                .body(response_string.into())
+                .unwrap())
+        })
+    }
+}
diff --git a/dubbo-rust-protocol/src/lib.rs b/dubbo-rust-protocol/src/lib.rs
new file mode 100644
index 0000000..9565a85
--- /dev/null
+++ b/dubbo-rust-protocol/src/lib.rs
@@ -0,0 +1,5 @@
+pub mod jsonrpc;
+
+/// Associates a compile-time service name with the implementing type.
+pub trait NamedService {
+ /// Name of the service. NOTE(review): presumably used for routing/registration; no user is visible in this commit.
+ const SERVICE_NAME: &'static str;
+}