You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2022/07/23 14:44:46 UTC
[dubbo-rust] 23/35: feat: gen rpc code form proto service defined
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch poc-idl
in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git
commit a44357d0aad89f04bc9937c190d954a6e070d8a4
Author: johankoi <jo...@163.com>
AuthorDate: Thu Jul 14 04:25:27 2022 +0800
feat: gen rpc code form proto service defined
Signed-off-by: johankoi <jo...@163.com>
---
Cargo.toml | 6 +-
dubbo-build/Cargo.toml | 12 ++
dubbo-build/src/generator.rs | 136 +++++++++++++++++++++
dubbo-build/src/lib.rs | 8 ++
examples/grpc-gen/Cargo.toml | 20 +++
examples/grpc-gen/build.rs | 12 ++
.../pb_message => grpc-gen}/pb/greeter.proto | 0
examples/grpc-gen/src/greeter.rs | 73 +++++++++++
examples/grpc-gen/src/main.rs | 12 ++
examples/protobuf-transport/Cargo.toml | 30 +++++
examples/protobuf-transport/build.rs | 9 ++
.../pb/person.proto | 0
examples/protobuf-transport/src/client.rs | 33 +++++
examples/protobuf-transport/src/lib.rs | 7 ++
.../src/pb => protobuf-transport/src}/person.rs | 0
.../main.rs => protobuf-transport/src/server.rs} | 2 +-
examples/protobuf/client/Cargo.toml | 14 ---
examples/protobuf/client/src/main.rs | 32 -----
examples/protobuf/pb_message/Cargo.toml | 13 --
examples/protobuf/pb_message/build.rs | 18 ---
examples/protobuf/pb_message/src/lib.rs | 37 ------
examples/protobuf/pb_message/src/pb/greeter.rs | 10 --
examples/protobuf/pb_message/src/pb/mod.rs | 5 -
examples/protobuf/server/Cargo.toml | 10 --
xds/Cargo.toml | 3 +
xds/src/error.rs | 136 +++++++++++++++++++++
xds/src/lib.rs | 17 +++
xds/src/request.rs | 131 ++++++++++++++++++++
xds/src/response.rs | 131 ++++++++++++++++++++
xds/src/util.rs | 9 ++
xds/src/wrapper.rs | 110 +++++++++++++++++
31 files changed, 893 insertions(+), 143 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 0fe1fd2..3016158 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,12 +1,12 @@
[workspace]
members = [
"xds",
+ "dubbo-build",
"protocol",
"registry",
"metadata",
"common",
"config",
- "examples/protobuf/client",
- "examples/protobuf/server",
- "examples/protobuf/pb_message",
+ "examples/grpc-gen",
+ "examples/protobuf-transport",
]
\ No newline at end of file
diff --git a/dubbo-build/Cargo.toml b/dubbo-build/Cargo.toml
new file mode 100644
index 0000000..5839555
--- /dev/null
+++ b/dubbo-build/Cargo.toml
@@ -0,0 +1,12 @@
+[package]
+name = "dubbo-build"
+version = "0.1.0"
+edition = "2021"
+
+[features]
+generator-code = ["prost-build"]
+
+
+
+[dependencies]
+prost-build = { version = "0.10.4", optional = true }
\ No newline at end of file
diff --git a/dubbo-build/src/generator.rs b/dubbo-build/src/generator.rs
new file mode 100644
index 0000000..05e25cf
--- /dev/null
+++ b/dubbo-build/src/generator.rs
@@ -0,0 +1,136 @@
+use prost_build::{Method, Service, ServiceGenerator};
+
+#[derive(Default)]
+pub struct CodeGenerator {}
+
+impl CodeGenerator {
+ pub fn new() -> CodeGenerator { Default::default() }
+
+ fn generate_module_name(&self) -> &str { "xds" }
+
+ fn generate_type_aliases(&mut self, buf: &mut String) {
+ buf.push_str(&format!(
+ "\n\
+ pub type DBReq<I> = {0}::request::ServiceRequest<I>;\n\
+ pub type DBResp<O> = Result<{0}::response::ServiceResponse<O>, {0}::error::DBProstError>;\n",
+ self.generate_module_name()));
+ }
+
+ fn generate_main_trait(&self, service: &Service, buf: &mut String) {
+ buf.push_str("\nuse async_trait::async_trait;\n\n#[async_trait]\n");
+
+ service.comments.append_with_indent(0, buf);
+ buf.push_str(&format!("pub trait {} {{", service.name));
+ for method in service.methods.iter() {
+ buf.push_str("\n");
+ method.comments.append_with_indent(1, buf);
+ buf.push_str(&format!(" {};\n", self.method_sig(method)));
+ }
+ buf.push_str("}\n");
+ }
+
+ fn method_sig(&self, method: &Method) -> String {
+ format!("async fn {0}(&self, request: DBReq<{1}>) -> DBResp<{2}>",
+ method.name, method.input_type, method.output_type)
+ }
+
+ fn generate_client_struct(&self, service: &Service, buf: &mut String) {
+ buf.push_str(&format!(
+ "\n\
+ pub struct {0}Client {{\n \
+ pub hyper_client: {1}::wrapper::HyperClient \n\
+ }}\n",service.name, self.generate_module_name()));
+ }
+
+ fn generate_client_impl(&self, service: &Service, buf: &mut String) {
+ buf.push_str(&format!(
+ "\n\
+ impl {0}Client {{\n \
+ pub fn new(root_url: &str) -> {0}Client {{\n \
+ {0}Client {{\n \
+ hyper_client: {1}::wrapper::HyperClient::new(root_url) \n \
+ }}\n \
+ }}\n\
+ }}\n", service.name, self.generate_module_name()));
+
+ buf.push_str("\n#[async_trait]");
+ buf.push_str(&format!("\nimpl {0} for {0}Client {{", service.name));
+
+ for method in service.methods.iter() {
+ buf.push_str(&format!(
+ "\n {} {{\n \
+ self.hyper_client.request(\"/dubbo/{}.{}/{}\", request).await\n \
+ }}\n", self.method_sig(method), service.package, service.proto_name, method.proto_name));
+ }
+ buf.push_str("}\n");
+ }
+
+ fn generate_server_struct(&self, service: &Service, buf: &mut String) {
+ buf.push_str(&format!(
+ "\n\
+ pub struct {0}Server<T: 'static + {0} + Send + Sync + Clone> {{\n \
+ pub hyper_server: {1}::wrapper::HyperServer<T> \n\
+ }}\n",service.name, self.generate_module_name()));
+ }
+
+ fn generate_server_impl(&self, service: &Service, buf: &mut String) {
+ buf.push_str(&format!(
+ "\n\
+ impl<T: 'static + {0} + Send + Sync + Clone> {0}Server<T> {{\n \
+ pub fn new(&self, service: T) -> {0}Server<T> {{\n \
+ {0}Server {{\n \
+ hyper_server: {1}::wrapper::HyperServer::new(service) \n \
+ }}\n \
+ }}\n\
+ }}\n",
+ service.name, self.generate_module_name()));
+
+ buf.push_str(&format!(
+ "\n\
+ impl<T: 'static + {0} + Send + Sync + Clone> {1}::wrapper::HyperService for {0}Server<T> {{\n \
+ fn handle(&self, req: DBReq<Vec<u8>>) -> {1}::BoxFutureResp<Vec<u8>> {{\n \
+ use ::futures::Future;\n \
+ let trait_object_service = self.hyper_server.service.clone();\n \
+ match (req.method.clone(), req.uri.path()) {{",
+ service.name, self.generate_module_name()));
+
+ // Make match arms for each type
+ for method in service.methods.iter() {
+ buf.push_str(&format!(
+ "\n \
+ (::hyper::Method::POST, \"/dubbo/{0}.{1}/{2}\") => {{\n \
+ Box::pin(async move {{ \n \
+ let proto_req = req.to_proto().unwrap(); \n \
+ let resp = trait_object_service.{3}(proto_req).await.unwrap(); \n \
+ let proto_resp = resp.to_proto_raw(); \n \
+ proto_resp \n \
+ }}) \n \
+ }},", service.package, service.proto_name, method.proto_name, method.name));
+ }
+ // Final 404 arm and end fn
+ buf.push_str(&format!(
+ "\n \
+ _ => {{\n \
+ Box::pin(async move {{ \n \
+ Ok({0}::error::DBError::new(::hyper::StatusCode::NOT_FOUND, \"not_found\", \"Not found\").to_resp_raw()) \n \
+ }}) \n \
+ }}\n \
+ }}\n \
+ }}\n\
+ }}", self.generate_module_name()));
+ }
+}
+
+impl ServiceGenerator for CodeGenerator {
+ fn generate(&mut self, service: Service, buf: &mut String) {
+ self.generate_type_aliases(buf);
+ self.generate_main_trait(&service, buf);
+ self.generate_client_struct(&service, buf);
+ self.generate_client_impl(&service, buf);
+ self.generate_server_struct(&service, buf);
+ self.generate_server_impl(&service, buf);
+ }
+
+ fn finalize(&mut self, buf: &mut String) {
+ }
+}
diff --git a/dubbo-build/src/lib.rs b/dubbo-build/src/lib.rs
new file mode 100644
index 0000000..329eb39
--- /dev/null
+++ b/dubbo-build/src/lib.rs
@@ -0,0 +1,8 @@
+
+
+#[cfg(feature = "generator-code")]
+extern crate prost_build;
+#[cfg(feature = "generator-code")]
+mod generator;
+#[cfg(feature = "generator-code")]
+pub use generator::CodeGenerator;
diff --git a/examples/grpc-gen/Cargo.toml b/examples/grpc-gen/Cargo.toml
new file mode 100644
index 0000000..335c81c
--- /dev/null
+++ b/examples/grpc-gen/Cargo.toml
@@ -0,0 +1,20 @@
+[package]
+name = "grpc-gen"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+futures = "0.3.21"
+prost = "0.10.4"
+async-trait = { version = "0.1.56" }
+xds = { version = "0.1.0", path = "../../xds" }
+
+hyper = { version = "0.14.19", features = ["full"] }
+
+
+
+[build-dependencies]
+prost-build = "0.10.4"
+dubbo-build = { path = "../../dubbo-build", features = ["generator-code"] }
\ No newline at end of file
diff --git a/examples/grpc-gen/build.rs b/examples/grpc-gen/build.rs
new file mode 100644
index 0000000..0098c18
--- /dev/null
+++ b/examples/grpc-gen/build.rs
@@ -0,0 +1,12 @@
+use std::io::Result;
+use prost_build::Config;
+
+fn main() -> Result<()> {
+ let mut conf = Config::new();
+ let mut gen = dubbo_build::CodeGenerator::new();
+ conf.service_generator(Box::new(gen));
+ conf.out_dir("src/");
+ conf.compile_protos(&["pb/greeter.proto"], &["pb/"]).unwrap();
+
+ Ok(())
+}
\ No newline at end of file
diff --git a/examples/protobuf/pb_message/pb/greeter.proto b/examples/grpc-gen/pb/greeter.proto
similarity index 100%
rename from examples/protobuf/pb_message/pb/greeter.proto
rename to examples/grpc-gen/pb/greeter.proto
diff --git a/examples/grpc-gen/src/greeter.rs b/examples/grpc-gen/src/greeter.rs
new file mode 100644
index 0000000..3ecb8f2
--- /dev/null
+++ b/examples/grpc-gen/src/greeter.rs
@@ -0,0 +1,73 @@
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct HelloRequest {
+ #[prost(string, tag="1")]
+ pub name: ::prost::alloc::string::String,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct HelloResponse {
+ #[prost(string, tag="1")]
+ pub message: ::prost::alloc::string::String,
+}
+
+pub type DBReq<I> = xds::request::ServiceRequest<I>;
+pub type DBResp<O> = Result<xds::response::ServiceResponse<O>, xds::error::DBProstError>;
+
+use async_trait::async_trait;
+
+#[async_trait]
+pub trait Greeter {
+ async fn say_hello(&self, request: DBReq<HelloRequest>) -> DBResp<HelloResponse>;
+}
+
+pub struct GreeterClient {
+ pub hyper_client: xds::wrapper::HyperClient
+}
+
+impl GreeterClient {
+ pub fn new(root_url: &str) -> GreeterClient {
+ GreeterClient {
+ hyper_client: xds::wrapper::HyperClient::new(root_url)
+ }
+ }
+}
+
+#[async_trait]
+impl Greeter for GreeterClient {
+ async fn say_hello(&self, request: DBReq<HelloRequest>) -> DBResp<HelloResponse> {
+ self.hyper_client.request("/dubbo/greeter.Greeter/SayHello", request).await
+ }
+}
+
+pub struct GreeterServer<T: 'static + Greeter + Send + Sync + Clone> {
+ pub hyper_server: xds::wrapper::HyperServer<T>
+}
+
+impl<T: 'static + Greeter + Send + Sync + Clone> GreeterServer<T> {
+ pub fn new(&self, service: T) -> GreeterServer<T> {
+ GreeterServer {
+ hyper_server: xds::wrapper::HyperServer::new(service)
+ }
+ }
+}
+
+impl<T: 'static + Greeter + Send + Sync + Clone> xds::wrapper::HyperService for GreeterServer<T> {
+ fn handle(&self, req: DBReq<Vec<u8>>) -> xds::BoxFutureResp<Vec<u8>> {
+ use ::futures::Future;
+ let trait_object_service = self.hyper_server.service.clone();
+ match (req.method.clone(), req.uri.path()) {
+ (::hyper::Method::POST, "/dubbo/greeter.Greeter/SayHello") => {
+ Box::pin(async move {
+ let proto_req = req.to_proto().unwrap();
+ let resp = trait_object_service.say_hello(proto_req).await.unwrap();
+ let proto_resp = resp.to_proto_raw();
+ proto_resp
+ })
+ },
+ _ => {
+ Box::pin(async move {
+ Ok(xds::error::DBError::new(::hyper::StatusCode::NOT_FOUND, "not_found", "Not found").to_resp_raw())
+ })
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/examples/grpc-gen/src/main.rs b/examples/grpc-gen/src/main.rs
new file mode 100644
index 0000000..68b341a
--- /dev/null
+++ b/examples/grpc-gen/src/main.rs
@@ -0,0 +1,12 @@
+
+mod greeter;
+use greeter::*;
+
+
+fn main() {
+
+ let client = GreeterClient::new("0.0.0.0:8080");
+ let hello_req = DBReq::new(HelloRequest { name: "johankoi".into()});
+ client.say_hello(hello_req);
+
+}
diff --git a/examples/protobuf-transport/Cargo.toml b/examples/protobuf-transport/Cargo.toml
new file mode 100644
index 0000000..8a35f90
--- /dev/null
+++ b/examples/protobuf-transport/Cargo.toml
@@ -0,0 +1,30 @@
+[package]
+name = "protobuf-transport"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+xds = { version = "0.1.0", path = "../../xds" }
+tokio = {version = "1.9.0", features = ["full"]}
+hyper = { version = "0.14", features = ["full"] }
+futures = "0.3.21"
+prost = "0.10.4"
+
+
+[lib]
+path = "./src/lib.rs"
+
+[[bin]]
+name="server"
+path="./src/server.rs"
+
+[[bin]]
+name="client"
+path="./src/client.rs"
+
+
+[build-dependencies]
+prost-build = "0.10.4"
+dubbo-build = { path = "../../dubbo-build", features = ["generator-code"] }
\ No newline at end of file
diff --git a/examples/protobuf-transport/build.rs b/examples/protobuf-transport/build.rs
new file mode 100644
index 0000000..6459de7
--- /dev/null
+++ b/examples/protobuf-transport/build.rs
@@ -0,0 +1,9 @@
+use std::io::Result;
+use prost_build::Config;
+
+fn main() -> Result<()> {
+ Config::new()
+ .out_dir("src/")
+ .compile_protos(&["pb/person.proto"], &["pb/"])?;
+ Ok(())
+}
\ No newline at end of file
diff --git a/examples/protobuf/pb_message/pb/person.proto b/examples/protobuf-transport/pb/person.proto
similarity index 100%
rename from examples/protobuf/pb_message/pb/person.proto
rename to examples/protobuf-transport/pb/person.proto
diff --git a/examples/protobuf-transport/src/client.rs b/examples/protobuf-transport/src/client.rs
new file mode 100644
index 0000000..4d598bb
--- /dev/null
+++ b/examples/protobuf-transport/src/client.rs
@@ -0,0 +1,33 @@
+use std::collections::hash_map::HashMap;
+use xds::{client::RpcClient};
+use prost::Message;
+use hyper::body::Buf;
+use hyper::{Request, Response};
+
+pub mod person;
+use person::Person;
+
+
+#[tokio::main]
+async fn main() {
+
+ let mut client = RpcClient::new(String::from("http://127.0.0.1:8972"));
+
+ let service_path = String::from("helloworld");
+ let service_method = String::from("hello");
+ let metadata = HashMap::new();
+
+ let mut person = Person::default();
+ person.name = "guomiwu".to_string();
+ let pbData = person.encode_to_vec();
+
+ let callResult = client.call(service_path, service_method, &metadata, pbData).await;
+ let resp = callResult.unwrap();
+
+ // asynchronously aggregate the chunks of the body
+ let body = hyper::body::aggregate(resp).await;
+ let data = body.unwrap().chunk().to_vec();
+
+ let resPerson = Person::decode(data.as_ref()).unwrap();
+ println!("resPerson={:?}", resPerson.name);
+}
\ No newline at end of file
diff --git a/examples/protobuf-transport/src/lib.rs b/examples/protobuf-transport/src/lib.rs
new file mode 100644
index 0000000..959e687
--- /dev/null
+++ b/examples/protobuf-transport/src/lib.rs
@@ -0,0 +1,7 @@
+
+pub mod person;
+pub use person::*;
+
+fn main() {
+ println!("Hello, world!");
+}
diff --git a/examples/protobuf/pb_message/src/pb/person.rs b/examples/protobuf-transport/src/person.rs
similarity index 100%
rename from examples/protobuf/pb_message/src/pb/person.rs
rename to examples/protobuf-transport/src/person.rs
diff --git a/examples/protobuf/server/src/main.rs b/examples/protobuf-transport/src/server.rs
similarity index 82%
rename from examples/protobuf/server/src/main.rs
rename to examples/protobuf-transport/src/server.rs
index a182144..9b0c443 100644
--- a/examples/protobuf/server/src/main.rs
+++ b/examples/protobuf-transport/src/server.rs
@@ -5,7 +5,7 @@ use xds::{ server::RpcServer };
#[tokio::main]
async fn main() {
let addr = SocketAddr::from(([127, 0, 0, 1], 8972));
- let mut server = RpcServer::new(addr);
+ let server = RpcServer::new(addr);
server.start().await;
println!("RpcServer ok");
}
\ No newline at end of file
diff --git a/examples/protobuf/client/Cargo.toml b/examples/protobuf/client/Cargo.toml
deleted file mode 100644
index e4c426c..0000000
--- a/examples/protobuf/client/Cargo.toml
+++ /dev/null
@@ -1,14 +0,0 @@
-[package]
-name = "protobuf"
-version = "0.1.0"
-edition = "2021"
-
-# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
-
-[dependencies]
-xds = { version = "0.1.0", path = "../../../xds" }
-tokio = {version = "1.9.0", features = ["full"]}
-pb_message = { version = "0.1.0", path = "../pb_message" }
-prost = "0.10.4"
-hyper = { version = "0.14", features = ["full"] }
-
diff --git a/examples/protobuf/client/src/main.rs b/examples/protobuf/client/src/main.rs
deleted file mode 100644
index bd6507f..0000000
--- a/examples/protobuf/client/src/main.rs
+++ /dev/null
@@ -1,32 +0,0 @@
-use std::collections::hash_map::HashMap;
-use xds::{ client::RpcClient };
-use pb_message::pb::person::Person;
-use prost::Message;
-use hyper::body::Buf;
-
-#[tokio::main]
-async fn main() {
- let task = tokio::spawn(async move {
- let mut client = RpcClient::new(String::from("http://127.0.0.1:8972"));
-
- let service_path = String::from("helloworld");
- let service_method = String::from("hello");
- let metadata = HashMap::new();
-
- let mut person = Person::default();
- person.name = "guomiwu".to_string();
- let pbData = person.encode_to_vec();
-
- let callResult = client.call(service_path, service_method, &metadata, pbData).await;
- let resp = callResult.unwrap();
-
- // asynchronously aggregate the chunks of the body
- let body = hyper::body::aggregate(resp).await;
- let data = body.unwrap().chunk().to_vec();
-
- let resPerson = Person::decode(data.as_ref()).unwrap();
- println!("resPerson={:?}", resPerson.name);
-
- });
- task.await.unwrap();
-}
\ No newline at end of file
diff --git a/examples/protobuf/pb_message/Cargo.toml b/examples/protobuf/pb_message/Cargo.toml
deleted file mode 100644
index f4a97cd..0000000
--- a/examples/protobuf/pb_message/Cargo.toml
+++ /dev/null
@@ -1,13 +0,0 @@
-[package]
-name = "pb_message"
-version = "0.1.0"
-edition = "2021"
-
-# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
-
-[dependencies]
-prost = "0.10.4"
-prost-types = "0.10.1"
-
-[build-dependencies]
-prost-build = "0.10.4"
\ No newline at end of file
diff --git a/examples/protobuf/pb_message/build.rs b/examples/protobuf/pb_message/build.rs
deleted file mode 100644
index a198077..0000000
--- a/examples/protobuf/pb_message/build.rs
+++ /dev/null
@@ -1,18 +0,0 @@
-use std::io::Result;
-use prost_build::Config;
-
-fn main() -> Result<()> {
-
- Config::new()
- .out_dir("src/pb")
- .compile_protos(&["pb/person.proto"], &["pb/"])?;
-
- // println!("cargo:rerun-if-changed=pb/greeter_rpc.pb");
- // println!("cargo:rerun-if-changed=pb/build.rs");
-
- Config::new()
- .out_dir("src/pb")
- .compile_protos(&["pb/greeter.proto"], &["pb/"])?;
-
- Ok(())
-}
\ No newline at end of file
diff --git a/examples/protobuf/pb_message/src/lib.rs b/examples/protobuf/pb_message/src/lib.rs
deleted file mode 100644
index 3a90aed..0000000
--- a/examples/protobuf/pb_message/src/lib.rs
+++ /dev/null
@@ -1,37 +0,0 @@
-
-pub mod pb;
-
-#[cfg(test)]
-mod tests {
- use crate::pb::Person;
- use crate::pb::greeter;
-
- use std::io::Cursor;
- use prost::Message;
-
- pub fn create_hello_request(name: String) -> greeter::HelloRequest {
- let mut hello_request = greeter::HelloRequest::default();
- hello_request.name = name;
- hello_request
- }
-
- pub fn serialize_greeter(hello: &greeter::HelloRequest) -> Vec<u8> {
- let mut buf = Vec::new();
- buf.reserve(hello.encoded_len());
-
- hello.encode(&mut buf).unwrap();
- buf
- }
-
-
- // pub fn deserialize_greeter(buf: &[u8]) -> Result<greeter::HelloRequest, prost::DecodeError> {
- // greeter::HelloRequest::decode(&mut Cursor::new(buf))
- // }
-
- #[test]
- fn it_works() {
- let person = Person::default();
- person.encode_to_vec();
- println!("{person:?}");
- }
-}
diff --git a/examples/protobuf/pb_message/src/pb/greeter.rs b/examples/protobuf/pb_message/src/pb/greeter.rs
deleted file mode 100644
index 36aa34d..0000000
--- a/examples/protobuf/pb_message/src/pb/greeter.rs
+++ /dev/null
@@ -1,10 +0,0 @@
-#[derive(Clone, PartialEq, ::prost::Message)]
-pub struct HelloRequest {
- #[prost(string, tag="1")]
- pub name: ::prost::alloc::string::String,
-}
-#[derive(Clone, PartialEq, ::prost::Message)]
-pub struct HelloResponse {
- #[prost(string, tag="1")]
- pub message: ::prost::alloc::string::String,
-}
diff --git a/examples/protobuf/pb_message/src/pb/mod.rs b/examples/protobuf/pb_message/src/pb/mod.rs
deleted file mode 100644
index 8c11e43..0000000
--- a/examples/protobuf/pb_message/src/pb/mod.rs
+++ /dev/null
@@ -1,5 +0,0 @@
-pub mod person;
-pub mod greeter;
-
-pub use person::*;
-pub use greeter::*;
\ No newline at end of file
diff --git a/examples/protobuf/server/Cargo.toml b/examples/protobuf/server/Cargo.toml
deleted file mode 100644
index fe077e4..0000000
--- a/examples/protobuf/server/Cargo.toml
+++ /dev/null
@@ -1,10 +0,0 @@
-[package]
-name = "server"
-version = "0.1.0"
-edition = "2021"
-
-# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
-
-[dependencies]
-xds = { version = "0.1.0", path = "../../../xds" }
-tokio = {version = "1.9.0", features = ["full"]}
\ No newline at end of file
diff --git a/xds/Cargo.toml b/xds/Cargo.toml
index 49af168..f224f7c 100644
--- a/xds/Cargo.toml
+++ b/xds/Cargo.toml
@@ -25,3 +25,6 @@ flate2 = "1.0"
tower = "0.4"
log = "0.4"
env_logger = "0.9.0"
+
+
+prost = "0.10.4"
\ No newline at end of file
diff --git a/xds/src/error.rs b/xds/src/error.rs
new file mode 100644
index 0000000..e212835
--- /dev/null
+++ b/xds/src/error.rs
@@ -0,0 +1,136 @@
+use serde_json;
+use prost::{DecodeError, EncodeError};
+use hyper::{Body, Response, Method, Version, header, StatusCode};
+use crate::response::ServiceResponse;
+
+
+/// A JSON-serializable DBError error
+#[derive(Debug)]
+pub struct DBError {
+ pub status: StatusCode,
+ pub error_type: String,
+ pub msg: String,
+ pub meta: Option<serde_json::Value>,
+}
+
+impl DBError {
+ /// Create a DBError error with no meta
+ pub fn new(status: StatusCode, error_type: &str, msg: &str) -> DBError {
+ DBError::new_meta(status, error_type, msg, None)
+ }
+
+ /// Create a DBError error with optional meta
+ pub fn new_meta(status: StatusCode, error_type: &str, msg: &str, meta: Option<serde_json::Value>) -> DBError {
+ DBError { status, error_type: error_type.to_string(), msg: msg.to_string(), meta }
+ }
+
+ /// Create a byte-array service response for this error and the given status code
+ pub fn to_resp_raw(&self) -> ServiceResponse<Vec<u8>> {
+ let output = self.to_json_bytes().unwrap_or_else(|_| "{}".as_bytes().to_vec());
+ let mut headers = header::HeaderMap::new();
+ headers.insert(header::CONTENT_TYPE, "json".parse().unwrap());
+ let lenth_value = header::HeaderValue::from(output.len() as u64);
+ headers.insert(header::CONTENT_LENGTH, lenth_value);
+
+ ServiceResponse {
+ version: Version::default(),
+ headers,
+ status: self.status,
+ output,
+ }
+ }
+
+ /// Create a hyper response for this error and the given status code
+ pub fn to_hyper_resp(&self) -> Response<Body> {
+ let body = self.to_json_bytes().unwrap_or_else(|_| "{}".as_bytes().to_vec());
+
+ let mut resp = Response::builder()
+ .status(self.status)
+ .body(Body::from(body.clone())).unwrap();
+
+ let header_mut = resp.headers_mut();
+ header_mut.insert(header::CONTENT_TYPE, "json".parse().unwrap());
+ let lenth_value = header::HeaderValue::from(body.len() as u64);
+ header_mut.insert(header::CONTENT_LENGTH, lenth_value);
+ resp
+ }
+
+ /// Create error from Serde JSON value
+ pub fn from_json(status: StatusCode, json: serde_json::Value) -> DBError {
+ let error_type = json["error_type"].as_str();
+ DBError {
+ status,
+ error_type: error_type.unwrap_or("<no code>").to_string(),
+ msg: json["msg"].as_str().unwrap_or("<no message>").to_string(),
+ // Put the whole thing as meta if there was no type
+ meta: if error_type.is_some() { json.get("meta").map(|v| v.clone()) } else { Some(json.clone()) },
+ }
+ }
+
+ /// Create error from byte array
+ pub fn from_json_bytes(status: StatusCode, json: &[u8]) -> serde_json::Result<DBError> {
+ serde_json::from_slice(json).map(|v| DBError::from_json(status, v))
+ }
+
+ /// Create Serde JSON value from error
+ pub fn to_json(&self) -> serde_json::Value {
+ let mut props = serde_json::map::Map::new();
+ props.insert("error_type".to_string(), serde_json::Value::String(self.error_type.clone()));
+ props.insert("msg".to_string(), serde_json::Value::String(self.msg.clone()));
+ if let Some(ref meta) = self.meta { props.insert("meta".to_string(), meta.clone()); }
+ serde_json::Value::Object(props)
+ }
+
+ /// Create byte array from error
+ pub fn to_json_bytes(&self) -> serde_json::Result<Vec<u8>> {
+ serde_json::to_vec(&self.to_json())
+ }
+}
+
+impl From<DBError> for DBProstError {
+ fn from(v: DBError) -> DBProstError { DBProstError::DBWrapError(v) }
+}
+
+
+
+
+
+/// An error that can occur during a call to a service
+#[derive(Debug)]
+pub enum DBProstError {
+ /// A standard DBError error with a type, message, and some metadata
+ DBWrapError(DBError),
+ /// An error when trying to decode JSON into an error or object
+ JsonDecodeError(serde_json::Error),
+ /// An error when trying to encode a protobuf object
+ ProstEncodeError(EncodeError),
+ /// An error when trying to decode a protobuf object
+ ProstDecodeError(DecodeError),
+ /// A generic hyper error
+ HyperError(hyper::Error),
+ /// A wrapper for any of the other `DBProstError`s that also includes request/response info
+ AfterBodyError {
+ /// The request or response's raw body before the error happened
+ body: Vec<u8>,
+ /// The request method, only present for server errors
+ method: Option<Method>,
+ /// The request or response's HTTP version
+ version: Version,
+ /// The request or response's headers
+ headers: header::HeaderMap,
+ /// The response status, only present for client errors
+ status: Option<StatusCode>,
+ /// The underlying error
+ err: Box<DBProstError>,
+ },
+}
+
+impl DBProstError {
+ /// This same error, or the underlying error if it is an `AfterBodyError`
+ pub fn root_err(self) -> DBProstError {
+ match self {
+ DBProstError::AfterBodyError { err, .. } => err.root_err(),
+ _ => self
+ }
+ }
+}
\ No newline at end of file
diff --git a/xds/src/lib.rs b/xds/src/lib.rs
index 4f818f9..d3a7e94 100644
--- a/xds/src/lib.rs
+++ b/xds/src/lib.rs
@@ -19,6 +19,23 @@ pub mod client;
pub mod server;
pub mod protocol;
+
+pub mod request;
+pub mod response;
+pub mod wrapper;
+pub mod error;
+pub mod util;
+
+
+
+pub use wrapper::*;
+pub use request::*;
+pub use response::*;
+pub use error::*;
+pub use util::*;
+
+
+
#[cfg(test)]
mod tests {
use crate::client::client::RpcClient;
diff --git a/xds/src/request.rs b/xds/src/request.rs
new file mode 100644
index 0000000..4d497c0
--- /dev/null
+++ b/xds/src/request.rs
@@ -0,0 +1,131 @@
+use hyper::{body, Body, Request, Uri, Method, Version, header};
+use prost::{Message};
+use crate::util::*;
+use crate::error::*;
+
+/// A request with HTTP info and the serialized input object
+#[derive(Debug)]
+pub struct ServiceRequest<T> {
+ /// The URI of the original request
+ ///
+ /// When using a client, this will be overridden with the proper URI. It is only valuable for servers.
+ pub uri: Uri,
+ /// The request method; should always be Post
+ pub method: Method,
+ /// The HTTP version, rarely changed from the default
+ pub version: Version,
+ /// The set of headers
+ ///
+ /// Should always at least have `Content-Type`. Clients will override `Content-Length` on serialization.
+ pub headers: header::HeaderMap,
+ // The serialized request object
+ pub input: T,
+}
+
+impl<T> ServiceRequest<T> {
+ /// Create new service request with the given input object
+ ///
+ /// This automatically sets the `Content-Type` header as `application/protobuf`.
+ pub fn new(input: T) -> ServiceRequest<T> {
+ let mut header = header::HeaderMap::new();
+ header.insert(header::CONTENT_TYPE, "application/protobuf".parse().unwrap());
+ ServiceRequest {
+ uri: Default::default(),
+ method: Method::POST,
+ version: Version::default(),
+ headers: header,
+ input,
+ }
+ }
+
+ /// Copy this request with a different input value
+ pub fn clone_with_input<U>(&self, input: U) -> ServiceRequest<U> {
+ ServiceRequest {
+ uri: self.uri.clone(),
+ method: self.method.clone(),
+ version: self.version,
+ headers: self.headers.clone(),
+ input,
+ }
+ }
+}
+
+impl<T: Message + Default + 'static> From<T> for ServiceRequest<T> {
+ fn from(v: T) -> ServiceRequest<T> { ServiceRequest::new(v) }
+}
+
+impl ServiceRequest<Vec<u8>> {
+ /// Turn a hyper request to a boxed future of a byte-array service request
+ pub fn from_hyper_raw(req: Request<Body>) -> BoxFutureReq<Vec<u8>> {
+ Box::pin(async move {
+ let uri = req.uri().clone();
+ let method = req.method().clone();
+ let version = req.version();
+ let headers = req.headers().clone();
+ let future_req = body::to_bytes(req.into_body()).await
+ .map_err(DBProstError::HyperError)
+ .map(move |body|
+ ServiceRequest { uri, method, version, headers, input: body.to_vec() }
+ );
+ future_req
+ })
+ }
+
+ /// Turn a byte-array service request into a hyper request
+ pub fn to_hyper_raw(&self) -> Request<Body> {
+ let mut request = Request::builder()
+ .method(self.method.clone())
+ .uri(self.uri.clone())
+ .body(Body::from(self.input.clone())).unwrap();
+
+ let header_mut = request.headers_mut();
+ header_mut.clone_from(&self.headers);
+ let lenth_value = header::HeaderValue::from(self.input.len() as u64);
+ header_mut.insert(header::CONTENT_LENGTH, lenth_value);
+ request
+ }
+
+ /// Turn a byte-array service request into a `AfterBodyError`-wrapped version of the given error
+ pub fn body_err(&self, err: DBProstError) -> DBProstError {
+ DBProstError::AfterBodyError {
+ body: self.input.clone(),
+ method: Some(self.method.clone()),
+ version: self.version,
+ headers: self.headers.clone(),
+ status: None,
+ err: Box::new(err),
+ }
+ }
+
+ /// Serialize the byte-array service request into a protobuf service request
+ pub fn to_proto<T: Message + Default + 'static>(&self) -> Result<ServiceRequest<T>, DBProstError> {
+ match T::decode(self.input.as_ref()) {
+ Ok(v) => Ok(self.clone_with_input(v)),
+ Err(err) => Err(self.body_err(DBProstError::ProstDecodeError(err)))
+ }
+ }
+}
+
+impl<T: Message + Default + 'static> ServiceRequest<T> {
+ /// Turn a protobuf service request into a byte-array service request
+ pub fn to_proto_raw(&self) -> Result<ServiceRequest<Vec<u8>>, DBProstError> {
+ let mut body = Vec::new();
+ if let Err(err) = self.input.encode(&mut body) {
+ Err(DBProstError::ProstEncodeError(err))
+ } else {
+ Ok(self.clone_with_input(body))
+ }
+ }
+
+ /// Turn a hyper request into a protobuf service request
+ pub fn from_hyper_proto(req: Request<Body>) -> BoxFutureReq<T> {
+ Box::pin(async move {
+ ServiceRequest::from_hyper_raw(req).await.and_then(|v| v.to_proto())
+ })
+ }
+
+ /// Turn a protobuf service request into a hyper request
+ pub fn to_hyper_proto(&self) -> Result<Request<Body>, DBProstError> {
+ self.to_proto_raw().map(|v| v.to_hyper_raw())
+ }
+}
\ No newline at end of file
diff --git a/xds/src/response.rs b/xds/src/response.rs
new file mode 100644
index 0000000..b38db8b
--- /dev/null
+++ b/xds/src/response.rs
@@ -0,0 +1,131 @@
+use hyper::{body, Body, Response, Version, header, StatusCode};
+use prost::{Message};
+use crate::util::*;
+use crate::error::*;
+
+
+/// A response with HTTP info and a serialized output object
+#[derive(Debug)]
+pub struct ServiceResponse<T> {
+ /// The HTTP version
+ pub version: Version,
+ /// The set of headers
+ ///
+ /// Should always at least have `Content-Type`. Servers will override `Content-Length` on serialization.
+ pub headers: header::HeaderMap,
+ /// The status code
+ pub status: StatusCode,
+ /// The serialized output object
+ pub output: T,
+}
+
+impl<T> ServiceResponse<T> {
+ /// Create new service request with the given input object
+ ///
+ /// This automatically sets the `Content-Type` header as `application/protobuf`.
+ pub fn new(output: T) -> ServiceResponse<T> {
+ let mut headers = header::HeaderMap::new();
+ headers.insert(header::CONTENT_TYPE, "application/protobuf".parse().unwrap());
+
+ ServiceResponse {
+ version: Version::default(),
+ headers,
+ status: StatusCode::OK,
+ output,
+ }
+ }
+
+ /// Copy this response with a different output value
+ pub fn clone_with_output<U>(&self, output: U) -> ServiceResponse<U> {
+ ServiceResponse { version: self.version, headers: self.headers.clone(), status: self.status, output }
+ }
+}
+
+impl<T: Message + Default + 'static> From<T> for ServiceResponse<T> {
+ fn from(v: T) -> ServiceResponse<T> { ServiceResponse::new(v) }
+}
+
+impl ServiceResponse<Vec<u8>> {
+ /// Turn a hyper response to a boxed future of a byte-array service response
+ pub fn from_hyper_raw(resp: Response<Body>) -> BoxFutureResp<Vec<u8>> {
+ Box::pin(async move {
+ let version = resp.version();
+ let headers = resp.headers().clone();
+ let status = resp.status().clone();
+ let future_resp = body::to_bytes(resp.into_body()).await
+ .map_err(DBProstError::HyperError)
+ .map(move |body|
+ ServiceResponse { version, headers, status, output: body.to_vec() }
+ );
+ future_resp
+ })
+ }
+
+ /// Turn a byte-array service response into a hyper response
+ pub fn to_hyper_raw(&self) -> Response<Body> {
+ let mut resp = Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::from(self.output.clone())).unwrap();
+
+ let header_mut = resp.headers_mut();
+ header_mut.clone_from(&self.headers);
+ let lenth_value = header::HeaderValue::from(self.output.len() as u64);
+ header_mut.insert(header::CONTENT_LENGTH, lenth_value);
+ resp
+ }
+
+ /// Turn a byte-array service response into a `AfterBodyError`-wrapped version of the given error
+ pub fn body_err(&self, err: DBProstError) -> DBProstError {
+ DBProstError::AfterBodyError {
+ body: self.output.clone(),
+ method: None,
+ version: self.version,
+ headers: self.headers.clone(),
+ status: Some(self.status),
+ err: Box::new(err),
+ }
+ }
+
+
+ /// Serialize the byte-array service response into a protobuf service response
+ pub fn to_proto<T: Message + Default + 'static>(&self) -> Result<ServiceResponse<T>, DBProstError> {
+ if self.status.is_success() {
+ match T::decode(self.output.as_ref()) {
+ Ok(v) => Ok(self.clone_with_output(v)),
+ Err(err) => Err(self.body_err(DBProstError::ProstDecodeError(err)))
+ }
+ } else {
+ match DBError::from_json_bytes(self.status, &self.output) {
+ Ok(err) => Err(self.body_err(DBProstError::DBWrapError(err))),
+ Err(err) => Err(self.body_err(DBProstError::JsonDecodeError(err)))
+ }
+ }
+ }
+}
+
+impl<T: Message + Default + 'static> ServiceResponse<T> {
+ /// Turn a protobuf service response into a byte-array service response
+ pub fn to_proto_raw(&self) -> Result<ServiceResponse<Vec<u8>>, DBProstError> {
+ let mut body = Vec::new();
+ if let Err(err) = self.output.encode(&mut body) {
+ Err(DBProstError::ProstEncodeError(err))
+ } else {
+ Ok(self.clone_with_output(body))
+ }
+ }
+
+
+ /// Turn a hyper response into a protobuf service response
+ pub async fn from_hyper_proto(resp: Response<Body>) -> Result<ServiceResponse<T>, DBProstError> {
+ // Box::pin(async move {
+ ServiceResponse::from_hyper_raw(resp).await.and_then(|v| v.to_proto())
+ // })
+ }
+
+
+ /// Turn a protobuf service response into a hyper response
+ pub fn to_hyper_proto(&self) -> Result<Response<Body>, DBProstError> {
+ self.to_proto_raw().map(|v| v.to_hyper_raw())
+ }
+}
+
diff --git a/xds/src/util.rs b/xds/src/util.rs
new file mode 100644
index 0000000..2fbe3f3
--- /dev/null
+++ b/xds/src/util.rs
@@ -0,0 +1,9 @@
+use futures::future::BoxFuture;
+use crate::request::ServiceRequest;
+use crate::response::ServiceResponse;
+use crate::error::DBProstError;
+
+
+pub type BoxFutureReq<T> = BoxFuture<'static, Result<ServiceRequest<T>, DBProstError>>;
+pub type BoxFutureResp<O> = BoxFuture<'static, Result<ServiceResponse<O>, DBProstError>>;
+
diff --git a/xds/src/wrapper.rs b/xds/src/wrapper.rs
new file mode 100644
index 0000000..b80ffa8
--- /dev/null
+++ b/xds/src/wrapper.rs
@@ -0,0 +1,110 @@
+use std::sync::Arc;
+use std::task::Poll;
+use futures::future::BoxFuture;
+
+use hyper::{Body, Request, Response, header, StatusCode};
+
+use hyper::client::conn::Builder;
+use hyper::client::connect::HttpConnector;
+use hyper::client::service::Connect;
+
+use tower::Service;
+use prost::{Message};
+
+use crate::request::ServiceRequest;
+use crate::response::ServiceResponse;
+use crate::util::*;
+use crate::error::*;
+
+
+
+/// A wrapper for a hyper client
+#[derive(Debug)]
+pub struct HyperClient {
+ /// The hyper client
+ /// The root URL without any path attached
+ pub root_url: String,
+}
+
+impl HyperClient {
+ /// Create a new client wrapper for the given client and root using protobuf
+ pub fn new(root_url: &str) -> HyperClient {
+ HyperClient {
+ root_url: root_url.trim_end_matches('/').to_string(),
+ }
+ }
+
+ // Invoke the given request for the given path and return a boxed future result
+ pub async fn request<I, O>(&self, path: &str, req: ServiceRequest<I>) -> Result<ServiceResponse<O>, DBProstError>
+ where I: Message + Default + 'static, O: Message + Default + 'static {
+ let hyper_req = req.to_hyper_proto().unwrap();
+ let mut hyper_connect = Connect::new(HttpConnector::new(), Builder::new());
+
+ let url_string = format!("{}/{}", self.root_url, path.trim_start_matches('/'));
+ let uri = url_string.parse::<hyper::Uri>().unwrap();
+
+ let mut req_to_send = hyper_connect.call(uri.clone()).await.map_err(DBProstError::HyperError).unwrap();
+
+ let hyper_resp = req_to_send.call(hyper_req).await.map_err(DBProstError::HyperError).unwrap();
+ ServiceResponse::from_hyper_proto(hyper_resp).await
+ }
+}
+
+/// Service for taking a raw service request and returning a boxed future of a raw service response
+pub trait HyperService {
+ /// Accept a raw service request and return a boxed future of a raw service response
+ fn handle(&self, req: ServiceRequest<Vec<u8>>) -> BoxFutureResp<Vec<u8>>;
+}
+
+/// A wrapper for a `HyperService` trait that keeps a `Arc` version of the service
+pub struct HyperServer<T> {
+ /// The `Arc` version of the service
+ ///
+ /// Needed because of [hyper Service lifetimes](https://github.com/tokio-rs/tokio-service/issues/9)
+ pub service: Arc<T>,
+}
+
+impl<T> HyperServer<T> {
+ /// Create a new service wrapper for the given impl
+ pub fn new(service: T) -> HyperServer<T> { HyperServer { service: Arc::new(service) } }
+}
+
+impl<T> Service<Request<Body>> for HyperServer<T>
+ where T: 'static + Send + Sync + HyperService
+{
+ type Response = Response<Body>;
+ type Error = hyper::Error;
+ type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
+
+ fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn call(&mut self, req: Request<Body>) -> Self::Future {
+ let content_type = req.headers().get(header::CONTENT_TYPE).unwrap().to_str().unwrap();
+ if content_type == "application/protobuf" {
+ Box::pin(async move {
+ let status = StatusCode::UNSUPPORTED_MEDIA_TYPE;
+ Ok(DBError::new(status, "bad_content_type", "Content type must be application/protobuf").to_hyper_resp())
+ })
+ } else {
+ let service = self.service.clone();
+ Box::pin(async move {
+ let request = ServiceRequest::from_hyper_raw(req).await;
+ let resp = service.handle(request.unwrap()).await;
+ resp.map(|v| v.to_hyper_raw())
+ .or_else(|err| match err.root_err() {
+ DBProstError::ProstDecodeError(_) =>
+ Ok(DBError::new(StatusCode::BAD_REQUEST, "protobuf_decode_err", "Invalid protobuf body").
+ to_hyper_resp()),
+ DBProstError::DBWrapError(err) =>
+ Ok(err.to_hyper_resp()),
+ DBProstError::HyperError(err) =>
+ Err(err),
+ _ => Ok(DBError::new(StatusCode::INTERNAL_SERVER_ERROR, "internal_err", "Internal Error").
+ to_hyper_resp()),
+ })
+ })
+ }
+ }
+}
\ No newline at end of file