You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2022/07/23 14:44:52 UTC
[dubbo-rust] 29/35: finished 1.0.0
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch poc-idl
in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git
commit 60bfedf3134569b97dd7648cc747ebb05476250c
Author: hxq <jo...@163.com>
AuthorDate: Thu Jul 21 16:25:11 2022 +0800
finished 1.0.0
---
.github/.DS_Store | Bin 0 -> 6148 bytes
dubbo-build/.DS_Store | Bin 0 -> 6148 bytes
dubbo-build/src/generator.rs | 106 +++++++++++++++++-------
dubbo-build/src/lib.rs | 17 +++-
examples/.DS_Store | Bin 0 -> 6148 bytes
examples/grpc-gen/.DS_Store | Bin 0 -> 6148 bytes
examples/grpc-gen/Cargo.toml | 14 +++-
examples/grpc-gen/build.rs | 3 +-
examples/grpc-gen/src/.DS_Store | Bin 0 -> 6148 bytes
examples/grpc-gen/src/client.rs | 10 +++
examples/grpc-gen/src/greeter.rs | 68 +++++++++++-----
examples/grpc-gen/src/lib.rs | 6 ++
examples/grpc-gen/src/main.rs | 12 ---
examples/grpc-gen/src/server.rs | 38 +++++++++
examples/protobuf-transport/src/lib.rs | 2 +-
xds/.DS_Store | Bin 0 -> 6148 bytes
xds/src/.DS_Store | Bin 0 -> 6148 bytes
xds/src/client/client.rs | 34 +++++++-
xds/src/error.rs | 17 ++++
xds/src/lib.rs | 3 -
xds/src/protocol/error.rs | 17 ++++
xds/src/protocol/message.rs | 17 ++++
xds/src/protocol/mod.rs | 17 ++++
xds/src/request.rs | 143 ++++++++++++++++-----------------
xds/src/response.rs | 116 +++++++++++++-------------
xds/src/server/.DS_Store | Bin 0 -> 6148 bytes
xds/src/server/server.rs | 17 ++--
xds/src/util.rs | 17 ++++
xds/src/wrapper.rs | 110 -------------------------
29 files changed, 467 insertions(+), 317 deletions(-)
diff --git a/.github/.DS_Store b/.github/.DS_Store
new file mode 100644
index 0000000..3bb81dc
Binary files /dev/null and b/.github/.DS_Store differ
diff --git a/dubbo-build/.DS_Store b/dubbo-build/.DS_Store
new file mode 100644
index 0000000..cd16f51
Binary files /dev/null and b/dubbo-build/.DS_Store differ
diff --git a/dubbo-build/src/generator.rs b/dubbo-build/src/generator.rs
index 05e25cf..79fa172 100644
--- a/dubbo-build/src/generator.rs
+++ b/dubbo-build/src/generator.rs
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
use prost_build::{Method, Service, ServiceGenerator};
#[derive(Default)]
@@ -8,17 +25,32 @@ impl CodeGenerator {
fn generate_module_name(&self) -> &str { "xds" }
- fn generate_type_aliases(&mut self, buf: &mut String) {
+ fn generate_uses(&mut self, buf: &mut String) {
buf.push_str(&format!(
"\n\
- pub type DBReq<I> = {0}::request::ServiceRequest<I>;\n\
- pub type DBResp<O> = Result<{0}::response::ServiceResponse<O>, {0}::error::DBProstError>;\n",
+ use async_trait::async_trait;\n\
+ use tower::Service;\n\
+ use hyper::{{\n \
+ Body,\n \
+ Request,\n \
+ Response,\n\
+ }};\n\
+ use futures::future::{{\n \
+ BoxFuture\n\
+ }};\n\
+ use std::{{\n \
+ task::{{Poll}},\n\
+ }};\n"));
+ }
+
+ fn generate_type_aliases(&mut self, buf: &mut String) {
+ buf.push_str(&format!(
+ "\npub type DBResp<O> = Result<{0}::response::ServiceResponse<O>, {0}::error::DBProstError>;\n",
self.generate_module_name()));
}
fn generate_main_trait(&self, service: &Service, buf: &mut String) {
- buf.push_str("\nuse async_trait::async_trait;\n\n#[async_trait]\n");
-
+ buf.push_str("\n\n#[async_trait]\n");
service.comments.append_with_indent(0, buf);
buf.push_str(&format!("pub trait {} {{", service.name));
for method in service.methods.iter() {
@@ -30,7 +62,7 @@ impl CodeGenerator {
}
fn method_sig(&self, method: &Method) -> String {
- format!("async fn {0}(&self, request: DBReq<{1}>) -> DBResp<{2}>",
+ format!("async fn {0}(&self, request: {1}) -> DBResp<{2}>",
method.name, method.input_type, method.output_type)
}
@@ -38,7 +70,7 @@ impl CodeGenerator {
buf.push_str(&format!(
"\n\
pub struct {0}Client {{\n \
- pub hyper_client: {1}::wrapper::HyperClient \n\
+ pub rpc_client: {1}::client::RpcClient \n\
}}\n",service.name, self.generate_module_name()));
}
@@ -46,9 +78,9 @@ impl CodeGenerator {
buf.push_str(&format!(
"\n\
impl {0}Client {{\n \
- pub fn new(root_url: &str) -> {0}Client {{\n \
+ pub fn new(addr: String) -> {0}Client {{\n \
{0}Client {{\n \
- hyper_client: {1}::wrapper::HyperClient::new(root_url) \n \
+ rpc_client: {1}::client::RpcClient::new(addr) \n \
}}\n \
}}\n\
}}\n", service.name, self.generate_module_name()));
@@ -58,8 +90,9 @@ impl CodeGenerator {
for method in service.methods.iter() {
buf.push_str(&format!(
- "\n {} {{\n \
- self.hyper_client.request(\"/dubbo/{}.{}/{}\", request).await\n \
+ "\n {0} {{\n \
+ let path = \"dubbo/{1}.{2}/{3}\".to_owned();\n \
+ self.rpc_client.request(request, path).await\n \
}}\n", self.method_sig(method), service.package, service.proto_name, method.proto_name));
}
buf.push_str("}\n");
@@ -69,50 +102,64 @@ impl CodeGenerator {
buf.push_str(&format!(
"\n\
pub struct {0}Server<T: 'static + {0} + Send + Sync + Clone> {{\n \
- pub hyper_server: {1}::wrapper::HyperServer<T> \n\
- }}\n",service.name, self.generate_module_name()));
+ pub inner: T\n\
+ }}\n",service.name));
}
fn generate_server_impl(&self, service: &Service, buf: &mut String) {
buf.push_str(&format!(
"\n\
impl<T: 'static + {0} + Send + Sync + Clone> {0}Server<T> {{\n \
- pub fn new(&self, service: T) -> {0}Server<T> {{\n \
+ pub fn new(service: T) -> {0}Server<T> {{\n \
{0}Server {{\n \
- hyper_server: {1}::wrapper::HyperServer::new(service) \n \
+ inner: service \n \
}}\n \
}}\n\
}}\n",
- service.name, self.generate_module_name()));
+ service.name));
buf.push_str(&format!(
"\n\
- impl<T: 'static + {0} + Send + Sync + Clone> {1}::wrapper::HyperService for {0}Server<T> {{\n \
- fn handle(&self, req: DBReq<Vec<u8>>) -> {1}::BoxFutureResp<Vec<u8>> {{\n \
- use ::futures::Future;\n \
- let trait_object_service = self.hyper_server.service.clone();\n \
- match (req.method.clone(), req.uri.path()) {{",
- service.name, self.generate_module_name()));
+ impl<T> Service<Request<Body>> for {0}Server<T> \n \
+ where T: 'static + Greeter + Send + Sync + Clone {{\n \
+ type Response = Response<Body>;\n \
+ type Error = hyper::Error;\n \
+ type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;\n \
+ \n \
+
+ fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {{\n \
+ Poll::Ready(Ok(()))\n \
+ }}\n \
+
+ \n \
+
+ fn call(&mut self, req: Request<Body>) -> Self::Future {{\n \
+ let inner = self.inner.clone();\n \
+ let url = req.uri().path();\n \
+ match (req.method().clone(), url) {{",
+ service.name));
// Make match arms for each type
for method in service.methods.iter() {
buf.push_str(&format!(
"\n \
(::hyper::Method::POST, \"/dubbo/{0}.{1}/{2}\") => {{\n \
- Box::pin(async move {{ \n \
- let proto_req = req.to_proto().unwrap(); \n \
- let resp = trait_object_service.{3}(proto_req).await.unwrap(); \n \
- let proto_resp = resp.to_proto_raw(); \n \
- proto_resp \n \
+ Box::pin(async move {{\n \
+ let request = {3}::ServiceRequest::try_from_hyper(req).await;\n \
+ let proto_req = request.unwrap().try_decode().unwrap();\n \
+ let resp = inner.{4}(proto_req.input).await.unwrap();\n \
+ let proto_resp = resp.try_encode();\n \
+ let hyper_resp = proto_resp.unwrap().into_hyper();\n \
+ Ok(hyper_resp) \n \
}}) \n \
- }},", service.package, service.proto_name, method.proto_name, method.name));
+ }},", service.package, service.proto_name, method.proto_name, self.generate_module_name(), method.name));
}
// Final 404 arm and end fn
buf.push_str(&format!(
"\n \
_ => {{\n \
Box::pin(async move {{ \n \
- Ok({0}::error::DBError::new(::hyper::StatusCode::NOT_FOUND, \"not_found\", \"Not found\").to_resp_raw()) \n \
+ Ok({0}::error::DBError::new(::hyper::StatusCode::NOT_FOUND, \"not_found\", \"Not found\").to_hyper_resp()) \n \
}}) \n \
}}\n \
}}\n \
@@ -123,6 +170,7 @@ impl CodeGenerator {
impl ServiceGenerator for CodeGenerator {
fn generate(&mut self, service: Service, buf: &mut String) {
+ self.generate_uses(buf);
self.generate_type_aliases(buf);
self.generate_main_trait(&service, buf);
self.generate_client_struct(&service, buf);
diff --git a/dubbo-build/src/lib.rs b/dubbo-build/src/lib.rs
index 329eb39..225d4f6 100644
--- a/dubbo-build/src/lib.rs
+++ b/dubbo-build/src/lib.rs
@@ -1,4 +1,19 @@
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
#[cfg(feature = "generator-code")]
extern crate prost_build;
diff --git a/examples/.DS_Store b/examples/.DS_Store
new file mode 100644
index 0000000..0d3b142
Binary files /dev/null and b/examples/.DS_Store differ
diff --git a/examples/grpc-gen/.DS_Store b/examples/grpc-gen/.DS_Store
new file mode 100644
index 0000000..b57983e
Binary files /dev/null and b/examples/grpc-gen/.DS_Store differ
diff --git a/examples/grpc-gen/Cargo.toml b/examples/grpc-gen/Cargo.toml
index 335c81c..7122d68 100644
--- a/examples/grpc-gen/Cargo.toml
+++ b/examples/grpc-gen/Cargo.toml
@@ -6,13 +6,25 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
+tokio = {version = "1.9.0", features = ["full"]}
futures = "0.3.21"
prost = "0.10.4"
async-trait = { version = "0.1.56" }
xds = { version = "0.1.0", path = "../../xds" }
-
hyper = { version = "0.14.19", features = ["full"] }
+pin-project = "1.0.11"
+tower = "0.4"
+
+[lib]
+path = "./src/lib.rs"
+
+[[bin]]
+name="server"
+path="./src/server.rs"
+[[bin]]
+name="client"
+path="./src/client.rs"
[build-dependencies]
diff --git a/examples/grpc-gen/build.rs b/examples/grpc-gen/build.rs
index 0098c18..7a3e44f 100644
--- a/examples/grpc-gen/build.rs
+++ b/examples/grpc-gen/build.rs
@@ -3,10 +3,9 @@ use prost_build::Config;
fn main() -> Result<()> {
let mut conf = Config::new();
- let mut gen = dubbo_build::CodeGenerator::new();
+ let gen = dubbo_build::CodeGenerator::new();
conf.service_generator(Box::new(gen));
conf.out_dir("src/");
conf.compile_protos(&["pb/greeter.proto"], &["pb/"]).unwrap();
-
Ok(())
}
\ No newline at end of file
diff --git a/examples/grpc-gen/src/.DS_Store b/examples/grpc-gen/src/.DS_Store
new file mode 100644
index 0000000..5008ddf
Binary files /dev/null and b/examples/grpc-gen/src/.DS_Store differ
diff --git a/examples/grpc-gen/src/client.rs b/examples/grpc-gen/src/client.rs
new file mode 100644
index 0000000..0f846a8
--- /dev/null
+++ b/examples/grpc-gen/src/client.rs
@@ -0,0 +1,10 @@
+
+mod greeter;
+use greeter::*;
+#[tokio::main]
+async fn main() {
+ let client = GreeterClient::new("http://127.0.0.1:8972".to_owned());
+ let resp = client.say_hello(HelloRequest { name: "johankoi".into()}).await.unwrap();
+ let hello_resp = resp.output;
+ println!("{}",hello_resp.message);
+}
\ No newline at end of file
diff --git a/examples/grpc-gen/src/greeter.rs b/examples/grpc-gen/src/greeter.rs
index 3ecb8f2..9895a76 100644
--- a/examples/grpc-gen/src/greeter.rs
+++ b/examples/grpc-gen/src/greeter.rs
@@ -9,63 +9,87 @@ pub struct HelloResponse {
pub message: ::prost::alloc::string::String,
}
-pub type DBReq<I> = xds::request::ServiceRequest<I>;
+use async_trait::async_trait;
+use tower::Service;
+use hyper::{
+ Body,
+ Request,
+ Response,
+};
+use futures::future::{
+ BoxFuture
+};
+use std::{
+ task::{Poll},
+};
+
pub type DBResp<O> = Result<xds::response::ServiceResponse<O>, xds::error::DBProstError>;
-use async_trait::async_trait;
#[async_trait]
pub trait Greeter {
- async fn say_hello(&self, request: DBReq<HelloRequest>) -> DBResp<HelloResponse>;
+ async fn say_hello(&self, request: HelloRequest) -> DBResp<HelloResponse>;
}
pub struct GreeterClient {
- pub hyper_client: xds::wrapper::HyperClient
+ pub rpc_client: xds::client::RpcClient
}
impl GreeterClient {
- pub fn new(root_url: &str) -> GreeterClient {
+ pub fn new(addr: String) -> GreeterClient {
GreeterClient {
- hyper_client: xds::wrapper::HyperClient::new(root_url)
+ rpc_client: xds::client::RpcClient::new(addr)
}
}
}
#[async_trait]
impl Greeter for GreeterClient {
- async fn say_hello(&self, request: DBReq<HelloRequest>) -> DBResp<HelloResponse> {
- self.hyper_client.request("/dubbo/greeter.Greeter/SayHello", request).await
+ async fn say_hello(&self, request: HelloRequest) -> DBResp<HelloResponse> {
+ let path = "dubbo/greeter.Greeter/SayHello".to_owned();
+ self.rpc_client.request(request, path).await
}
}
pub struct GreeterServer<T: 'static + Greeter + Send + Sync + Clone> {
- pub hyper_server: xds::wrapper::HyperServer<T>
+ pub inner: T
}
impl<T: 'static + Greeter + Send + Sync + Clone> GreeterServer<T> {
- pub fn new(&self, service: T) -> GreeterServer<T> {
+ pub fn new(service: T) -> GreeterServer<T> {
GreeterServer {
- hyper_server: xds::wrapper::HyperServer::new(service)
+ inner: service
}
}
}
-impl<T: 'static + Greeter + Send + Sync + Clone> xds::wrapper::HyperService for GreeterServer<T> {
- fn handle(&self, req: DBReq<Vec<u8>>) -> xds::BoxFutureResp<Vec<u8>> {
- use ::futures::Future;
- let trait_object_service = self.hyper_server.service.clone();
- match (req.method.clone(), req.uri.path()) {
+impl<T> Service<Request<Body>> for GreeterServer<T>
+ where T: 'static + Greeter + Send + Sync + Clone {
+ type Response = Response<Body>;
+ type Error = hyper::Error;
+ type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
+
+ fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn call(&mut self, req: Request<Body>) -> Self::Future {
+ let inner = self.inner.clone();
+ let url = req.uri().path();
+ match (req.method().clone(), url) {
(::hyper::Method::POST, "/dubbo/greeter.Greeter/SayHello") => {
- Box::pin(async move {
- let proto_req = req.to_proto().unwrap();
- let resp = trait_object_service.say_hello(proto_req).await.unwrap();
- let proto_resp = resp.to_proto_raw();
- proto_resp
+ Box::pin(async move {
+ let request = xds::ServiceRequest::try_from_hyper(req).await;
+ let proto_req = request.unwrap().try_decode().unwrap();
+ let resp = inner.say_hello(proto_req.input).await.unwrap();
+ let proto_resp = resp.try_encode();
+ let hyper_resp = proto_resp.unwrap().into_hyper();
+ Ok(hyper_resp)
})
},
_ => {
Box::pin(async move {
- Ok(xds::error::DBError::new(::hyper::StatusCode::NOT_FOUND, "not_found", "Not found").to_resp_raw())
+ Ok(xds::error::DBError::new(::hyper::StatusCode::NOT_FOUND, "not_found", "Not found").to_hyper_resp())
})
}
}
diff --git a/examples/grpc-gen/src/lib.rs b/examples/grpc-gen/src/lib.rs
new file mode 100644
index 0000000..c032559
--- /dev/null
+++ b/examples/grpc-gen/src/lib.rs
@@ -0,0 +1,6 @@
+
+
+fn main() {
+
+
+}
diff --git a/examples/grpc-gen/src/main.rs b/examples/grpc-gen/src/main.rs
deleted file mode 100644
index 68b341a..0000000
--- a/examples/grpc-gen/src/main.rs
+++ /dev/null
@@ -1,12 +0,0 @@
-
-mod greeter;
-use greeter::*;
-
-
-fn main() {
-
- let client = GreeterClient::new("0.0.0.0:8080");
- let hello_req = DBReq::new(HelloRequest { name: "johankoi".into()});
- client.say_hello(hello_req);
-
-}
diff --git a/examples/grpc-gen/src/server.rs b/examples/grpc-gen/src/server.rs
new file mode 100644
index 0000000..3c15554
--- /dev/null
+++ b/examples/grpc-gen/src/server.rs
@@ -0,0 +1,38 @@
+use std::convert::Infallible;
+use std::net::SocketAddr;
+use async_trait::async_trait;
+use hyper::{Server as hyper_server};
+use hyper::service::{make_service_fn};
+
+mod greeter;
+use greeter::*;
+use xds::ServiceResponse;
+
+
+#[derive(Default, Clone)]
+pub struct HelloService {}
+
+#[async_trait]
+impl Greeter for HelloService {
+ async fn say_hello(&self, request: HelloRequest) -> DBResp<HelloResponse> {
+ println!("{}",request.name);
+ Ok(ServiceResponse::new(HelloResponse { message: "hello, dubbo rust!".into() }))
+ }
+}
+
+
+#[tokio::main]
+async fn main() {
+ let addr = SocketAddr::from(([127, 0, 0, 1], 8972));
+ let make_service = make_service_fn(|_conn| async {
+ let server = GreeterServer::new(HelloService::default());
+ Ok::<_, Infallible>(server)
+ });
+
+ let server = hyper_server::bind(&addr).serve(make_service);
+ println!("Listening on http://{}", &addr);
+ if let Err(e) = server.await {
+ eprintln!("server error: {}", e);
+ }
+}
+
diff --git a/examples/protobuf-transport/src/lib.rs b/examples/protobuf-transport/src/lib.rs
index 959e687..5e94390 100644
--- a/examples/protobuf-transport/src/lib.rs
+++ b/examples/protobuf-transport/src/lib.rs
@@ -3,5 +3,5 @@ pub mod person;
pub use person::*;
fn main() {
- println!("Hello, world!");
+
}
diff --git a/xds/.DS_Store b/xds/.DS_Store
new file mode 100644
index 0000000..b8d457d
Binary files /dev/null and b/xds/.DS_Store differ
diff --git a/xds/src/.DS_Store b/xds/src/.DS_Store
new file mode 100644
index 0000000..05fc01e
Binary files /dev/null and b/xds/src/.DS_Store differ
diff --git a/xds/src/client/client.rs b/xds/src/client/client.rs
index 1387e36..3c0b655 100644
--- a/xds/src/client/client.rs
+++ b/xds/src/client/client.rs
@@ -21,8 +21,21 @@ use hyper::client::service::Connect;
use hyper::service::Service;
use hyper::{Body, Request, Response};
-use crate::protocol::message::*;
+use crate::protocol::message::{
+ Message as DubboMessage,
+ RpcxMessage,
+ Metadata,
+ MessageType,
+ CompressType,
+ SerializeType
+};
+
use std::collections::HashMap;
+use prost::{Message};
+
+use crate::request::ServiceRequest;
+use crate::response::ServiceResponse;
+use crate::error::*;
pub struct RpcClient {
@@ -36,6 +49,23 @@ impl RpcClient {
}
}
+ /// Invoke the given request for the given path and return a result of Result<ServiceResponse<O>, DBProstError>
+ pub async fn request<I, O>(&self, message: I, path: String) -> Result<ServiceResponse<O>, DBProstError>
+ where I: Message + Default + 'static,
+ O: Message + Default + 'static {
+
+ let url_str = format!("{}/{}", self.addr.as_str(), path.as_str());
+ let uri = url_str.parse::<hyper::Uri>().unwrap();
+
+ let req = ServiceRequest::new(message, uri.clone());
+ let hyper_req = req.into_encoded_hyper().unwrap();
+
+ let mut connect = Connect::new(HttpConnector::new(), Builder::new());
+ let mut send = connect.call(uri.clone()).await.map_err(DBProstError::HyperError).unwrap();
+ let hyper_resp = send.call(hyper_req).await.map_err(DBProstError::HyperError).unwrap();
+ ServiceResponse::decode_response(hyper_resp).await
+ }
+
pub async fn call(
&mut self,
service_path: String,
@@ -44,7 +74,7 @@ impl RpcClient {
payload: Vec<u8>
) -> std::result::Result<Response<Body>, Box<dyn std::error::Error + Send + Sync>>
{
- let mut req = Message::new();
+ let mut req = DubboMessage::new();
req.set_version(0);
req.set_message_type(MessageType::Request);
req.set_serialize_type(SerializeType::Protobuf);
diff --git a/xds/src/error.rs b/xds/src/error.rs
index e212835..2748a2e 100644
--- a/xds/src/error.rs
+++ b/xds/src/error.rs
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
use serde_json;
use prost::{DecodeError, EncodeError};
use hyper::{Body, Response, Method, Version, header, StatusCode};
diff --git a/xds/src/lib.rs b/xds/src/lib.rs
index d3a7e94..ba2a6b5 100644
--- a/xds/src/lib.rs
+++ b/xds/src/lib.rs
@@ -22,13 +22,10 @@ pub mod protocol;
pub mod request;
pub mod response;
-pub mod wrapper;
pub mod error;
pub mod util;
-
-pub use wrapper::*;
pub use request::*;
pub use response::*;
pub use error::*;
diff --git a/xds/src/protocol/error.rs b/xds/src/protocol/error.rs
index c1cdd8f..1496243 100644
--- a/xds/src/protocol/error.rs
+++ b/xds/src/protocol/error.rs
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
use std::{convert::From, error, fmt, result, str};
pub type Result<T> = result::Result<T, Error>;
diff --git a/xds/src/protocol/message.rs b/xds/src/protocol/message.rs
index 1895172..52bb806 100644
--- a/xds/src/protocol/message.rs
+++ b/xds/src/protocol/message.rs
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
use byteorder::{BigEndian, ByteOrder};
use enum_primitive_derive::Primitive;
use flate2::{read::GzDecoder, write::GzEncoder, Compression};
diff --git a/xds/src/protocol/mod.rs b/xds/src/protocol/mod.rs
index 1d650a8..b9eaa8f 100644
--- a/xds/src/protocol/mod.rs
+++ b/xds/src/protocol/mod.rs
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
pub mod error;
pub mod message;
diff --git a/xds/src/request.rs b/xds/src/request.rs
index 4d497c0..c27aa91 100644
--- a/xds/src/request.rs
+++ b/xds/src/request.rs
@@ -1,6 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use std::convert::From;
use hyper::{body, Body, Request, Uri, Method, Version, header};
use prost::{Message};
-use crate::util::*;
use crate::error::*;
/// A request with HTTP info and the serialized input object
@@ -22,15 +39,14 @@ pub struct ServiceRequest<T> {
pub input: T,
}
-impl<T> ServiceRequest<T> {
+impl<T: Message + Default + 'static> ServiceRequest<T> {
/// Create new service request with the given input object
- ///
/// This automatically sets the `Content-Type` header as `application/protobuf`.
- pub fn new(input: T) -> ServiceRequest<T> {
+ pub fn new(input: T, uri: Uri) -> ServiceRequest<T> {
let mut header = header::HeaderMap::new();
header.insert(header::CONTENT_TYPE, "application/protobuf".parse().unwrap());
ServiceRequest {
- uri: Default::default(),
+ uri,
method: Method::POST,
version: Version::default(),
headers: header,
@@ -38,41 +54,41 @@ impl<T> ServiceRequest<T> {
}
}
- /// Copy this request with a different input value
- pub fn clone_with_input<U>(&self, input: U) -> ServiceRequest<U> {
- ServiceRequest {
- uri: self.uri.clone(),
- method: self.method.clone(),
- version: self.version,
- headers: self.headers.clone(),
- input,
- }
+ /// Turn a protobuf service request into a byte-array service request
+ pub fn try_encode(&self) -> Result<ServiceRequest<Vec<u8>>, DBProstError> {
+ let mut body = Vec::new();
+ self.input.encode(&mut body)
+ .map(|v|
+ ServiceRequest {
+ uri: self.uri.clone(),
+ method: self.method.clone(),
+ version: self.version,
+ headers: self.headers.clone(),
+ input: body,
+ })
+ .map_err(|e| DBProstError::ProstEncodeError(e))
}
-}
-impl<T: Message + Default + 'static> From<T> for ServiceRequest<T> {
- fn from(v: T) -> ServiceRequest<T> { ServiceRequest::new(v) }
+ /// Turn a protobuf service request into a hyper request
+ pub fn into_encoded_hyper(&self) -> Result<Request<Body>, DBProstError> {
+ self.try_encode().map(|v| v.into_hyper())
+ }
}
impl ServiceRequest<Vec<u8>> {
- /// Turn a hyper request to a boxed future of a byte-array service request
- pub fn from_hyper_raw(req: Request<Body>) -> BoxFutureReq<Vec<u8>> {
- Box::pin(async move {
- let uri = req.uri().clone();
- let method = req.method().clone();
- let version = req.version();
- let headers = req.headers().clone();
- let future_req = body::to_bytes(req.into_body()).await
- .map_err(DBProstError::HyperError)
- .map(move |body|
- ServiceRequest { uri, method, version, headers, input: body.to_vec() }
- );
- future_req
- })
+ /// Turn a hyper request to ServiceRequest<Vec<u8>>
+ pub async fn try_from_hyper(req: Request<Body>) -> Result<ServiceRequest<Vec<u8>>, DBProstError> {
+ let uri = req.uri().clone();
+ let method = req.method().clone();
+ let version = req.version();
+ let headers = req.headers().clone();
+ body::to_bytes(req.into_body()).await
+ .map_err(DBProstError::HyperError)
+ .map(move |body| ServiceRequest { uri, method, version, headers, input: body.to_vec() })
}
- /// Turn a byte-array service request into a hyper request
- pub fn to_hyper_raw(&self) -> Request<Body> {
+ /// Turn ServiceRequest<Vec<u8>> into a hyper request
+ pub fn into_hyper(&self) -> Request<Body> {
let mut request = Request::builder()
.method(self.method.clone())
.uri(self.uri.clone())
@@ -85,47 +101,26 @@ impl ServiceRequest<Vec<u8>> {
request
}
- /// Turn a byte-array service request into a `AfterBodyError`-wrapped version of the given error
- pub fn body_err(&self, err: DBProstError) -> DBProstError {
- DBProstError::AfterBodyError {
- body: self.input.clone(),
- method: Some(self.method.clone()),
- version: self.version,
- headers: self.headers.clone(),
- status: None,
- err: Box::new(err),
- }
- }
-
- /// Serialize the byte-array service request into a protobuf service request
- pub fn to_proto<T: Message + Default + 'static>(&self) -> Result<ServiceRequest<T>, DBProstError> {
- match T::decode(self.input.as_ref()) {
- Ok(v) => Ok(self.clone_with_input(v)),
- Err(err) => Err(self.body_err(DBProstError::ProstDecodeError(err)))
- }
+ /// try_decode ServiceRequest<Vec<u8>> whose input is protobuf data into a ServiceRequest<T>
+ pub fn try_decode<T: Message + Default + 'static>(&self) -> Result<ServiceRequest<T>, DBProstError> {
+ T::decode(self.input.as_ref())
+ .map(|v|
+ ServiceRequest {
+ uri: self.uri.clone(),
+ method: self.method.clone(),
+ version: self.version,
+ headers: self.headers.clone(),
+ input:v,
+ })
+ .map_err(|e|
+ DBProstError::AfterBodyError {
+ body: self.input.clone(),
+ method: Some(self.method.clone()),
+ version: self.version,
+ headers: self.headers.clone(),
+ status: None,
+ err: Box::new(DBProstError::ProstDecodeError(e)),
+ })
}
}
-impl<T: Message + Default + 'static> ServiceRequest<T> {
- /// Turn a protobuf service request into a byte-array service request
- pub fn to_proto_raw(&self) -> Result<ServiceRequest<Vec<u8>>, DBProstError> {
- let mut body = Vec::new();
- if let Err(err) = self.input.encode(&mut body) {
- Err(DBProstError::ProstEncodeError(err))
- } else {
- Ok(self.clone_with_input(body))
- }
- }
-
- /// Turn a hyper request into a protobuf service request
- pub fn from_hyper_proto(req: Request<Body>) -> BoxFutureReq<T> {
- Box::pin(async move {
- ServiceRequest::from_hyper_raw(req).await.and_then(|v| v.to_proto())
- })
- }
-
- /// Turn a protobuf service request into a hyper request
- pub fn to_hyper_proto(&self) -> Result<Request<Body>, DBProstError> {
- self.to_proto_raw().map(|v| v.to_hyper_raw())
- }
-}
\ No newline at end of file
diff --git a/xds/src/response.rs b/xds/src/response.rs
index b38db8b..96f02e6 100644
--- a/xds/src/response.rs
+++ b/xds/src/response.rs
@@ -1,6 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
use hyper::{body, Body, Response, Version, header, StatusCode};
use prost::{Message};
-use crate::util::*;
use crate::error::*;
@@ -41,28 +57,23 @@ impl<T> ServiceResponse<T> {
}
}
-impl<T: Message + Default + 'static> From<T> for ServiceResponse<T> {
- fn from(v: T) -> ServiceResponse<T> { ServiceResponse::new(v) }
-}
+// impl<T: Message + Default + 'static> From<T> for ServiceResponse<T> {
+// fn from(v: T) -> ServiceResponse<T> { ServiceResponse::new(v) }
+// }
impl ServiceResponse<Vec<u8>> {
/// Turn a hyper response to a boxed future of a byte-array service response
- pub fn from_hyper_raw(resp: Response<Body>) -> BoxFutureResp<Vec<u8>> {
- Box::pin(async move {
- let version = resp.version();
- let headers = resp.headers().clone();
- let status = resp.status().clone();
- let future_resp = body::to_bytes(resp.into_body()).await
- .map_err(DBProstError::HyperError)
- .map(move |body|
- ServiceResponse { version, headers, status, output: body.to_vec() }
- );
- future_resp
- })
+ pub async fn try_from_hyper(resp: Response<Body>) -> Result<ServiceResponse<Vec<u8>>, DBProstError> {
+ let version = resp.version();
+ let headers = resp.headers().clone();
+ let status = resp.status().clone();
+ body::to_bytes(resp.into_body()).await
+ .map_err(DBProstError::HyperError)
+ .map(move |body| ServiceResponse { version, headers, status, output: body.to_vec() })
}
/// Turn a byte-array service response into a hyper response
- pub fn to_hyper_raw(&self) -> Response<Body> {
+ pub fn into_hyper(&self) -> Response<Body> {
let mut resp = Response::builder()
.status(StatusCode::OK)
.body(Body::from(self.output.clone())).unwrap();
@@ -74,58 +85,55 @@ impl ServiceResponse<Vec<u8>> {
resp
}
- /// Turn a byte-array service response into a `AfterBodyError`-wrapped version of the given error
- pub fn body_err(&self, err: DBProstError) -> DBProstError {
- DBProstError::AfterBodyError {
- body: self.output.clone(),
- method: None,
- version: self.version,
- headers: self.headers.clone(),
- status: Some(self.status),
- err: Box::new(err),
- }
- }
-
-
/// Serialize the byte-array service response into a protobuf service response
- pub fn to_proto<T: Message + Default + 'static>(&self) -> Result<ServiceResponse<T>, DBProstError> {
+ pub fn try_decode<T: Message + Default + 'static>(&self) -> Result<ServiceResponse<T>, DBProstError> {
if self.status.is_success() {
- match T::decode(self.output.as_ref()) {
- Ok(v) => Ok(self.clone_with_output(v)),
- Err(err) => Err(self.body_err(DBProstError::ProstDecodeError(err)))
- }
+ T::decode(self.output.as_ref())
+ .map(|v|
+ ServiceResponse {
+ version: self.version,
+ headers: self.headers.clone(),
+ status: self.status.clone(),
+ output: v,
+ })
+ .map_err(|e|
+ DBProstError::AfterBodyError {
+ body: self.output.clone(),
+ method: None,
+ version: self.version,
+ headers: self.headers.clone(),
+ status: Some(self.status),
+ err: Box::new(DBProstError::ProstDecodeError(e)),
+ })
} else {
- match DBError::from_json_bytes(self.status, &self.output) {
- Ok(err) => Err(self.body_err(DBProstError::DBWrapError(err))),
- Err(err) => Err(self.body_err(DBProstError::JsonDecodeError(err)))
- }
+ let err= DBError::new(self.status.clone(), "internal_err", "Internal Error");
+ Err(DBProstError::DBWrapError(err))
}
}
}
impl<T: Message + Default + 'static> ServiceResponse<T> {
- /// Turn a protobuf service response into a byte-array service response
- pub fn to_proto_raw(&self) -> Result<ServiceResponse<Vec<u8>>, DBProstError> {
+ pub fn try_encode(&self) -> Result<ServiceResponse<Vec<u8>>, DBProstError> {
let mut body = Vec::new();
- if let Err(err) = self.output.encode(&mut body) {
- Err(DBProstError::ProstEncodeError(err))
- } else {
- Ok(self.clone_with_output(body))
- }
+ self.output.encode(&mut body)
+ .map(|v|
+ ServiceResponse {
+ version: self.version,
+ headers: self.headers.clone(),
+ status: self.status.clone(),
+ output: body,
+ })
+ .map_err(|e| DBProstError::ProstEncodeError(e))
}
-
/// Turn a hyper response into a protobuf service response
- pub async fn from_hyper_proto(resp: Response<Body>) -> Result<ServiceResponse<T>, DBProstError> {
- // Box::pin(async move {
- ServiceResponse::from_hyper_raw(resp).await.and_then(|v| v.to_proto())
- // })
+ pub async fn decode_response(resp: Response<Body>) -> Result<ServiceResponse<T>, DBProstError> {
+ ServiceResponse::try_from_hyper(resp).await.and_then(|v| v.try_decode())
}
-
/// Turn a protobuf service response into a hyper response
- pub fn to_hyper_proto(&self) -> Result<Response<Body>, DBProstError> {
- self.to_proto_raw().map(|v| v.to_hyper_raw())
+ pub fn into_encoded_hyper(&self) -> Result<Response<Body>, DBProstError> {
+ self.try_encode().map(|v| v.into_hyper())
}
}
diff --git a/xds/src/server/.DS_Store b/xds/src/server/.DS_Store
new file mode 100644
index 0000000..5008ddf
Binary files /dev/null and b/xds/src/server/.DS_Store differ
diff --git a/xds/src/server/server.rs b/xds/src/server/server.rs
index dd8930a..f27bc0a 100644
--- a/xds/src/server/server.rs
+++ b/xds/src/server/server.rs
@@ -16,14 +16,11 @@
*/
use std::convert::Infallible;
-use std::env;
-use std::process::Output;
use std::task::Poll;
-use hyper::client::service;
use hyper::{Body, Request, Response, Server as hyper_server};
-use hyper::service::{make_service_fn, service_fn};
+use hyper::service::{ make_service_fn, service_fn };
use tower::Service;
-use futures::future::{ ready, Ready, BoxFuture};
+use futures::future::{ ready, BoxFuture };
use std::net::SocketAddr;
use crate::protocol::message::*;
@@ -38,8 +35,16 @@ impl RpcServer {
}
}
- pub async fn start(&self) {
+ pub async fn start<S>(&self, service: S)
+ where
+ S: Service<Request<Body>, Response = Response<Body>, Error = Infallible>
+ + Clone
+ + Send
+ + 'static,
+ S::Future: Send + 'static,
+ {
env_logger::init();
+
let make_service = make_service_fn(|_conn| async {
let svc = RPCHandler;
Ok::<_, Infallible>(svc)
diff --git a/xds/src/util.rs b/xds/src/util.rs
index 2fbe3f3..39374fb 100644
--- a/xds/src/util.rs
+++ b/xds/src/util.rs
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
use futures::future::BoxFuture;
use crate::request::ServiceRequest;
use crate::response::ServiceResponse;
diff --git a/xds/src/wrapper.rs b/xds/src/wrapper.rs
deleted file mode 100644
index b80ffa8..0000000
--- a/xds/src/wrapper.rs
+++ /dev/null
@@ -1,110 +0,0 @@
-use std::sync::Arc;
-use std::task::Poll;
-use futures::future::BoxFuture;
-
-use hyper::{Body, Request, Response, header, StatusCode};
-
-use hyper::client::conn::Builder;
-use hyper::client::connect::HttpConnector;
-use hyper::client::service::Connect;
-
-use tower::Service;
-use prost::{Message};
-
-use crate::request::ServiceRequest;
-use crate::response::ServiceResponse;
-use crate::util::*;
-use crate::error::*;
-
-
-
-/// A wrapper for a hyper client
-#[derive(Debug)]
-pub struct HyperClient {
- /// The hyper client
- /// The root URL without any path attached
- pub root_url: String,
-}
-
-impl HyperClient {
- /// Create a new client wrapper for the given client and root using protobuf
- pub fn new(root_url: &str) -> HyperClient {
- HyperClient {
- root_url: root_url.trim_end_matches('/').to_string(),
- }
- }
-
- // Invoke the given request for the given path and return a boxed future result
- pub async fn request<I, O>(&self, path: &str, req: ServiceRequest<I>) -> Result<ServiceResponse<O>, DBProstError>
- where I: Message + Default + 'static, O: Message + Default + 'static {
- let hyper_req = req.to_hyper_proto().unwrap();
- let mut hyper_connect = Connect::new(HttpConnector::new(), Builder::new());
-
- let url_string = format!("{}/{}", self.root_url, path.trim_start_matches('/'));
- let uri = url_string.parse::<hyper::Uri>().unwrap();
-
- let mut req_to_send = hyper_connect.call(uri.clone()).await.map_err(DBProstError::HyperError).unwrap();
-
- let hyper_resp = req_to_send.call(hyper_req).await.map_err(DBProstError::HyperError).unwrap();
- ServiceResponse::from_hyper_proto(hyper_resp).await
- }
-}
-
-/// Service for taking a raw service request and returning a boxed future of a raw service response
-pub trait HyperService {
- /// Accept a raw service request and return a boxed future of a raw service response
- fn handle(&self, req: ServiceRequest<Vec<u8>>) -> BoxFutureResp<Vec<u8>>;
-}
-
-/// A wrapper for a `HyperService` trait that keeps a `Arc` version of the service
-pub struct HyperServer<T> {
- /// The `Arc` version of the service
- ///
- /// Needed because of [hyper Service lifetimes](https://github.com/tokio-rs/tokio-service/issues/9)
- pub service: Arc<T>,
-}
-
-impl<T> HyperServer<T> {
- /// Create a new service wrapper for the given impl
- pub fn new(service: T) -> HyperServer<T> { HyperServer { service: Arc::new(service) } }
-}
-
-impl<T> Service<Request<Body>> for HyperServer<T>
- where T: 'static + Send + Sync + HyperService
-{
- type Response = Response<Body>;
- type Error = hyper::Error;
- type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
-
- fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
- Poll::Ready(Ok(()))
- }
-
- fn call(&mut self, req: Request<Body>) -> Self::Future {
- let content_type = req.headers().get(header::CONTENT_TYPE).unwrap().to_str().unwrap();
- if content_type == "application/protobuf" {
- Box::pin(async move {
- let status = StatusCode::UNSUPPORTED_MEDIA_TYPE;
- Ok(DBError::new(status, "bad_content_type", "Content type must be application/protobuf").to_hyper_resp())
- })
- } else {
- let service = self.service.clone();
- Box::pin(async move {
- let request = ServiceRequest::from_hyper_raw(req).await;
- let resp = service.handle(request.unwrap()).await;
- resp.map(|v| v.to_hyper_raw())
- .or_else(|err| match err.root_err() {
- DBProstError::ProstDecodeError(_) =>
- Ok(DBError::new(StatusCode::BAD_REQUEST, "protobuf_decode_err", "Invalid protobuf body").
- to_hyper_resp()),
- DBProstError::DBWrapError(err) =>
- Ok(err.to_hyper_resp()),
- DBProstError::HyperError(err) =>
- Err(err),
- _ => Ok(DBError::new(StatusCode::INTERNAL_SERVER_ERROR, "internal_err", "Internal Error").
- to_hyper_resp()),
- })
- })
- }
- }
-}
\ No newline at end of file