You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2022/07/23 14:44:52 UTC

[dubbo-rust] 29/35: finished 1.0.0

This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch poc-idl
in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git

commit 60bfedf3134569b97dd7648cc747ebb05476250c
Author: hxq <jo...@163.com>
AuthorDate: Thu Jul 21 16:25:11 2022 +0800

    finished 1.0.0
---
 .github/.DS_Store                      | Bin 0 -> 6148 bytes
 dubbo-build/.DS_Store                  | Bin 0 -> 6148 bytes
 dubbo-build/src/generator.rs           | 106 +++++++++++++++++-------
 dubbo-build/src/lib.rs                 |  17 +++-
 examples/.DS_Store                     | Bin 0 -> 6148 bytes
 examples/grpc-gen/.DS_Store            | Bin 0 -> 6148 bytes
 examples/grpc-gen/Cargo.toml           |  14 +++-
 examples/grpc-gen/build.rs             |   3 +-
 examples/grpc-gen/src/.DS_Store        | Bin 0 -> 6148 bytes
 examples/grpc-gen/src/client.rs        |  10 +++
 examples/grpc-gen/src/greeter.rs       |  68 +++++++++++-----
 examples/grpc-gen/src/lib.rs           |   6 ++
 examples/grpc-gen/src/main.rs          |  12 ---
 examples/grpc-gen/src/server.rs        |  38 +++++++++
 examples/protobuf-transport/src/lib.rs |   2 +-
 xds/.DS_Store                          | Bin 0 -> 6148 bytes
 xds/src/.DS_Store                      | Bin 0 -> 6148 bytes
 xds/src/client/client.rs               |  34 +++++++-
 xds/src/error.rs                       |  17 ++++
 xds/src/lib.rs                         |   3 -
 xds/src/protocol/error.rs              |  17 ++++
 xds/src/protocol/message.rs            |  17 ++++
 xds/src/protocol/mod.rs                |  17 ++++
 xds/src/request.rs                     | 143 ++++++++++++++++-----------------
 xds/src/response.rs                    | 116 +++++++++++++-------------
 xds/src/server/.DS_Store               | Bin 0 -> 6148 bytes
 xds/src/server/server.rs               |  17 ++--
 xds/src/util.rs                        |  17 ++++
 xds/src/wrapper.rs                     | 110 -------------------------
 29 files changed, 467 insertions(+), 317 deletions(-)

diff --git a/.github/.DS_Store b/.github/.DS_Store
new file mode 100644
index 0000000..3bb81dc
Binary files /dev/null and b/.github/.DS_Store differ
diff --git a/dubbo-build/.DS_Store b/dubbo-build/.DS_Store
new file mode 100644
index 0000000..cd16f51
Binary files /dev/null and b/dubbo-build/.DS_Store differ
diff --git a/dubbo-build/src/generator.rs b/dubbo-build/src/generator.rs
index 05e25cf..79fa172 100644
--- a/dubbo-build/src/generator.rs
+++ b/dubbo-build/src/generator.rs
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 use prost_build::{Method, Service, ServiceGenerator};
 
 #[derive(Default)]
@@ -8,17 +25,32 @@ impl CodeGenerator {
 
     fn generate_module_name(&self) -> &str { "xds" }
 
-    fn generate_type_aliases(&mut self, buf: &mut String) {
+    fn generate_uses(&mut self, buf: &mut String) {
         buf.push_str(&format!(
             "\n\
-                pub type DBReq<I> = {0}::request::ServiceRequest<I>;\n\
-                pub type DBResp<O> = Result<{0}::response::ServiceResponse<O>, {0}::error::DBProstError>;\n",
+                use async_trait::async_trait;\n\
+                use tower::Service;\n\
+                use hyper::{{\n    \
+                    Body,\n    \
+                    Request,\n    \
+                    Response,\n\
+                }};\n\
+                use futures::future::{{\n    \
+                    BoxFuture\n\
+                }};\n\
+                use std::{{\n    \
+                    task::{{Poll}},\n\
+                }};\n"));
+    }
+
+    fn generate_type_aliases(&mut self, buf: &mut String) {
+        buf.push_str(&format!(
+            "\npub type DBResp<O> = Result<{0}::response::ServiceResponse<O>, {0}::error::DBProstError>;\n",
             self.generate_module_name()));
     }
 
     fn generate_main_trait(&self, service: &Service, buf: &mut String) {
-        buf.push_str("\nuse async_trait::async_trait;\n\n#[async_trait]\n");
-
+        buf.push_str("\n\n#[async_trait]\n");
         service.comments.append_with_indent(0, buf);
         buf.push_str(&format!("pub trait {} {{", service.name));
         for method in service.methods.iter() {
@@ -30,7 +62,7 @@ impl CodeGenerator {
     }
 
     fn method_sig(&self, method: &Method) -> String {
-        format!("async fn {0}(&self, request: DBReq<{1}>) -> DBResp<{2}>",
+        format!("async fn {0}(&self, request: {1}) -> DBResp<{2}>",
                 method.name, method.input_type, method.output_type)
     }
 
@@ -38,7 +70,7 @@ impl CodeGenerator {
         buf.push_str(&format!(
             "\n\
             pub struct {0}Client {{\n    \
-                pub hyper_client: {1}::wrapper::HyperClient \n\
+                pub rpc_client: {1}::client::RpcClient \n\
             }}\n",service.name, self.generate_module_name()));
     }
 
@@ -46,9 +78,9 @@ impl CodeGenerator {
         buf.push_str(&format!(
             "\n\
             impl {0}Client {{\n     \
-               pub fn new(root_url: &str) -> {0}Client {{\n        \
+               pub fn new(addr: String) -> {0}Client {{\n        \
                    {0}Client {{\n            \
-                        hyper_client: {1}::wrapper::HyperClient::new(root_url) \n        \
+                        rpc_client: {1}::client::RpcClient::new(addr) \n        \
                    }}\n     \
                }}\n\
             }}\n", service.name, self.generate_module_name()));
@@ -58,8 +90,9 @@ impl CodeGenerator {
 
         for method in service.methods.iter() {
             buf.push_str(&format!(
-                "\n    {} {{\n        \
-                    self.hyper_client.request(\"/dubbo/{}.{}/{}\", request).await\n    \
+                "\n    {0} {{\n        \
+                    let path = \"dubbo/{1}.{2}/{3}\".to_owned();\n        \
+                    self.rpc_client.request(request, path).await\n    \
                 }}\n", self.method_sig(method), service.package, service.proto_name, method.proto_name));
         }
         buf.push_str("}\n");
@@ -69,50 +102,64 @@ impl CodeGenerator {
         buf.push_str(&format!(
             "\n\
             pub struct {0}Server<T: 'static + {0} + Send + Sync + Clone> {{\n    \
-                pub hyper_server: {1}::wrapper::HyperServer<T> \n\
-            }}\n",service.name, self.generate_module_name()));
+                pub inner: T\n\
+            }}\n",service.name));
     }
 
     fn generate_server_impl(&self, service: &Service, buf: &mut String) {
         buf.push_str(&format!(
             "\n\
              impl<T: 'static + {0} + Send + Sync + Clone> {0}Server<T> {{\n    \
-                pub fn new(&self, service: T) -> {0}Server<T> {{\n        \
+                pub fn new(service: T) -> {0}Server<T> {{\n        \
                     {0}Server {{\n            \
-                        hyper_server: {1}::wrapper::HyperServer::new(service) \n        \
+                        inner: service \n        \
                    }}\n     \
                 }}\n\
             }}\n",
-            service.name, self.generate_module_name()));
+            service.name));
 
         buf.push_str(&format!(
             "\n\
-            impl<T: 'static + {0} + Send + Sync + Clone> {1}::wrapper::HyperService for {0}Server<T> {{\n    \
-                fn handle(&self, req: DBReq<Vec<u8>>) -> {1}::BoxFutureResp<Vec<u8>> {{\n       \
-                    use ::futures::Future;\n       \
-                    let trait_object_service = self.hyper_server.service.clone();\n       \
-                    match (req.method.clone(), req.uri.path()) {{",
-            service.name, self.generate_module_name()));
+            impl<T> Service<Request<Body>> for {0}Server<T> \n    \
+                where T: 'static + Greeter + Send + Sync + Clone {{\n    \
+                type Response = Response<Body>;\n    \
+                type Error = hyper::Error;\n    \
+                type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;\n    \
+                \n    \
+
+                fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {{\n        \
+                    Poll::Ready(Ok(()))\n    \
+                }}\n    \
+
+                \n    \
+
+                fn call(&mut self, req: Request<Body>) -> Self::Future {{\n       \
+                    let inner = self.inner.clone();\n       \
+                    let url = req.uri().path();\n       \
+                    match (req.method().clone(), url) {{",
+            service.name));
 
         // Make match arms for each type
         for method in service.methods.iter() {
             buf.push_str(&format!(
                 "\n          \
                 (::hyper::Method::POST, \"/dubbo/{0}.{1}/{2}\") => {{\n              \
-                    Box::pin(async move {{ \n                  \
-                        let proto_req = req.to_proto().unwrap();  \n                  \
-                        let resp = trait_object_service.{3}(proto_req).await.unwrap();  \n                  \
-                        let proto_resp = resp.to_proto_raw();  \n                  \
-                        proto_resp  \n               \
+                    Box::pin(async move {{\n                  \
+                        let request = {3}::ServiceRequest::try_from_hyper(req).await;\n                  \
+                        let proto_req = request.unwrap().try_decode().unwrap();\n                  \
+                        let resp = inner.{4}(proto_req.input).await.unwrap();\n                  \
+                        let proto_resp = resp.try_encode();\n                  \
+                        let hyper_resp = proto_resp.unwrap().into_hyper();\n                  \
+                        Ok(hyper_resp)  \n               \
                     }}) \n           \
-                 }},", service.package, service.proto_name, method.proto_name, method.name));
+                 }},", service.package, service.proto_name, method.proto_name, self.generate_module_name(), method.name));
         }
         // Final 404 arm and end fn
         buf.push_str(&format!(
             "\n          \
                 _ => {{\n            \
                     Box::pin(async move {{ \n                \
-                        Ok({0}::error::DBError::new(::hyper::StatusCode::NOT_FOUND, \"not_found\", \"Not found\").to_resp_raw()) \n            \
+                        Ok({0}::error::DBError::new(::hyper::StatusCode::NOT_FOUND, \"not_found\", \"Not found\").to_hyper_resp()) \n            \
                     }}) \n          \
                 }}\n       \
             }}\n   \
@@ -123,6 +170,7 @@ impl CodeGenerator {
 
 impl ServiceGenerator for CodeGenerator {
     fn generate(&mut self, service: Service, buf: &mut String) {
+        self.generate_uses(buf);
         self.generate_type_aliases(buf);
         self.generate_main_trait(&service, buf);
         self.generate_client_struct(&service, buf);
diff --git a/dubbo-build/src/lib.rs b/dubbo-build/src/lib.rs
index 329eb39..225d4f6 100644
--- a/dubbo-build/src/lib.rs
+++ b/dubbo-build/src/lib.rs
@@ -1,4 +1,19 @@
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 #[cfg(feature = "generator-code")]
 extern crate prost_build;
diff --git a/examples/.DS_Store b/examples/.DS_Store
new file mode 100644
index 0000000..0d3b142
Binary files /dev/null and b/examples/.DS_Store differ
diff --git a/examples/grpc-gen/.DS_Store b/examples/grpc-gen/.DS_Store
new file mode 100644
index 0000000..b57983e
Binary files /dev/null and b/examples/grpc-gen/.DS_Store differ
diff --git a/examples/grpc-gen/Cargo.toml b/examples/grpc-gen/Cargo.toml
index 335c81c..7122d68 100644
--- a/examples/grpc-gen/Cargo.toml
+++ b/examples/grpc-gen/Cargo.toml
@@ -6,13 +6,25 @@ edition = "2021"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
+tokio = {version = "1.9.0", features = ["full"]}
 futures = "0.3.21"
 prost = "0.10.4"
 async-trait = { version = "0.1.56" }
 xds = { version = "0.1.0", path = "../../xds" }
-
 hyper = { version = "0.14.19", features = ["full"] }
+pin-project = "1.0.11"
+tower = "0.4"
+
+[lib]
+path = "./src/lib.rs"
+
+[[bin]]
+name="server"
+path="./src/server.rs"
 
+[[bin]]
+name="client"
+path="./src/client.rs"
 
 
 [build-dependencies]
diff --git a/examples/grpc-gen/build.rs b/examples/grpc-gen/build.rs
index 0098c18..7a3e44f 100644
--- a/examples/grpc-gen/build.rs
+++ b/examples/grpc-gen/build.rs
@@ -3,10 +3,9 @@ use prost_build::Config;
 
 fn main() -> Result<()> {
     let mut conf = Config::new();
-    let mut gen = dubbo_build::CodeGenerator::new();
+    let gen = dubbo_build::CodeGenerator::new();
     conf.service_generator(Box::new(gen));
     conf.out_dir("src/");
     conf.compile_protos(&["pb/greeter.proto"], &["pb/"]).unwrap();
-
     Ok(())
 }
\ No newline at end of file
diff --git a/examples/grpc-gen/src/.DS_Store b/examples/grpc-gen/src/.DS_Store
new file mode 100644
index 0000000..5008ddf
Binary files /dev/null and b/examples/grpc-gen/src/.DS_Store differ
diff --git a/examples/grpc-gen/src/client.rs b/examples/grpc-gen/src/client.rs
new file mode 100644
index 0000000..0f846a8
--- /dev/null
+++ b/examples/grpc-gen/src/client.rs
@@ -0,0 +1,10 @@
+
+mod greeter;
+use greeter::*;
+#[tokio::main]
+async fn main() {
+    let client =  GreeterClient::new("http://127.0.0.1:8972".to_owned());
+    let resp = client.say_hello(HelloRequest { name: "johankoi".into()}).await.unwrap();
+    let hello_resp = resp.output;
+    println!("{}",hello_resp.message);
+}
\ No newline at end of file
diff --git a/examples/grpc-gen/src/greeter.rs b/examples/grpc-gen/src/greeter.rs
index 3ecb8f2..9895a76 100644
--- a/examples/grpc-gen/src/greeter.rs
+++ b/examples/grpc-gen/src/greeter.rs
@@ -9,63 +9,87 @@ pub struct HelloResponse {
     pub message: ::prost::alloc::string::String,
 }
 
-pub type DBReq<I> = xds::request::ServiceRequest<I>;
+use async_trait::async_trait;
+use tower::Service;
+use hyper::{
+    Body,
+    Request,
+    Response,
+};
+use futures::future::{
+    BoxFuture
+};
+use std::{
+    task::{Poll},
+};
+
 pub type DBResp<O> = Result<xds::response::ServiceResponse<O>, xds::error::DBProstError>;
 
-use async_trait::async_trait;
 
 #[async_trait]
 pub trait Greeter {
-   async fn say_hello(&self, request: DBReq<HelloRequest>) -> DBResp<HelloResponse>;
+   async fn say_hello(&self, request: HelloRequest) -> DBResp<HelloResponse>;
 }
 
 pub struct GreeterClient {
-    pub hyper_client: xds::wrapper::HyperClient 
+    pub rpc_client: xds::client::RpcClient 
 }
 
 impl GreeterClient {
-     pub fn new(root_url: &str) -> GreeterClient {
+     pub fn new(addr: String) -> GreeterClient {
         GreeterClient {
-            hyper_client: xds::wrapper::HyperClient::new(root_url) 
+            rpc_client: xds::client::RpcClient::new(addr) 
         }
      }
 }
 
 #[async_trait]
 impl Greeter for GreeterClient {
-    async fn say_hello(&self, request: DBReq<HelloRequest>) -> DBResp<HelloResponse> {
-        self.hyper_client.request("/dubbo/greeter.Greeter/SayHello", request).await
+    async fn say_hello(&self, request: HelloRequest) -> DBResp<HelloResponse> {
+        let path = "dubbo/greeter.Greeter/SayHello".to_owned();
+        self.rpc_client.request(request, path).await
     }
 }
 
 pub struct GreeterServer<T: 'static + Greeter + Send + Sync + Clone> {
-    pub hyper_server: xds::wrapper::HyperServer<T> 
+    pub inner: T
 }
 
 impl<T: 'static + Greeter + Send + Sync + Clone> GreeterServer<T> {
-    pub fn new(&self, service: T) -> GreeterServer<T> {
+    pub fn new(service: T) -> GreeterServer<T> {
         GreeterServer {
-            hyper_server: xds::wrapper::HyperServer::new(service) 
+            inner: service 
         }
      }
 }
 
-impl<T: 'static + Greeter + Send + Sync + Clone> xds::wrapper::HyperService for GreeterServer<T> {
-    fn handle(&self, req: DBReq<Vec<u8>>) -> xds::BoxFutureResp<Vec<u8>> {
-       use ::futures::Future;
-       let trait_object_service = self.hyper_server.service.clone();
-       match (req.method.clone(), req.uri.path()) {
+impl<T> Service<Request<Body>> for GreeterServer<T> 
+    where T: 'static + Greeter + Send + Sync + Clone {
+    type Response = Response<Body>;
+    type Error = hyper::Error;
+    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
+    
+    fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
+        Poll::Ready(Ok(()))
+    }
+    
+    fn call(&mut self, req: Request<Body>) -> Self::Future {
+       let inner = self.inner.clone();
+       let url = req.uri().path();
+       match (req.method().clone(), url) {
           (::hyper::Method::POST, "/dubbo/greeter.Greeter/SayHello") => {
-              Box::pin(async move { 
-                  let proto_req = req.to_proto().unwrap();  
-                  let resp = trait_object_service.say_hello(proto_req).await.unwrap();  
-                  let proto_resp = resp.to_proto_raw();  
-                  proto_resp  
+              Box::pin(async move {
+                  let request = xds::ServiceRequest::try_from_hyper(req).await;
+                  let proto_req = request.unwrap().try_decode().unwrap();
+                  let resp = inner.say_hello(proto_req.input).await.unwrap();
+                  let proto_resp = resp.try_encode();
+                  let hyper_resp = proto_resp.unwrap().into_hyper();
+                  Ok(hyper_resp)  
                }) 
            },
           _ => {
             Box::pin(async move { 
-                Ok(xds::error::DBError::new(::hyper::StatusCode::NOT_FOUND, "not_found", "Not found").to_resp_raw()) 
+                Ok(xds::error::DBError::new(::hyper::StatusCode::NOT_FOUND, "not_found", "Not found").to_hyper_resp()) 
             }) 
           }
        }
diff --git a/examples/grpc-gen/src/lib.rs b/examples/grpc-gen/src/lib.rs
new file mode 100644
index 0000000..c032559
--- /dev/null
+++ b/examples/grpc-gen/src/lib.rs
@@ -0,0 +1,6 @@
+
+
+fn main() {
+
+
+}
diff --git a/examples/grpc-gen/src/main.rs b/examples/grpc-gen/src/main.rs
deleted file mode 100644
index 68b341a..0000000
--- a/examples/grpc-gen/src/main.rs
+++ /dev/null
@@ -1,12 +0,0 @@
-
-mod greeter;
-use greeter::*;
-
-
-fn main() {
-
-    let client =  GreeterClient::new("0.0.0.0:8080");
-    let hello_req = DBReq::new(HelloRequest { name: "johankoi".into()});
-    client.say_hello(hello_req);
-
-}
diff --git a/examples/grpc-gen/src/server.rs b/examples/grpc-gen/src/server.rs
new file mode 100644
index 0000000..3c15554
--- /dev/null
+++ b/examples/grpc-gen/src/server.rs
@@ -0,0 +1,38 @@
+use std::convert::Infallible;
+use std::net::SocketAddr;
+use async_trait::async_trait;
+use hyper::{Server as hyper_server};
+use hyper::service::{make_service_fn};
+
+mod greeter;
+use greeter::*;
+use xds::ServiceResponse;
+
+
+#[derive(Default, Clone)]
+pub struct HelloService {}
+
+#[async_trait]
+impl Greeter for HelloService {
+    async fn say_hello(&self, request: HelloRequest) -> DBResp<HelloResponse> {
+        println!("{}",request.name);
+        Ok(ServiceResponse::new(HelloResponse { message: "hello, dubbo rust!".into() }))
+    }
+}
+
+
+#[tokio::main]
+async fn main() {
+    let addr = SocketAddr::from(([127, 0, 0, 1], 8972));
+    let make_service = make_service_fn(|_conn| async {
+        let server = GreeterServer::new(HelloService::default());
+        Ok::<_, Infallible>(server)
+    });
+
+    let server = hyper_server::bind(&addr).serve(make_service);
+    println!("Listening on http://{}", &addr);
+    if let Err(e) = server.await {
+        eprintln!("server error: {}", e);
+    }
+}
+
diff --git a/examples/protobuf-transport/src/lib.rs b/examples/protobuf-transport/src/lib.rs
index 959e687..5e94390 100644
--- a/examples/protobuf-transport/src/lib.rs
+++ b/examples/protobuf-transport/src/lib.rs
@@ -3,5 +3,5 @@ pub mod person;
 pub use person::*;
 
 fn main() {
-    println!("Hello, world!");
+
 }
diff --git a/xds/.DS_Store b/xds/.DS_Store
new file mode 100644
index 0000000..b8d457d
Binary files /dev/null and b/xds/.DS_Store differ
diff --git a/xds/src/.DS_Store b/xds/src/.DS_Store
new file mode 100644
index 0000000..05fc01e
Binary files /dev/null and b/xds/src/.DS_Store differ
diff --git a/xds/src/client/client.rs b/xds/src/client/client.rs
index 1387e36..3c0b655 100644
--- a/xds/src/client/client.rs
+++ b/xds/src/client/client.rs
@@ -21,8 +21,21 @@ use hyper::client::service::Connect;
 use hyper::service::Service;
 use hyper::{Body, Request, Response};
 
-use crate::protocol::message::*;
+use crate::protocol::message::{
+    Message as DubboMessage,
+    RpcxMessage,
+    Metadata,
+    MessageType,
+    CompressType,
+    SerializeType
+};
+
 use std::collections::HashMap;
+use prost::{Message};
+
+use crate::request::ServiceRequest;
+use crate::response::ServiceResponse;
+use crate::error::*;
 
 
 pub struct RpcClient {
@@ -36,6 +49,23 @@ impl RpcClient {
         }
     }
 
+    /// Sends `message` to `path` on this client's address and returns the decoded reply as a `Result<ServiceResponse<O>, DBProstError>`.
+    pub async fn request<I, O>(&self, message: I, path: String) -> Result<ServiceResponse<O>, DBProstError>
+        where I: Message + Default + 'static,
+              O: Message + Default + 'static {
+
+        let url_str = format!("{}/{}", self.addr.as_str(), path.as_str());
+        let uri = url_str.parse::<hyper::Uri>().unwrap();
+
+        let req = ServiceRequest::new(message, uri.clone());
+        let hyper_req = req.into_encoded_hyper().unwrap();
+
+        let mut connect = Connect::new(HttpConnector::new(), Builder::new());
+        let mut send = connect.call(uri.clone()).await.map_err(DBProstError::HyperError).unwrap();
+        let hyper_resp = send.call(hyper_req).await.map_err(DBProstError::HyperError).unwrap();
+        ServiceResponse::decode_response(hyper_resp).await
+    }
+
     pub async fn call(
         &mut self,
         service_path: String,
@@ -44,7 +74,7 @@ impl RpcClient {
         payload: Vec<u8>
     ) -> std::result::Result<Response<Body>, Box<dyn std::error::Error + Send + Sync>>
     {
-        let mut req = Message::new();
+        let mut req = DubboMessage::new();
         req.set_version(0);
         req.set_message_type(MessageType::Request);
         req.set_serialize_type(SerializeType::Protobuf);
diff --git a/xds/src/error.rs b/xds/src/error.rs
index e212835..2748a2e 100644
--- a/xds/src/error.rs
+++ b/xds/src/error.rs
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 use serde_json;
 use prost::{DecodeError, EncodeError};
 use hyper::{Body, Response, Method, Version, header, StatusCode};
diff --git a/xds/src/lib.rs b/xds/src/lib.rs
index d3a7e94..ba2a6b5 100644
--- a/xds/src/lib.rs
+++ b/xds/src/lib.rs
@@ -22,13 +22,10 @@ pub mod protocol;
 
 pub mod request;
 pub mod response;
-pub mod wrapper;
 pub mod error;
 pub mod util;
 
 
-
-pub use wrapper::*;
 pub use request::*;
 pub use response::*;
 pub use error::*;
diff --git a/xds/src/protocol/error.rs b/xds/src/protocol/error.rs
index c1cdd8f..1496243 100644
--- a/xds/src/protocol/error.rs
+++ b/xds/src/protocol/error.rs
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 use std::{convert::From, error, fmt, result, str};
 
 pub type Result<T> = result::Result<T, Error>;
diff --git a/xds/src/protocol/message.rs b/xds/src/protocol/message.rs
index 1895172..52bb806 100644
--- a/xds/src/protocol/message.rs
+++ b/xds/src/protocol/message.rs
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 use byteorder::{BigEndian, ByteOrder};
 use enum_primitive_derive::Primitive;
 use flate2::{read::GzDecoder, write::GzEncoder, Compression};
diff --git a/xds/src/protocol/mod.rs b/xds/src/protocol/mod.rs
index 1d650a8..b9eaa8f 100644
--- a/xds/src/protocol/mod.rs
+++ b/xds/src/protocol/mod.rs
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 pub mod error;
 pub mod message;
 
diff --git a/xds/src/request.rs b/xds/src/request.rs
index 4d497c0..c27aa91 100644
--- a/xds/src/request.rs
+++ b/xds/src/request.rs
@@ -1,6 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use std::convert::From;
 use hyper::{body, Body, Request, Uri, Method, Version, header};
 use prost::{Message};
-use crate::util::*;
 use crate::error::*;
 
 /// A request with HTTP info and the serialized input object
@@ -22,15 +39,14 @@ pub struct ServiceRequest<T> {
     pub input: T,
 }
 
-impl<T> ServiceRequest<T> {
+impl<T: Message + Default + 'static> ServiceRequest<T> {
     /// Create new service request with the given input object
-    ///
     /// This automatically sets the `Content-Type` header as `application/protobuf`.
-    pub fn new(input: T) -> ServiceRequest<T> {
+    pub fn new(input: T, uri: Uri) -> ServiceRequest<T> {
         let mut header = header::HeaderMap::new();
         header.insert(header::CONTENT_TYPE, "application/protobuf".parse().unwrap());
         ServiceRequest {
-            uri: Default::default(),
+            uri,
             method: Method::POST,
             version: Version::default(),
             headers: header,
@@ -38,41 +54,41 @@ impl<T> ServiceRequest<T> {
         }
     }
 
-    /// Copy this request with a different input value
-    pub fn clone_with_input<U>(&self, input: U) -> ServiceRequest<U> {
-        ServiceRequest {
-            uri: self.uri.clone(),
-            method: self.method.clone(),
-            version: self.version,
-            headers: self.headers.clone(),
-            input,
-        }
+    /// Turn a protobuf service request into a byte-array service request
+    pub fn try_encode(&self) -> Result<ServiceRequest<Vec<u8>>, DBProstError> {
+        let mut body = Vec::new();
+        self.input.encode(&mut body)
+            .map(|v|
+                ServiceRequest {
+                    uri: self.uri.clone(),
+                    method: self.method.clone(),
+                    version: self.version,
+                    headers: self.headers.clone(),
+                    input: body,
+                })
+            .map_err(|e| DBProstError::ProstEncodeError(e))
     }
-}
 
-impl<T: Message + Default + 'static> From<T> for ServiceRequest<T> {
-    fn from(v: T) -> ServiceRequest<T> { ServiceRequest::new(v) }
+    /// Turn a protobuf service request into a hyper request
+    pub fn into_encoded_hyper(&self) -> Result<Request<Body>, DBProstError> {
+        self.try_encode().map(|v| v.into_hyper())
+    }
 }
 
 impl ServiceRequest<Vec<u8>> {
-    /// Turn a hyper request to a boxed future of a byte-array service request
-    pub fn from_hyper_raw(req: Request<Body>) -> BoxFutureReq<Vec<u8>> {
-        Box::pin(async move {
-            let uri = req.uri().clone();
-            let method = req.method().clone();
-            let version = req.version();
-            let headers = req.headers().clone();
-            let future_req = body::to_bytes(req.into_body()).await
-                .map_err(DBProstError::HyperError)
-                .map(move |body|
-                    ServiceRequest { uri, method, version, headers, input: body.to_vec() }
-                );
-            future_req
-        })
+    /// Turn a hyper request into a ServiceRequest<Vec<u8>>
+    pub async fn try_from_hyper(req: Request<Body>) -> Result<ServiceRequest<Vec<u8>>, DBProstError> {
+        let uri = req.uri().clone();
+        let method = req.method().clone();
+        let version = req.version();
+        let headers = req.headers().clone();
+        body::to_bytes(req.into_body()).await
+            .map_err(DBProstError::HyperError)
+            .map(move |body| ServiceRequest { uri, method, version, headers, input: body.to_vec() })
     }
 
-    /// Turn a byte-array service request into a hyper request
-    pub fn to_hyper_raw(&self) -> Request<Body> {
+    /// Turn ServiceRequest<Vec<u8>> into a hyper request
+    pub fn into_hyper(&self) -> Request<Body> {
         let mut request = Request::builder()
             .method(self.method.clone())
             .uri(self.uri.clone())
@@ -85,47 +101,26 @@ impl ServiceRequest<Vec<u8>> {
         request
     }
 
-    /// Turn a byte-array service request into a `AfterBodyError`-wrapped version of the given error
-    pub fn body_err(&self, err: DBProstError) -> DBProstError {
-        DBProstError::AfterBodyError {
-            body: self.input.clone(),
-            method: Some(self.method.clone()),
-            version: self.version,
-            headers: self.headers.clone(),
-            status: None,
-            err: Box::new(err),
-        }
-    }
-
-    /// Serialize the byte-array service request into a protobuf service request
-    pub fn to_proto<T: Message + Default + 'static>(&self) -> Result<ServiceRequest<T>, DBProstError> {
-        match T::decode(self.input.as_ref()) {
-            Ok(v) => Ok(self.clone_with_input(v)),
-            Err(err) => Err(self.body_err(DBProstError::ProstDecodeError(err)))
-        }
+    /// Decode a ServiceRequest<Vec<u8>> whose input is protobuf data into a ServiceRequest<T>
+    pub fn try_decode<T: Message + Default + 'static>(&self) -> Result<ServiceRequest<T>, DBProstError> {
+        T::decode(self.input.as_ref())
+            .map(|v|
+                ServiceRequest {
+                    uri: self.uri.clone(),
+                    method: self.method.clone(),
+                    version: self.version,
+                    headers: self.headers.clone(),
+                    input:v,
+                })
+            .map_err(|e|
+                DBProstError::AfterBodyError {
+                    body: self.input.clone(),
+                    method: Some(self.method.clone()),
+                    version: self.version,
+                    headers: self.headers.clone(),
+                    status: None,
+                    err: Box::new(DBProstError::ProstDecodeError(e)),
+                })
     }
 }
 
-impl<T: Message + Default + 'static> ServiceRequest<T> {
-    /// Turn a protobuf service request into a byte-array service request
-    pub fn to_proto_raw(&self) -> Result<ServiceRequest<Vec<u8>>, DBProstError> {
-        let mut body = Vec::new();
-        if let Err(err) = self.input.encode(&mut body) {
-            Err(DBProstError::ProstEncodeError(err))
-        } else {
-            Ok(self.clone_with_input(body))
-        }
-    }
-
-    /// Turn a hyper request into a protobuf service request
-    pub fn from_hyper_proto(req: Request<Body>) -> BoxFutureReq<T> {
-        Box::pin(async move {
-            ServiceRequest::from_hyper_raw(req).await.and_then(|v| v.to_proto())
-        })
-    }
-
-    /// Turn a protobuf service request into a hyper request
-    pub fn to_hyper_proto(&self) -> Result<Request<Body>, DBProstError> {
-        self.to_proto_raw().map(|v| v.to_hyper_raw())
-    }
-}
\ No newline at end of file
diff --git a/xds/src/response.rs b/xds/src/response.rs
index b38db8b..96f02e6 100644
--- a/xds/src/response.rs
+++ b/xds/src/response.rs
@@ -1,6 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 use hyper::{body, Body, Response, Version, header, StatusCode};
 use prost::{Message};
-use crate::util::*;
 use crate::error::*;
 
 
@@ -41,28 +57,23 @@ impl<T> ServiceResponse<T> {
     }
 }
 
-impl<T: Message + Default + 'static> From<T> for ServiceResponse<T> {
-    fn from(v: T) -> ServiceResponse<T> { ServiceResponse::new(v) }
-}
+// impl<T: Message + Default + 'static> From<T> for ServiceResponse<T> {
+//     fn from(v: T) -> ServiceResponse<T> { ServiceResponse::new(v) }
+// }
 
 impl ServiceResponse<Vec<u8>> {
     /// Turn a hyper response to a boxed future of a byte-array service response
-    pub fn from_hyper_raw(resp: Response<Body>) -> BoxFutureResp<Vec<u8>> {
-        Box::pin(async move {
-            let version = resp.version();
-            let headers = resp.headers().clone();
-            let status = resp.status().clone();
-            let future_resp = body::to_bytes(resp.into_body()).await
-                .map_err(DBProstError::HyperError)
-                .map(move |body|
-                    ServiceResponse { version, headers, status, output: body.to_vec() }
-                );
-            future_resp
-        })
+    pub async fn try_from_hyper(resp: Response<Body>) -> Result<ServiceResponse<Vec<u8>>, DBProstError> {
+        let version = resp.version();
+        let headers = resp.headers().clone();
+        let status = resp.status().clone();
+        body::to_bytes(resp.into_body()).await
+            .map_err(DBProstError::HyperError)
+            .map(move |body| ServiceResponse { version, headers, status, output: body.to_vec() })
     }
 
     /// Turn a byte-array service response into a hyper response
-    pub fn to_hyper_raw(&self) -> Response<Body> {
+    pub fn into_hyper(&self) -> Response<Body> {
         let mut resp = Response::builder()
             .status(StatusCode::OK)
             .body(Body::from(self.output.clone())).unwrap();
@@ -74,58 +85,55 @@ impl ServiceResponse<Vec<u8>> {
         resp
     }
 
-    /// Turn a byte-array service response into a `AfterBodyError`-wrapped version of the given error
-    pub fn body_err(&self, err: DBProstError) -> DBProstError {
-        DBProstError::AfterBodyError {
-            body: self.output.clone(),
-            method: None,
-            version: self.version,
-            headers: self.headers.clone(),
-            status: Some(self.status),
-            err: Box::new(err),
-        }
-    }
-
-
     /// Serialize the byte-array service response into a protobuf service response
-    pub fn to_proto<T: Message + Default + 'static>(&self) -> Result<ServiceResponse<T>, DBProstError> {
+    pub fn try_decode<T: Message + Default + 'static>(&self) -> Result<ServiceResponse<T>, DBProstError> {
         if self.status.is_success() {
-            match T::decode(self.output.as_ref()) {
-                Ok(v) => Ok(self.clone_with_output(v)),
-                Err(err) => Err(self.body_err(DBProstError::ProstDecodeError(err)))
-            }
+            T::decode(self.output.as_ref())
+                .map(|v|
+                    ServiceResponse {
+                        version: self.version,
+                        headers: self.headers.clone(),
+                        status: self.status.clone(),
+                        output: v,
+                    })
+                .map_err(|e|
+                    DBProstError::AfterBodyError {
+                        body: self.output.clone(),
+                        method: None,
+                        version: self.version,
+                        headers: self.headers.clone(),
+                        status: Some(self.status),
+                        err: Box::new(DBProstError::ProstDecodeError(e)),
+                    })
         } else {
-            match DBError::from_json_bytes(self.status, &self.output) {
-                Ok(err) => Err(self.body_err(DBProstError::DBWrapError(err))),
-                Err(err) => Err(self.body_err(DBProstError::JsonDecodeError(err)))
-            }
+            let err= DBError::new(self.status.clone(), "internal_err", "Internal Error");
+            Err(DBProstError::DBWrapError(err))
         }
     }
 }
 
 impl<T: Message + Default + 'static> ServiceResponse<T> {
-    /// Turn a protobuf service response into a byte-array service response
-    pub fn to_proto_raw(&self) -> Result<ServiceResponse<Vec<u8>>, DBProstError> {
+    pub fn try_encode(&self) -> Result<ServiceResponse<Vec<u8>>, DBProstError> {
         let mut body = Vec::new();
-        if let Err(err) = self.output.encode(&mut body) {
-            Err(DBProstError::ProstEncodeError(err))
-        } else {
-            Ok(self.clone_with_output(body))
-        }
+        self.output.encode(&mut body)
+            .map(|v|
+                ServiceResponse {
+                    version: self.version,
+                    headers: self.headers.clone(),
+                    status: self.status.clone(),
+                    output: body,
+                })
+            .map_err(|e| DBProstError::ProstEncodeError(e))
     }
 
-
     /// Turn a hyper response into a protobuf service response
-    pub async fn from_hyper_proto(resp: Response<Body>) -> Result<ServiceResponse<T>, DBProstError> {
-        // Box::pin(async move {
-        ServiceResponse::from_hyper_raw(resp).await.and_then(|v| v.to_proto())
-        // })
+    pub async fn decode_response(resp: Response<Body>) -> Result<ServiceResponse<T>, DBProstError> {
+        ServiceResponse::try_from_hyper(resp).await.and_then(|v| v.try_decode())
     }
 
-
     /// Turn a protobuf service response into a hyper response
-    pub fn to_hyper_proto(&self) -> Result<Response<Body>, DBProstError> {
-        self.to_proto_raw().map(|v| v.to_hyper_raw())
+    pub fn into_encoded_hyper(&self) -> Result<Response<Body>, DBProstError> {
+        self.try_encode().map(|v| v.into_hyper())
     }
 }
 
diff --git a/xds/src/server/.DS_Store b/xds/src/server/.DS_Store
new file mode 100644
index 0000000..5008ddf
Binary files /dev/null and b/xds/src/server/.DS_Store differ
diff --git a/xds/src/server/server.rs b/xds/src/server/server.rs
index dd8930a..f27bc0a 100644
--- a/xds/src/server/server.rs
+++ b/xds/src/server/server.rs
@@ -16,14 +16,11 @@
  */
 
 use std::convert::Infallible;
-use std::env;
-use std::process::Output;
 use std::task::Poll;
-use hyper::client::service;
 use hyper::{Body, Request, Response, Server as hyper_server};
-use hyper::service::{make_service_fn, service_fn};
+use hyper::service::{ make_service_fn, service_fn };
 use tower::Service;
-use futures::future::{ ready, Ready, BoxFuture};
+use futures::future::{ ready, BoxFuture };
 use std::net::SocketAddr;
 use crate::protocol::message::*;
 
@@ -38,8 +35,16 @@ impl RpcServer {
         }
     }
 
-    pub async fn start(&self) {
+    pub async fn start<S>(&self, service: S)
+        where
+            S: Service<Request<Body>, Response = Response<Body>, Error = Infallible>
+            + Clone
+            + Send
+            + 'static,
+            S::Future: Send + 'static,
+    {
         env_logger::init();
+
         let make_service = make_service_fn(|_conn| async {
             let svc = RPCHandler;
             Ok::<_, Infallible>(svc)
diff --git a/xds/src/util.rs b/xds/src/util.rs
index 2fbe3f3..39374fb 100644
--- a/xds/src/util.rs
+++ b/xds/src/util.rs
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 use futures::future::BoxFuture;
 use crate::request::ServiceRequest;
 use crate::response::ServiceResponse;
diff --git a/xds/src/wrapper.rs b/xds/src/wrapper.rs
deleted file mode 100644
index b80ffa8..0000000
--- a/xds/src/wrapper.rs
+++ /dev/null
@@ -1,110 +0,0 @@
-use std::sync::Arc;
-use std::task::Poll;
-use futures::future::BoxFuture;
-
-use hyper::{Body, Request, Response, header, StatusCode};
-
-use hyper::client::conn::Builder;
-use hyper::client::connect::HttpConnector;
-use hyper::client::service::Connect;
-
-use tower::Service;
-use prost::{Message};
-
-use crate::request::ServiceRequest;
-use crate::response::ServiceResponse;
-use crate::util::*;
-use crate::error::*;
-
-
-
-/// A wrapper for a hyper client
-#[derive(Debug)]
-pub struct HyperClient {
-    /// The hyper client
-    /// The root URL without any path attached
-    pub root_url: String,
-}
-
-impl HyperClient {
-    /// Create a new client wrapper for the given client and root using protobuf
-    pub fn new(root_url: &str) -> HyperClient {
-        HyperClient {
-            root_url: root_url.trim_end_matches('/').to_string(),
-        }
-    }
-
-    // Invoke the given request for the given path and return a boxed future result
-    pub async fn request<I, O>(&self, path: &str, req: ServiceRequest<I>) -> Result<ServiceResponse<O>, DBProstError>
-        where I: Message + Default + 'static, O: Message + Default + 'static {
-        let hyper_req = req.to_hyper_proto().unwrap();
-        let mut hyper_connect = Connect::new(HttpConnector::new(), Builder::new());
-
-        let url_string = format!("{}/{}", self.root_url, path.trim_start_matches('/'));
-        let uri = url_string.parse::<hyper::Uri>().unwrap();
-
-        let mut req_to_send = hyper_connect.call(uri.clone()).await.map_err(DBProstError::HyperError).unwrap();
-
-        let hyper_resp = req_to_send.call(hyper_req).await.map_err(DBProstError::HyperError).unwrap();
-        ServiceResponse::from_hyper_proto(hyper_resp).await
-    }
-}
-
-/// Service for taking a raw service request and returning a boxed future of a raw service response
-pub trait HyperService {
-    /// Accept a raw service request and return a boxed future of a raw service response
-    fn handle(&self, req: ServiceRequest<Vec<u8>>) -> BoxFutureResp<Vec<u8>>;
-}
-
-/// A wrapper for a `HyperService` trait that keeps a `Arc` version of the service
-pub struct HyperServer<T> {
-    /// The `Arc` version of the service
-    ///
-    /// Needed because of [hyper Service lifetimes](https://github.com/tokio-rs/tokio-service/issues/9)
-    pub service: Arc<T>,
-}
-
-impl<T> HyperServer<T> {
-    /// Create a new service wrapper for the given impl
-    pub fn new(service: T) -> HyperServer<T> { HyperServer { service: Arc::new(service) } }
-}
-
-impl<T> Service<Request<Body>> for HyperServer<T>
-    where T: 'static + Send + Sync + HyperService
-{
-    type Response = Response<Body>;
-    type Error = hyper::Error;
-    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
-
-    fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
-        Poll::Ready(Ok(()))
-    }
-
-    fn call(&mut self, req: Request<Body>) -> Self::Future {
-        let content_type = req.headers().get(header::CONTENT_TYPE).unwrap().to_str().unwrap();
-        if content_type == "application/protobuf" {
-            Box::pin(async move {
-                let status = StatusCode::UNSUPPORTED_MEDIA_TYPE;
-                Ok(DBError::new(status, "bad_content_type", "Content type must be application/protobuf").to_hyper_resp())
-            })
-        } else {
-            let service = self.service.clone();
-            Box::pin(async move {
-                let request = ServiceRequest::from_hyper_raw(req).await;
-                let resp = service.handle(request.unwrap()).await;
-                resp.map(|v| v.to_hyper_raw())
-                    .or_else(|err| match err.root_err() {
-                        DBProstError::ProstDecodeError(_) =>
-                            Ok(DBError::new(StatusCode::BAD_REQUEST, "protobuf_decode_err", "Invalid protobuf body").
-                                to_hyper_resp()),
-                        DBProstError::DBWrapError(err) =>
-                            Ok(err.to_hyper_resp()),
-                        DBProstError::HyperError(err) =>
-                            Err(err),
-                        _ => Ok(DBError::new(StatusCode::INTERNAL_SERVER_ERROR, "internal_err", "Internal Error").
-                            to_hyper_resp()),
-                    })
-            })
-        }
-    }
-}
\ No newline at end of file