You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2022/07/23 14:44:46 UTC

[dubbo-rust] 23/35: feat: gen rpc code form proto service defined

This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch poc-idl
in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git

commit a44357d0aad89f04bc9937c190d954a6e070d8a4
Author: johankoi <jo...@163.com>
AuthorDate: Thu Jul 14 04:25:27 2022 +0800

    feat: gen rpc code form proto service defined
    
    Signed-off-by: johankoi <jo...@163.com>
---
 Cargo.toml                                         |   6 +-
 dubbo-build/Cargo.toml                             |  12 ++
 dubbo-build/src/generator.rs                       | 136 +++++++++++++++++++++
 dubbo-build/src/lib.rs                             |   8 ++
 examples/grpc-gen/Cargo.toml                       |  20 +++
 examples/grpc-gen/build.rs                         |  12 ++
 .../pb_message => grpc-gen}/pb/greeter.proto       |   0
 examples/grpc-gen/src/greeter.rs                   |  73 +++++++++++
 examples/grpc-gen/src/main.rs                      |  12 ++
 examples/protobuf-transport/Cargo.toml             |  30 +++++
 examples/protobuf-transport/build.rs               |   9 ++
 .../pb/person.proto                                |   0
 examples/protobuf-transport/src/client.rs          |  33 +++++
 examples/protobuf-transport/src/lib.rs             |   7 ++
 .../src/pb => protobuf-transport/src}/person.rs    |   0
 .../main.rs => protobuf-transport/src/server.rs}   |   2 +-
 examples/protobuf/client/Cargo.toml                |  14 ---
 examples/protobuf/client/src/main.rs               |  32 -----
 examples/protobuf/pb_message/Cargo.toml            |  13 --
 examples/protobuf/pb_message/build.rs              |  18 ---
 examples/protobuf/pb_message/src/lib.rs            |  37 ------
 examples/protobuf/pb_message/src/pb/greeter.rs     |  10 --
 examples/protobuf/pb_message/src/pb/mod.rs         |   5 -
 examples/protobuf/server/Cargo.toml                |  10 --
 xds/Cargo.toml                                     |   3 +
 xds/src/error.rs                                   | 136 +++++++++++++++++++++
 xds/src/lib.rs                                     |  17 +++
 xds/src/request.rs                                 | 131 ++++++++++++++++++++
 xds/src/response.rs                                | 131 ++++++++++++++++++++
 xds/src/util.rs                                    |   9 ++
 xds/src/wrapper.rs                                 | 110 +++++++++++++++++
 31 files changed, 893 insertions(+), 143 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 0fe1fd2..3016158 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,12 +1,12 @@
 [workspace]
 members = [
   "xds",
+  "dubbo-build",
   "protocol",
   "registry",
   "metadata",
   "common",
   "config",
-  "examples/protobuf/client",
-  "examples/protobuf/server",
-  "examples/protobuf/pb_message",
+  "examples/grpc-gen",
+  "examples/protobuf-transport",
 ]
\ No newline at end of file
diff --git a/dubbo-build/Cargo.toml b/dubbo-build/Cargo.toml
new file mode 100644
index 0000000..5839555
--- /dev/null
+++ b/dubbo-build/Cargo.toml
@@ -0,0 +1,12 @@
+[package]
+name = "dubbo-build"
+version = "0.1.0"
+edition = "2021"
+
+[features]
+generator-code = ["prost-build"]
+
+
+
+[dependencies]
+prost-build = { version = "0.10.4", optional = true }
\ No newline at end of file
diff --git a/dubbo-build/src/generator.rs b/dubbo-build/src/generator.rs
new file mode 100644
index 0000000..05e25cf
--- /dev/null
+++ b/dubbo-build/src/generator.rs
@@ -0,0 +1,136 @@
+use prost_build::{Method, Service, ServiceGenerator};
+
+#[derive(Default)]
+pub struct CodeGenerator {}
+
+impl CodeGenerator {
+    pub fn new() -> CodeGenerator { Default::default() }
+
+    fn generate_module_name(&self) -> &str { "xds" }
+
+    fn generate_type_aliases(&mut self, buf: &mut String) {
+        buf.push_str(&format!(
+            "\n\
+                pub type DBReq<I> = {0}::request::ServiceRequest<I>;\n\
+                pub type DBResp<O> = Result<{0}::response::ServiceResponse<O>, {0}::error::DBProstError>;\n",
+            self.generate_module_name()));
+    }
+
+    fn generate_main_trait(&self, service: &Service, buf: &mut String) {
+        buf.push_str("\nuse async_trait::async_trait;\n\n#[async_trait]\n");
+
+        service.comments.append_with_indent(0, buf);
+        buf.push_str(&format!("pub trait {} {{", service.name));
+        for method in service.methods.iter() {
+            buf.push_str("\n");
+            method.comments.append_with_indent(1, buf);
+            buf.push_str(&format!("   {};\n", self.method_sig(method)));
+        }
+        buf.push_str("}\n");
+    }
+
+    fn method_sig(&self, method: &Method) -> String {
+        format!("async fn {0}(&self, request: DBReq<{1}>) -> DBResp<{2}>",
+                method.name, method.input_type, method.output_type)
+    }
+
+    fn generate_client_struct(&self, service: &Service, buf: &mut String) {
+        buf.push_str(&format!(
+            "\n\
+            pub struct {0}Client {{\n    \
+                pub hyper_client: {1}::wrapper::HyperClient \n\
+            }}\n",service.name, self.generate_module_name()));
+    }
+
+    fn generate_client_impl(&self, service: &Service, buf: &mut String) {
+        buf.push_str(&format!(
+            "\n\
+            impl {0}Client {{\n     \
+               pub fn new(root_url: &str) -> {0}Client {{\n        \
+                   {0}Client {{\n            \
+                        hyper_client: {1}::wrapper::HyperClient::new(root_url) \n        \
+                   }}\n     \
+               }}\n\
+            }}\n", service.name, self.generate_module_name()));
+
+        buf.push_str("\n#[async_trait]");
+        buf.push_str(&format!("\nimpl {0} for {0}Client {{", service.name));
+
+        for method in service.methods.iter() {
+            buf.push_str(&format!(
+                "\n    {} {{\n        \
+                    self.hyper_client.request(\"/dubbo/{}.{}/{}\", request).await\n    \
+                }}\n", self.method_sig(method), service.package, service.proto_name, method.proto_name));
+        }
+        buf.push_str("}\n");
+    }
+
+    fn generate_server_struct(&self, service: &Service, buf: &mut String) {
+        buf.push_str(&format!(
+            "\n\
+            pub struct {0}Server<T: 'static + {0} + Send + Sync + Clone> {{\n    \
+                pub hyper_server: {1}::wrapper::HyperServer<T> \n\
+            }}\n",service.name, self.generate_module_name()));
+    }
+
+    fn generate_server_impl(&self, service: &Service, buf: &mut String) {
+        buf.push_str(&format!(
+            "\n\
+             impl<T: 'static + {0} + Send + Sync + Clone> {0}Server<T> {{\n    \
+                pub fn new(&self, service: T) -> {0}Server<T> {{\n        \
+                    {0}Server {{\n            \
+                        hyper_server: {1}::wrapper::HyperServer::new(service) \n        \
+                   }}\n     \
+                }}\n\
+            }}\n",
+            service.name, self.generate_module_name()));
+
+        buf.push_str(&format!(
+            "\n\
+            impl<T: 'static + {0} + Send + Sync + Clone> {1}::wrapper::HyperService for {0}Server<T> {{\n    \
+                fn handle(&self, req: DBReq<Vec<u8>>) -> {1}::BoxFutureResp<Vec<u8>> {{\n       \
+                    use ::futures::Future;\n       \
+                    let trait_object_service = self.hyper_server.service.clone();\n       \
+                    match (req.method.clone(), req.uri.path()) {{",
+            service.name, self.generate_module_name()));
+
+        // Make match arms for each type
+        for method in service.methods.iter() {
+            buf.push_str(&format!(
+                "\n          \
+                (::hyper::Method::POST, \"/dubbo/{0}.{1}/{2}\") => {{\n              \
+                    Box::pin(async move {{ \n                  \
+                        let proto_req = req.to_proto().unwrap();  \n                  \
+                        let resp = trait_object_service.{3}(proto_req).await.unwrap();  \n                  \
+                        let proto_resp = resp.to_proto_raw();  \n                  \
+                        proto_resp  \n               \
+                    }}) \n           \
+                 }},", service.package, service.proto_name, method.proto_name, method.name));
+        }
+        // Final 404 arm and end fn
+        buf.push_str(&format!(
+            "\n          \
+                _ => {{\n            \
+                    Box::pin(async move {{ \n                \
+                        Ok({0}::error::DBError::new(::hyper::StatusCode::NOT_FOUND, \"not_found\", \"Not found\").to_resp_raw()) \n            \
+                    }}) \n          \
+                }}\n       \
+            }}\n   \
+            }}\n\
+        }}", self.generate_module_name()));
+    }
+}
+
+impl ServiceGenerator for CodeGenerator {
+    fn generate(&mut self, service: Service, buf: &mut String) {
+        self.generate_type_aliases(buf);
+        self.generate_main_trait(&service, buf);
+        self.generate_client_struct(&service, buf);
+        self.generate_client_impl(&service, buf);
+        self.generate_server_struct(&service, buf);
+        self.generate_server_impl(&service, buf);
+    }
+
+    fn finalize(&mut self, buf: &mut String) {
+    }
+}
diff --git a/dubbo-build/src/lib.rs b/dubbo-build/src/lib.rs
new file mode 100644
index 0000000..329eb39
--- /dev/null
+++ b/dubbo-build/src/lib.rs
@@ -0,0 +1,8 @@
+
+
+#[cfg(feature = "generator-code")]
+extern crate prost_build;
+#[cfg(feature = "generator-code")]
+mod generator;
+#[cfg(feature = "generator-code")]
+pub use generator::CodeGenerator;
diff --git a/examples/grpc-gen/Cargo.toml b/examples/grpc-gen/Cargo.toml
new file mode 100644
index 0000000..335c81c
--- /dev/null
+++ b/examples/grpc-gen/Cargo.toml
@@ -0,0 +1,20 @@
+[package]
+name = "grpc-gen"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+futures = "0.3.21"
+prost = "0.10.4"
+async-trait = { version = "0.1.56" }
+xds = { version = "0.1.0", path = "../../xds" }
+
+hyper = { version = "0.14.19", features = ["full"] }
+
+
+
+[build-dependencies]
+prost-build = "0.10.4"
+dubbo-build = { path = "../../dubbo-build", features = ["generator-code"] }
\ No newline at end of file
diff --git a/examples/grpc-gen/build.rs b/examples/grpc-gen/build.rs
new file mode 100644
index 0000000..0098c18
--- /dev/null
+++ b/examples/grpc-gen/build.rs
@@ -0,0 +1,12 @@
+use std::io::Result;
+use prost_build::Config;
+
+fn main() -> Result<()> {
+    let mut conf = Config::new();
+    let mut gen = dubbo_build::CodeGenerator::new();
+    conf.service_generator(Box::new(gen));
+    conf.out_dir("src/");
+    conf.compile_protos(&["pb/greeter.proto"], &["pb/"]).unwrap();
+
+    Ok(())
+}
\ No newline at end of file
diff --git a/examples/protobuf/pb_message/pb/greeter.proto b/examples/grpc-gen/pb/greeter.proto
similarity index 100%
rename from examples/protobuf/pb_message/pb/greeter.proto
rename to examples/grpc-gen/pb/greeter.proto
diff --git a/examples/grpc-gen/src/greeter.rs b/examples/grpc-gen/src/greeter.rs
new file mode 100644
index 0000000..3ecb8f2
--- /dev/null
+++ b/examples/grpc-gen/src/greeter.rs
@@ -0,0 +1,73 @@
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct HelloRequest {
+    #[prost(string, tag="1")]
+    pub name: ::prost::alloc::string::String,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct HelloResponse {
+    #[prost(string, tag="1")]
+    pub message: ::prost::alloc::string::String,
+}
+
+pub type DBReq<I> = xds::request::ServiceRequest<I>;
+pub type DBResp<O> = Result<xds::response::ServiceResponse<O>, xds::error::DBProstError>;
+
+use async_trait::async_trait;
+
+#[async_trait]
+pub trait Greeter {
+   async fn say_hello(&self, request: DBReq<HelloRequest>) -> DBResp<HelloResponse>;
+}
+
+pub struct GreeterClient {
+    pub hyper_client: xds::wrapper::HyperClient 
+}
+
+impl GreeterClient {
+     pub fn new(root_url: &str) -> GreeterClient {
+        GreeterClient {
+            hyper_client: xds::wrapper::HyperClient::new(root_url) 
+        }
+     }
+}
+
+#[async_trait]
+impl Greeter for GreeterClient {
+    async fn say_hello(&self, request: DBReq<HelloRequest>) -> DBResp<HelloResponse> {
+        self.hyper_client.request("/dubbo/greeter.Greeter/SayHello", request).await
+    }
+}
+
+pub struct GreeterServer<T: 'static + Greeter + Send + Sync + Clone> {
+    pub hyper_server: xds::wrapper::HyperServer<T> 
+}
+
+impl<T: 'static + Greeter + Send + Sync + Clone> GreeterServer<T> {
+    pub fn new(&self, service: T) -> GreeterServer<T> {
+        GreeterServer {
+            hyper_server: xds::wrapper::HyperServer::new(service) 
+        }
+     }
+}
+
+impl<T: 'static + Greeter + Send + Sync + Clone> xds::wrapper::HyperService for GreeterServer<T> {
+    fn handle(&self, req: DBReq<Vec<u8>>) -> xds::BoxFutureResp<Vec<u8>> {
+       use ::futures::Future;
+       let trait_object_service = self.hyper_server.service.clone();
+       match (req.method.clone(), req.uri.path()) {
+          (::hyper::Method::POST, "/dubbo/greeter.Greeter/SayHello") => {
+              Box::pin(async move { 
+                  let proto_req = req.to_proto().unwrap();  
+                  let resp = trait_object_service.say_hello(proto_req).await.unwrap();  
+                  let proto_resp = resp.to_proto_raw();  
+                  proto_resp  
+               }) 
+           },
+          _ => {
+            Box::pin(async move { 
+                Ok(xds::error::DBError::new(::hyper::StatusCode::NOT_FOUND, "not_found", "Not found").to_resp_raw()) 
+            }) 
+          }
+       }
+   }
+}
\ No newline at end of file
diff --git a/examples/grpc-gen/src/main.rs b/examples/grpc-gen/src/main.rs
new file mode 100644
index 0000000..68b341a
--- /dev/null
+++ b/examples/grpc-gen/src/main.rs
@@ -0,0 +1,12 @@
+
+mod greeter;
+use greeter::*;
+
+
+fn main() {
+
+    let client =  GreeterClient::new("0.0.0.0:8080");
+    let hello_req = DBReq::new(HelloRequest { name: "johankoi".into()});
+    client.say_hello(hello_req);
+
+}
diff --git a/examples/protobuf-transport/Cargo.toml b/examples/protobuf-transport/Cargo.toml
new file mode 100644
index 0000000..8a35f90
--- /dev/null
+++ b/examples/protobuf-transport/Cargo.toml
@@ -0,0 +1,30 @@
+[package]
+name = "protobuf-transport"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+xds = { version = "0.1.0", path = "../../xds" }
+tokio = {version = "1.9.0", features = ["full"]}
+hyper = { version = "0.14", features = ["full"] }
+futures = "0.3.21"
+prost = "0.10.4"
+
+
+[lib]
+path = "./src/lib.rs"
+
+[[bin]]
+name="server"
+path="./src/server.rs"
+
+[[bin]]
+name="client"
+path="./src/client.rs"
+
+
+[build-dependencies]
+prost-build = "0.10.4"
+dubbo-build = { path = "../../dubbo-build", features = ["generator-code"] }
\ No newline at end of file
diff --git a/examples/protobuf-transport/build.rs b/examples/protobuf-transport/build.rs
new file mode 100644
index 0000000..6459de7
--- /dev/null
+++ b/examples/protobuf-transport/build.rs
@@ -0,0 +1,9 @@
+use std::io::Result;
+use prost_build::Config;
+
+fn main() -> Result<()> {
+    Config::new()
+        .out_dir("src/")
+        .compile_protos(&["pb/person.proto"], &["pb/"])?;
+    Ok(())
+}
\ No newline at end of file
diff --git a/examples/protobuf/pb_message/pb/person.proto b/examples/protobuf-transport/pb/person.proto
similarity index 100%
rename from examples/protobuf/pb_message/pb/person.proto
rename to examples/protobuf-transport/pb/person.proto
diff --git a/examples/protobuf-transport/src/client.rs b/examples/protobuf-transport/src/client.rs
new file mode 100644
index 0000000..4d598bb
--- /dev/null
+++ b/examples/protobuf-transport/src/client.rs
@@ -0,0 +1,33 @@
+use std::collections::hash_map::HashMap;
+use xds::{client::RpcClient};
+use prost::Message;
+use hyper::body::Buf;
+use hyper::{Request, Response};
+
+pub mod person;
+use person::Person;
+
+
+#[tokio::main]
+async fn main() {
+
+    let mut client = RpcClient::new(String::from("http://127.0.0.1:8972"));
+
+    let service_path = String::from("helloworld");
+    let service_method = String::from("hello");
+    let metadata = HashMap::new();
+
+    let mut person = Person::default();
+    person.name = "guomiwu".to_string();
+    let pbData = person.encode_to_vec();
+
+    let callResult = client.call(service_path, service_method, &metadata, pbData).await;
+    let resp = callResult.unwrap();
+
+    //  asynchronously aggregate the chunks of the body
+    let body = hyper::body::aggregate(resp).await;
+    let data = body.unwrap().chunk().to_vec();
+
+    let resPerson = Person::decode(data.as_ref()).unwrap();
+    println!("resPerson={:?}", resPerson.name);
+}
\ No newline at end of file
diff --git a/examples/protobuf-transport/src/lib.rs b/examples/protobuf-transport/src/lib.rs
new file mode 100644
index 0000000..959e687
--- /dev/null
+++ b/examples/protobuf-transport/src/lib.rs
@@ -0,0 +1,7 @@
+
+pub mod person;
+pub use person::*;
+
+fn main() {
+    println!("Hello, world!");
+}
diff --git a/examples/protobuf/pb_message/src/pb/person.rs b/examples/protobuf-transport/src/person.rs
similarity index 100%
rename from examples/protobuf/pb_message/src/pb/person.rs
rename to examples/protobuf-transport/src/person.rs
diff --git a/examples/protobuf/server/src/main.rs b/examples/protobuf-transport/src/server.rs
similarity index 82%
rename from examples/protobuf/server/src/main.rs
rename to examples/protobuf-transport/src/server.rs
index a182144..9b0c443 100644
--- a/examples/protobuf/server/src/main.rs
+++ b/examples/protobuf-transport/src/server.rs
@@ -5,7 +5,7 @@ use xds::{ server::RpcServer };
 #[tokio::main]
 async fn main() {
     let addr = SocketAddr::from(([127, 0, 0, 1], 8972));
-    let mut server = RpcServer::new(addr);
+    let server = RpcServer::new(addr);
     server.start().await;
     println!("RpcServer ok");
 }
\ No newline at end of file
diff --git a/examples/protobuf/client/Cargo.toml b/examples/protobuf/client/Cargo.toml
deleted file mode 100644
index e4c426c..0000000
--- a/examples/protobuf/client/Cargo.toml
+++ /dev/null
@@ -1,14 +0,0 @@
-[package]
-name = "protobuf"
-version = "0.1.0"
-edition = "2021"
-
-# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
-
-[dependencies]
-xds = { version = "0.1.0", path = "../../../xds" }
-tokio = {version = "1.9.0", features = ["full"]}
-pb_message =  { version = "0.1.0", path = "../pb_message" }
-prost = "0.10.4"
-hyper = { version = "0.14", features = ["full"] }
-
diff --git a/examples/protobuf/client/src/main.rs b/examples/protobuf/client/src/main.rs
deleted file mode 100644
index bd6507f..0000000
--- a/examples/protobuf/client/src/main.rs
+++ /dev/null
@@ -1,32 +0,0 @@
-use std::collections::hash_map::HashMap;
-use xds::{ client::RpcClient };
-use pb_message::pb::person::Person;
-use prost::Message;
-use hyper::body::Buf;
-
-#[tokio::main]
-async fn main() {
-    let task = tokio::spawn(async move {
-        let mut client = RpcClient::new(String::from("http://127.0.0.1:8972"));
-
-        let service_path = String::from("helloworld");
-        let service_method = String::from("hello");
-        let metadata = HashMap::new();
-
-        let mut person = Person::default();
-        person.name = "guomiwu".to_string();
-        let pbData = person.encode_to_vec();
-
-        let callResult = client.call(service_path, service_method, &metadata, pbData).await;
-        let resp = callResult.unwrap();
-
-        //  asynchronously aggregate the chunks of the body
-        let body = hyper::body::aggregate(resp).await;
-        let data = body.unwrap().chunk().to_vec();
-
-        let resPerson = Person::decode(data.as_ref()).unwrap();
-        println!("resPerson={:?}", resPerson.name);
-
-    });
-    task.await.unwrap();
-}
\ No newline at end of file
diff --git a/examples/protobuf/pb_message/Cargo.toml b/examples/protobuf/pb_message/Cargo.toml
deleted file mode 100644
index f4a97cd..0000000
--- a/examples/protobuf/pb_message/Cargo.toml
+++ /dev/null
@@ -1,13 +0,0 @@
-[package]
-name = "pb_message"
-version = "0.1.0"
-edition = "2021"
-
-# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
-
-[dependencies]
-prost = "0.10.4"
-prost-types = "0.10.1"
-
-[build-dependencies]
-prost-build = "0.10.4"
\ No newline at end of file
diff --git a/examples/protobuf/pb_message/build.rs b/examples/protobuf/pb_message/build.rs
deleted file mode 100644
index a198077..0000000
--- a/examples/protobuf/pb_message/build.rs
+++ /dev/null
@@ -1,18 +0,0 @@
-use std::io::Result;
-use prost_build::Config;
-
-fn main() -> Result<()> {
-
-    Config::new()
-        .out_dir("src/pb")
-        .compile_protos(&["pb/person.proto"], &["pb/"])?;
-
-    // println!("cargo:rerun-if-changed=pb/greeter_rpc.pb");
-    // println!("cargo:rerun-if-changed=pb/build.rs");
-
-    Config::new()
-        .out_dir("src/pb")
-        .compile_protos(&["pb/greeter.proto"], &["pb/"])?;
-
-    Ok(())
-}
\ No newline at end of file
diff --git a/examples/protobuf/pb_message/src/lib.rs b/examples/protobuf/pb_message/src/lib.rs
deleted file mode 100644
index 3a90aed..0000000
--- a/examples/protobuf/pb_message/src/lib.rs
+++ /dev/null
@@ -1,37 +0,0 @@
-
-pub mod pb;
-
-#[cfg(test)]
-mod tests {
-    use crate::pb::Person;
-    use crate::pb::greeter;
-
-    use std::io::Cursor;
-    use prost::Message;
-
-    pub fn create_hello_request(name: String) -> greeter::HelloRequest {
-        let mut hello_request = greeter::HelloRequest::default();
-        hello_request.name = name;
-        hello_request
-    }
-
-    pub fn serialize_greeter(hello: &greeter::HelloRequest) -> Vec<u8> {
-        let mut buf = Vec::new();
-        buf.reserve(hello.encoded_len());
-
-        hello.encode(&mut buf).unwrap();
-        buf
-    }
-
-
-    // pub fn deserialize_greeter(buf: &[u8]) -> Result<greeter::HelloRequest, prost::DecodeError> {
-    //     greeter::HelloRequest::decode(&mut Cursor::new(buf))
-    // }
-
-    #[test]
-    fn it_works() {
-        let person = Person::default();
-        person.encode_to_vec();
-        println!("{person:?}");
-    }
-}
diff --git a/examples/protobuf/pb_message/src/pb/greeter.rs b/examples/protobuf/pb_message/src/pb/greeter.rs
deleted file mode 100644
index 36aa34d..0000000
--- a/examples/protobuf/pb_message/src/pb/greeter.rs
+++ /dev/null
@@ -1,10 +0,0 @@
-#[derive(Clone, PartialEq, ::prost::Message)]
-pub struct HelloRequest {
-    #[prost(string, tag="1")]
-    pub name: ::prost::alloc::string::String,
-}
-#[derive(Clone, PartialEq, ::prost::Message)]
-pub struct HelloResponse {
-    #[prost(string, tag="1")]
-    pub message: ::prost::alloc::string::String,
-}
diff --git a/examples/protobuf/pb_message/src/pb/mod.rs b/examples/protobuf/pb_message/src/pb/mod.rs
deleted file mode 100644
index 8c11e43..0000000
--- a/examples/protobuf/pb_message/src/pb/mod.rs
+++ /dev/null
@@ -1,5 +0,0 @@
-pub mod person;
-pub mod greeter;
-
-pub use person::*;
-pub use greeter::*;
\ No newline at end of file
diff --git a/examples/protobuf/server/Cargo.toml b/examples/protobuf/server/Cargo.toml
deleted file mode 100644
index fe077e4..0000000
--- a/examples/protobuf/server/Cargo.toml
+++ /dev/null
@@ -1,10 +0,0 @@
-[package]
-name = "server"
-version = "0.1.0"
-edition = "2021"
-
-# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
-
-[dependencies]
-xds = { version = "0.1.0", path = "../../../xds" }
-tokio = {version = "1.9.0", features = ["full"]}
\ No newline at end of file
diff --git a/xds/Cargo.toml b/xds/Cargo.toml
index 49af168..f224f7c 100644
--- a/xds/Cargo.toml
+++ b/xds/Cargo.toml
@@ -25,3 +25,6 @@ flate2 = "1.0"
 tower = "0.4"
 log = "0.4"
 env_logger = "0.9.0"
+
+
+prost = "0.10.4"
\ No newline at end of file
diff --git a/xds/src/error.rs b/xds/src/error.rs
new file mode 100644
index 0000000..e212835
--- /dev/null
+++ b/xds/src/error.rs
@@ -0,0 +1,136 @@
+use serde_json;
+use prost::{DecodeError, EncodeError};
+use hyper::{Body, Response, Method, Version, header, StatusCode};
+use crate::response::ServiceResponse;
+
+
+/// A JSON-serializable DBError error
+#[derive(Debug)]
+pub struct DBError {
+    pub status: StatusCode,
+    pub error_type: String,
+    pub msg: String,
+    pub meta: Option<serde_json::Value>,
+}
+
+impl DBError {
+    /// Create a DBError error with no meta
+    pub fn new(status: StatusCode, error_type: &str, msg: &str) -> DBError {
+        DBError::new_meta(status, error_type, msg, None)
+    }
+
+    /// Create a DBError error with optional meta
+    pub fn new_meta(status: StatusCode, error_type: &str, msg: &str, meta: Option<serde_json::Value>) -> DBError {
+        DBError { status, error_type: error_type.to_string(), msg: msg.to_string(), meta }
+    }
+
+    /// Create a byte-array service response for this error and the given status code
+    pub fn to_resp_raw(&self) -> ServiceResponse<Vec<u8>> {
+        let output = self.to_json_bytes().unwrap_or_else(|_| "{}".as_bytes().to_vec());
+        let mut headers = header::HeaderMap::new();
+        headers.insert(header::CONTENT_TYPE, "json".parse().unwrap());
+        let lenth_value = header::HeaderValue::from(output.len() as u64);
+        headers.insert(header::CONTENT_LENGTH, lenth_value);
+
+        ServiceResponse {
+            version: Version::default(),
+            headers,
+            status: self.status,
+            output,
+        }
+    }
+
+    /// Create a hyper response for this error and the given status code
+    pub fn to_hyper_resp(&self) -> Response<Body> {
+        let body = self.to_json_bytes().unwrap_or_else(|_| "{}".as_bytes().to_vec());
+
+        let mut resp = Response::builder()
+            .status(self.status)
+            .body(Body::from(body.clone())).unwrap();
+
+        let header_mut = resp.headers_mut();
+        header_mut.insert(header::CONTENT_TYPE, "json".parse().unwrap());
+        let lenth_value = header::HeaderValue::from(body.len() as u64);
+        header_mut.insert(header::CONTENT_LENGTH, lenth_value);
+        resp
+    }
+
+    /// Create error from Serde JSON value
+    pub fn from_json(status: StatusCode, json: serde_json::Value) -> DBError {
+        let error_type = json["error_type"].as_str();
+        DBError {
+            status,
+            error_type: error_type.unwrap_or("<no code>").to_string(),
+            msg: json["msg"].as_str().unwrap_or("<no message>").to_string(),
+            // Put the whole thing as meta if there was no type
+            meta: if error_type.is_some() { json.get("meta").map(|v| v.clone()) } else { Some(json.clone()) },
+        }
+    }
+
+    /// Create error from byte array
+    pub fn from_json_bytes(status: StatusCode, json: &[u8]) -> serde_json::Result<DBError> {
+        serde_json::from_slice(json).map(|v| DBError::from_json(status, v))
+    }
+
+    /// Create Serde JSON value from error
+    pub fn to_json(&self) -> serde_json::Value {
+        let mut props = serde_json::map::Map::new();
+        props.insert("error_type".to_string(), serde_json::Value::String(self.error_type.clone()));
+        props.insert("msg".to_string(), serde_json::Value::String(self.msg.clone()));
+        if let Some(ref meta) = self.meta { props.insert("meta".to_string(), meta.clone()); }
+        serde_json::Value::Object(props)
+    }
+
+    /// Create byte array from error
+    pub fn to_json_bytes(&self) -> serde_json::Result<Vec<u8>> {
+        serde_json::to_vec(&self.to_json())
+    }
+}
+
+impl From<DBError> for DBProstError {
+    fn from(v: DBError) -> DBProstError { DBProstError::DBWrapError(v) }
+}
+
+
+
+
+
+/// An error that can occur during a call to a  service
+#[derive(Debug)]
+pub enum DBProstError {
+    /// A standard DBError error with a type, message, and some metadata
+    DBWrapError(DBError),
+    /// An error when trying to decode JSON into an error or object
+    JsonDecodeError(serde_json::Error),
+    /// An error when trying to encode a protobuf object
+    ProstEncodeError(EncodeError),
+    /// An error when trying to decode a protobuf object
+    ProstDecodeError(DecodeError),
+    /// A generic hyper error
+    HyperError(hyper::Error),
+    /// A wrapper for any of the other `DBProstError`s that also includes request/response info
+    AfterBodyError {
+        /// The request or response's raw body before the error happened
+        body: Vec<u8>,
+        /// The request method, only present for server errors
+        method: Option<Method>,
+        /// The request or response's HTTP version
+        version: Version,
+        /// The request or response's headers
+        headers: header::HeaderMap,
+        /// The response status, only present for client errors
+        status: Option<StatusCode>,
+        /// The underlying error
+        err: Box<DBProstError>,
+    },
+}
+
+impl DBProstError {
+    /// This same error, or the underlying error if it is an `AfterBodyError`
+    pub fn root_err(self) -> DBProstError {
+        match self {
+            DBProstError::AfterBodyError { err, .. } => err.root_err(),
+            _ => self
+        }
+    }
+}
\ No newline at end of file
diff --git a/xds/src/lib.rs b/xds/src/lib.rs
index 4f818f9..d3a7e94 100644
--- a/xds/src/lib.rs
+++ b/xds/src/lib.rs
@@ -19,6 +19,23 @@ pub mod client;
 pub mod server;
 pub mod protocol;
 
+
+pub mod request;
+pub mod response;
+pub mod wrapper;
+pub mod error;
+pub mod util;
+
+
+
+pub use wrapper::*;
+pub use request::*;
+pub use response::*;
+pub use error::*;
+pub use util::*;
+
+
+
 #[cfg(test)]
 mod tests {
     use crate::client::client::RpcClient;
diff --git a/xds/src/request.rs b/xds/src/request.rs
new file mode 100644
index 0000000..4d497c0
--- /dev/null
+++ b/xds/src/request.rs
@@ -0,0 +1,131 @@
+use hyper::{body, Body, Request, Uri, Method, Version, header};
+use prost::{Message};
+use crate::util::*;
+use crate::error::*;
+
+/// A request with HTTP info and the serialized input object
+#[derive(Debug)]
+pub struct ServiceRequest<T> {
+    /// The URI of the original request
+    ///
+    /// When using a client, this will be overridden with the proper URI. It is only valuable for servers.
+    pub uri: Uri,
+    /// The request method; should always be Post
+    pub method: Method,
+    /// The HTTP version, rarely changed from the default
+    pub version: Version,
+    /// The set of headers
+    ///
+    /// Should always at least have `Content-Type`. Clients will override `Content-Length` on serialization.
+    pub headers: header::HeaderMap,
+    // The serialized request object
+    pub input: T,
+}
+
+impl<T> ServiceRequest<T> {
+    /// Create new service request with the given input object
+    ///
+    /// This automatically sets the `Content-Type` header as `application/protobuf`.
+    pub fn new(input: T) -> ServiceRequest<T> {
+        let mut header = header::HeaderMap::new();
+        header.insert(header::CONTENT_TYPE, "application/protobuf".parse().unwrap());
+        ServiceRequest {
+            uri: Default::default(),
+            method: Method::POST,
+            version: Version::default(),
+            headers: header,
+            input,
+        }
+    }
+
+    /// Copy this request with a different input value
+    pub fn clone_with_input<U>(&self, input: U) -> ServiceRequest<U> {
+        ServiceRequest {
+            uri: self.uri.clone(),
+            method: self.method.clone(),
+            version: self.version,
+            headers: self.headers.clone(),
+            input,
+        }
+    }
+}
+
+impl<T: Message + Default + 'static> From<T> for ServiceRequest<T> {
+    fn from(v: T) -> ServiceRequest<T> { ServiceRequest::new(v) }
+}
+
+impl ServiceRequest<Vec<u8>> {
+    /// Turn a hyper request to a boxed future of a byte-array service request
+    pub fn from_hyper_raw(req: Request<Body>) -> BoxFutureReq<Vec<u8>> {
+        Box::pin(async move {
+            let uri = req.uri().clone();
+            let method = req.method().clone();
+            let version = req.version();
+            let headers = req.headers().clone();
+            let future_req = body::to_bytes(req.into_body()).await
+                .map_err(DBProstError::HyperError)
+                .map(move |body|
+                    ServiceRequest { uri, method, version, headers, input: body.to_vec() }
+                );
+            future_req
+        })
+    }
+
+    /// Turn a byte-array service request into a hyper request
+    pub fn to_hyper_raw(&self) -> Request<Body> {
+        let mut request = Request::builder()
+            .method(self.method.clone())
+            .uri(self.uri.clone())
+            .body(Body::from(self.input.clone())).unwrap();
+
+        let header_mut = request.headers_mut();
+        header_mut.clone_from(&self.headers);
+        let lenth_value = header::HeaderValue::from(self.input.len() as u64);
+        header_mut.insert(header::CONTENT_LENGTH, lenth_value);
+        request
+    }
+
+    /// Turn a byte-array service request into a `AfterBodyError`-wrapped version of the given error
+    pub fn body_err(&self, err: DBProstError) -> DBProstError {
+        DBProstError::AfterBodyError {
+            body: self.input.clone(),
+            method: Some(self.method.clone()),
+            version: self.version,
+            headers: self.headers.clone(),
+            status: None,
+            err: Box::new(err),
+        }
+    }
+
+    /// Serialize the byte-array service request into a protobuf service request
+    pub fn to_proto<T: Message + Default + 'static>(&self) -> Result<ServiceRequest<T>, DBProstError> {
+        match T::decode(self.input.as_ref()) {
+            Ok(v) => Ok(self.clone_with_input(v)),
+            Err(err) => Err(self.body_err(DBProstError::ProstDecodeError(err)))
+        }
+    }
+}
+
+impl<T: Message + Default + 'static> ServiceRequest<T> {
+    /// Turn a protobuf service request into a byte-array service request
+    pub fn to_proto_raw(&self) -> Result<ServiceRequest<Vec<u8>>, DBProstError> {
+        let mut body = Vec::new();
+        if let Err(err) = self.input.encode(&mut body) {
+            Err(DBProstError::ProstEncodeError(err))
+        } else {
+            Ok(self.clone_with_input(body))
+        }
+    }
+
+    /// Turn a hyper request into a protobuf service request
+    pub fn from_hyper_proto(req: Request<Body>) -> BoxFutureReq<T> {
+        Box::pin(async move {
+            ServiceRequest::from_hyper_raw(req).await.and_then(|v| v.to_proto())
+        })
+    }
+
+    /// Turn a protobuf service request into a hyper request
+    pub fn to_hyper_proto(&self) -> Result<Request<Body>, DBProstError> {
+        self.to_proto_raw().map(|v| v.to_hyper_raw())
+    }
+}
\ No newline at end of file
diff --git a/xds/src/response.rs b/xds/src/response.rs
new file mode 100644
index 0000000..b38db8b
--- /dev/null
+++ b/xds/src/response.rs
@@ -0,0 +1,131 @@
+use hyper::{body, Body, Response, Version, header, StatusCode};
+use prost::{Message};
+use crate::util::*;
+use crate::error::*;
+
+
+/// A response with HTTP info and a serialized output object
+#[derive(Debug)]
+pub struct ServiceResponse<T> {
+    /// The HTTP version
+    pub version: Version,
+    /// The set of headers
+    ///
+    /// Should always at least have `Content-Type`. Servers will override `Content-Length` on serialization.
+    pub headers: header::HeaderMap,
+    /// The status code
+    pub status: StatusCode,
+    /// The serialized output object
+    pub output: T,
+}
+
+impl<T> ServiceResponse<T> {
+    /// Create new service request with the given input object
+    ///
+    /// This automatically sets the `Content-Type` header as `application/protobuf`.
+    pub fn new(output: T) -> ServiceResponse<T> {
+        let mut headers = header::HeaderMap::new();
+        headers.insert(header::CONTENT_TYPE, "application/protobuf".parse().unwrap());
+
+        ServiceResponse {
+            version: Version::default(),
+            headers,
+            status: StatusCode::OK,
+            output,
+        }
+    }
+
+    /// Copy this response with a different output value
+    pub fn clone_with_output<U>(&self, output: U) -> ServiceResponse<U> {
+        ServiceResponse { version: self.version, headers: self.headers.clone(), status: self.status, output }
+    }
+}
+
+impl<T: Message + Default + 'static> From<T> for ServiceResponse<T> {
+    fn from(v: T) -> ServiceResponse<T> { ServiceResponse::new(v) }
+}
+
+impl ServiceResponse<Vec<u8>> {
+    /// Turn a hyper response to a boxed future of a byte-array service response
+    pub fn from_hyper_raw(resp: Response<Body>) -> BoxFutureResp<Vec<u8>> {
+        Box::pin(async move {
+            let version = resp.version();
+            let headers = resp.headers().clone();
+            let status = resp.status().clone();
+            let future_resp = body::to_bytes(resp.into_body()).await
+                .map_err(DBProstError::HyperError)
+                .map(move |body|
+                    ServiceResponse { version, headers, status, output: body.to_vec() }
+                );
+            future_resp
+        })
+    }
+
+    /// Turn a byte-array service response into a hyper response
+    pub fn to_hyper_raw(&self) -> Response<Body> {
+        let mut resp = Response::builder()
+            .status(StatusCode::OK)
+            .body(Body::from(self.output.clone())).unwrap();
+
+        let header_mut = resp.headers_mut();
+        header_mut.clone_from(&self.headers);
+        let lenth_value = header::HeaderValue::from(self.output.len() as u64);
+        header_mut.insert(header::CONTENT_LENGTH, lenth_value);
+        resp
+    }
+
+    /// Turn a byte-array service response into a `AfterBodyError`-wrapped version of the given error
+    pub fn body_err(&self, err: DBProstError) -> DBProstError {
+        DBProstError::AfterBodyError {
+            body: self.output.clone(),
+            method: None,
+            version: self.version,
+            headers: self.headers.clone(),
+            status: Some(self.status),
+            err: Box::new(err),
+        }
+    }
+
+
+    /// Serialize the byte-array service response into a protobuf service response
+    pub fn to_proto<T: Message + Default + 'static>(&self) -> Result<ServiceResponse<T>, DBProstError> {
+        if self.status.is_success() {
+            match T::decode(self.output.as_ref()) {
+                Ok(v) => Ok(self.clone_with_output(v)),
+                Err(err) => Err(self.body_err(DBProstError::ProstDecodeError(err)))
+            }
+        } else {
+            match DBError::from_json_bytes(self.status, &self.output) {
+                Ok(err) => Err(self.body_err(DBProstError::DBWrapError(err))),
+                Err(err) => Err(self.body_err(DBProstError::JsonDecodeError(err)))
+            }
+        }
+    }
+}
+
+impl<T: Message + Default + 'static> ServiceResponse<T> {
+    /// Turn a protobuf service response into a byte-array service response
+    pub fn to_proto_raw(&self) -> Result<ServiceResponse<Vec<u8>>, DBProstError> {
+        let mut body = Vec::new();
+        if let Err(err) = self.output.encode(&mut body) {
+            Err(DBProstError::ProstEncodeError(err))
+        } else {
+            Ok(self.clone_with_output(body))
+        }
+    }
+
+
+    /// Turn a hyper response into a protobuf service response
+    pub async fn from_hyper_proto(resp: Response<Body>) -> Result<ServiceResponse<T>, DBProstError> {
+        // Box::pin(async move {
+        ServiceResponse::from_hyper_raw(resp).await.and_then(|v| v.to_proto())
+        // })
+    }
+
+
+    /// Turn a protobuf service response into a hyper response
+    pub fn to_hyper_proto(&self) -> Result<Response<Body>, DBProstError> {
+        self.to_proto_raw().map(|v| v.to_hyper_raw())
+    }
+}
+
diff --git a/xds/src/util.rs b/xds/src/util.rs
new file mode 100644
index 0000000..2fbe3f3
--- /dev/null
+++ b/xds/src/util.rs
@@ -0,0 +1,9 @@
+use futures::future::BoxFuture;
+use crate::request::ServiceRequest;
+use crate::response::ServiceResponse;
+use crate::error::DBProstError;
+
+
+pub type BoxFutureReq<T> = BoxFuture<'static, Result<ServiceRequest<T>, DBProstError>>;
+pub type BoxFutureResp<O> = BoxFuture<'static, Result<ServiceResponse<O>, DBProstError>>;
+
diff --git a/xds/src/wrapper.rs b/xds/src/wrapper.rs
new file mode 100644
index 0000000..b80ffa8
--- /dev/null
+++ b/xds/src/wrapper.rs
@@ -0,0 +1,110 @@
+use std::sync::Arc;
+use std::task::Poll;
+use futures::future::BoxFuture;
+
+use hyper::{Body, Request, Response, header, StatusCode};
+
+use hyper::client::conn::Builder;
+use hyper::client::connect::HttpConnector;
+use hyper::client::service::Connect;
+
+use tower::Service;
+use prost::{Message};
+
+use crate::request::ServiceRequest;
+use crate::response::ServiceResponse;
+use crate::util::*;
+use crate::error::*;
+
+
+
+/// A wrapper for a hyper client
+#[derive(Debug)]
+pub struct HyperClient {
+    /// The hyper client
+    /// The root URL without any path attached
+    pub root_url: String,
+}
+
+impl HyperClient {
+    /// Create a new client wrapper for the given client and root using protobuf
+    pub fn new(root_url: &str) -> HyperClient {
+        HyperClient {
+            root_url: root_url.trim_end_matches('/').to_string(),
+        }
+    }
+
+    // Invoke the given request for the given path and return a boxed future result
+    pub async fn request<I, O>(&self, path: &str, req: ServiceRequest<I>) -> Result<ServiceResponse<O>, DBProstError>
+        where I: Message + Default + 'static, O: Message + Default + 'static {
+        let hyper_req = req.to_hyper_proto().unwrap();
+        let mut hyper_connect = Connect::new(HttpConnector::new(), Builder::new());
+
+        let url_string = format!("{}/{}", self.root_url, path.trim_start_matches('/'));
+        let uri = url_string.parse::<hyper::Uri>().unwrap();
+
+        let mut req_to_send = hyper_connect.call(uri.clone()).await.map_err(DBProstError::HyperError).unwrap();
+
+        let hyper_resp = req_to_send.call(hyper_req).await.map_err(DBProstError::HyperError).unwrap();
+        ServiceResponse::from_hyper_proto(hyper_resp).await
+    }
+}
+
+/// Service for taking a raw service request and returning a boxed future of a raw service response
+pub trait HyperService {
+    /// Accept a raw service request and return a boxed future of a raw service response
+    fn handle(&self, req: ServiceRequest<Vec<u8>>) -> BoxFutureResp<Vec<u8>>;
+}
+
+/// A wrapper for a `HyperService` trait that keeps a `Arc` version of the service
+pub struct HyperServer<T> {
+    /// The `Arc` version of the service
+    ///
+    /// Needed because of [hyper Service lifetimes](https://github.com/tokio-rs/tokio-service/issues/9)
+    pub service: Arc<T>,
+}
+
+impl<T> HyperServer<T> {
+    /// Create a new service wrapper for the given impl
+    pub fn new(service: T) -> HyperServer<T> { HyperServer { service: Arc::new(service) } }
+}
+
+impl<T> Service<Request<Body>> for HyperServer<T>
+    where T: 'static + Send + Sync + HyperService
+{
+    type Response = Response<Body>;
+    type Error = hyper::Error;
+    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
+
+    fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
+        Poll::Ready(Ok(()))
+    }
+
+    fn call(&mut self, req: Request<Body>) -> Self::Future {
+        let content_type = req.headers().get(header::CONTENT_TYPE).unwrap().to_str().unwrap();
+        if content_type == "application/protobuf" {
+            Box::pin(async move {
+                let status = StatusCode::UNSUPPORTED_MEDIA_TYPE;
+                Ok(DBError::new(status, "bad_content_type", "Content type must be application/protobuf").to_hyper_resp())
+            })
+        } else {
+            let service = self.service.clone();
+            Box::pin(async move {
+                let request = ServiceRequest::from_hyper_raw(req).await;
+                let resp = service.handle(request.unwrap()).await;
+                resp.map(|v| v.to_hyper_raw())
+                    .or_else(|err| match err.root_err() {
+                        DBProstError::ProstDecodeError(_) =>
+                            Ok(DBError::new(StatusCode::BAD_REQUEST, "protobuf_decode_err", "Invalid protobuf body").
+                                to_hyper_resp()),
+                        DBProstError::DBWrapError(err) =>
+                            Ok(err.to_hyper_resp()),
+                        DBProstError::HyperError(err) =>
+                            Err(err),
+                        _ => Ok(DBError::new(StatusCode::INTERNAL_SERVER_ERROR, "internal_err", "Internal Error").
+                            to_hyper_resp()),
+                    })
+            })
+        }
+    }
+}
\ No newline at end of file