You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2023/03/04 05:32:24 UTC
[dubbo-rust] branch main updated: Ftr: protocol layer abstraction design (#125)
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 64cf21c Ftr: protocol layer abstraction design (#125)
64cf21c is described below
commit 64cf21cfcd366b5f050925d16f432dadb6090966
Author: 墨舟 <te...@foxmail.com>
AuthorDate: Sat Mar 4 13:32:18 2023 +0800
Ftr: protocol layer abstraction design (#125)
---
Cargo.toml | 6 +-
application.yaml | 7 +-
{protocol/protocol => common/base}/Cargo.toml | 5 +-
{protocol/protocol => common/base}/LICENSE | 0
.../consts.rs => common/base/src/constants.rs | 1 -
{protocol/protocol => common/base}/src/lib.rs | 22 ++---
dubbo/src/common/mod.rs => common/base/src/node.rs | 13 ++-
{dubbo/src/common => common/base/src}/url.rs | 16 ++-
common/utils/Cargo.toml | 4 +-
common/utils/src/host_util.rs | 72 ++++++++++++++
common/utils/src/lib.rs | 1 +
config/src/protocol.rs | 2 +-
dubbo/Cargo.toml | 1 +
dubbo/src/cluster/directory.rs | 2 +-
dubbo/src/cluster/loadbalance/impls/random.rs | 2 +-
dubbo/src/cluster/loadbalance/impls/roundrobin.rs | 2 +-
dubbo/src/cluster/loadbalance/types.rs | 3 +-
dubbo/src/cluster/support/cluster_invoker.rs | 2 +-
dubbo/src/framework.rs | 6 +-
dubbo/src/lib.rs | 1 -
dubbo/src/protocol/mod.rs | 2 +-
dubbo/src/protocol/triple/triple_invoker.rs | 3 +-
dubbo/src/protocol/triple/triple_protocol.rs | 6 +-
dubbo/src/protocol/triple/triple_server.rs | 3 +-
dubbo/src/registry/memory_registry.rs | 8 +-
dubbo/src/registry/mod.rs | 2 +-
dubbo/src/registry/protocol.rs | 4 +-
dubbo/src/registry/types.rs | 2 +-
dubbo/src/triple/server/builder.rs | 3 +-
dubbo/src/triple/transport/connection.rs | 2 +-
examples/echo/src/generated/grpc.examples.echo.rs | 4 +-
examples/greeter/Cargo.toml | 1 +
examples/greeter/src/greeter/client.rs | 3 +-
protocol/README.md | 4 +-
protocol/{protocol => base}/Cargo.toml | 2 +
protocol/{protocol => base}/LICENSE | 0
protocol/{triple/src/lib.rs => base/src/error.rs} | 26 +++--
.../{triple/src/lib.rs => base/src/invocation.rs} | 20 ++--
protocol/base/src/invoker.rs | 84 ++++++++++++++++
.../src/common/mod.rs => protocol/base/src/lib.rs | 6 +-
protocol/base/src/output.rs | 109 +++++++++++++++++++++
protocol/triple/Cargo.toml | 3 +-
protocol/triple/src/lib.rs | 2 +
.../triple/src/triple_invoker.rs | 40 +++++---
registry/nacos/Cargo.toml | 1 +
registry/nacos/src/lib.rs | 6 +-
registry/nacos/src/utils/mod.rs | 2 +-
registry/zookeeper/Cargo.toml | 1 +
registry/zookeeper/src/lib.rs | 10 +-
remoting/net/benches/transport_benchmark/main.rs | 3 +
remoting/xds/Cargo.toml | 2 +-
51 files changed, 427 insertions(+), 105 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index ad66064..1d173a3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,6 +3,7 @@ members = [
"common/logger",
"common/utils",
"common/extention",
+ "common/base",
"registry/zookeeper",
"registry/nacos",
"metadata",
@@ -18,7 +19,7 @@ members = [
"remoting/exchange",
"remoting/xds",
"protocol/dubbo2",
- "protocol/protocol",
+ "protocol/base",
"protocol/triple"
]
@@ -39,8 +40,9 @@ serde_json = "1"
urlencoding = "2.1.2"
logger = {path="./common/logger"}
utils = {path="./common/utils"}
+base = {path="./common/base"}
remoting-net = {path="./remoting/net"}
-protocol = {path="./protocol/protocol"}
+protocol = {path= "protocol/base" }
protocol-dubbo2 = {path="./protocol/dubbo2"}
protocol-triple = {path="./protocol/triple"}
registry-zookeeper = {path="./registry/zookeeper"}
diff --git a/application.yaml b/application.yaml
index 0de344e..d357db1 100644
--- a/application.yaml
+++ b/application.yaml
@@ -16,4 +16,9 @@ dubbo:
version: 1.0.0
group: test
protocol: triple
- interface: org.apache.dubbo.sample.tri.Greeter
\ No newline at end of file
+ interface: org.apache.dubbo.sample.tri.Greeter
+ consumer:
+ references:
+ GreeterClientImpl:
+ url: tri://localhost:20000
+ protocol: tri
\ No newline at end of file
diff --git a/protocol/protocol/Cargo.toml b/common/base/Cargo.toml
similarity index 66%
copy from protocol/protocol/Cargo.toml
copy to common/base/Cargo.toml
index 77c8260..7397c19 100644
--- a/protocol/protocol/Cargo.toml
+++ b/common/base/Cargo.toml
@@ -1,8 +1,11 @@
[package]
-name = "protocol"
+name = "base"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
+urlencoding.workspace = true
+http = "0.2"
+logger.workspace = true
\ No newline at end of file
diff --git a/protocol/protocol/LICENSE b/common/base/LICENSE
similarity index 100%
copy from protocol/protocol/LICENSE
copy to common/base/LICENSE
diff --git a/dubbo/src/common/consts.rs b/common/base/src/constants.rs
similarity index 99%
rename from dubbo/src/common/consts.rs
rename to common/base/src/constants.rs
index 17993c8..b1faf27 100644
--- a/dubbo/src/common/consts.rs
+++ b/common/base/src/constants.rs
@@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
pub const REGISTRY_PROTOCOL: &str = "registry_protocol";
pub const PROTOCOL: &str = "protocol";
pub const REGISTRY: &str = "registry";
diff --git a/protocol/protocol/src/lib.rs b/common/base/src/lib.rs
similarity index 78%
rename from protocol/protocol/src/lib.rs
rename to common/base/src/lib.rs
index d64452d..b97b342 100644
--- a/protocol/protocol/src/lib.rs
+++ b/common/base/src/lib.rs
@@ -14,17 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-pub fn add(left: usize, right: usize) -> usize {
- left + right
-}
+#![cfg_attr(
+ debug_assertions,
+ allow(dead_code, unused_imports, unused_variables, unused_mut)
+)]
+pub mod constants;
+pub mod node;
+pub mod url;
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn it_works() {
- let result = add(2, 2);
- assert_eq!(result, 4);
- }
-}
+pub use node::Node;
+pub use url::Url;
diff --git a/dubbo/src/common/mod.rs b/common/base/src/node.rs
similarity index 81%
copy from dubbo/src/common/mod.rs
copy to common/base/src/node.rs
index 2328421..1e4114e 100644
--- a/dubbo/src/common/mod.rs
+++ b/common/base/src/node.rs
@@ -15,5 +15,14 @@
* limitations under the License.
*/
-pub mod consts;
-pub mod url;
+use std::sync::Arc;
+
+use crate::Url;
+
+pub trait Node {
+ fn get_url(&self) -> Arc<Url>;
+ fn is_available(&self) -> bool;
+ fn destroy(&self);
+
+ fn is_destroyed(&self) -> bool;
+}
diff --git a/dubbo/src/common/url.rs b/common/base/src/url.rs
similarity index 94%
rename from dubbo/src/common/url.rs
rename to common/base/src/url.rs
index 2a36d72..81a72c2 100644
--- a/dubbo/src/common/url.rs
+++ b/common/base/src/url.rs
@@ -20,13 +20,13 @@ use std::{
fmt::{Display, Formatter},
};
-use crate::common::consts::{GROUP_KEY, INTERFACE_KEY, VERSION_KEY};
+use crate::constants::{GROUP_KEY, INTERFACE_KEY, VERSION_KEY};
use http::Uri;
#[derive(Debug, Clone, Default, PartialEq)]
pub struct Url {
pub raw_url_string: String,
- // value of scheme is different to protocol name, eg. triple -> tri://
+ // value of scheme is different to base name, eg. triple -> tri://
pub scheme: String,
pub location: String,
pub ip: String,
@@ -48,7 +48,7 @@ impl Url {
let uri = url
.parse::<http::Uri>()
.map_err(|err| {
- tracing::error!("fail to parse url({}), err: {:?}", url, err);
+ logger::tracing::error!("fail to parse url({}), err: {:?}", url, err);
})
.unwrap();
let query = uri.path_and_query().unwrap().query();
@@ -157,6 +157,10 @@ impl Url {
pub fn short_url(&self) -> String {
format!("{}://{}:{}", self.scheme, self.ip, self.port)
}
+
+ pub fn protocol(&self) -> String {
+ self.scheme.clone()
+ }
}
impl Display for Url {
@@ -179,8 +183,10 @@ impl From<&str> for Url {
#[cfg(test)]
mod tests {
- use super::*;
- use crate::common::consts::{ANYHOST_KEY, VERSION_KEY};
+ use crate::{
+ constants::{ANYHOST_KEY, VERSION_KEY},
+ url::Url,
+ };
#[test]
fn test_from_url() {
diff --git a/common/utils/Cargo.toml b/common/utils/Cargo.toml
index 4665ab6..0b8c84f 100644
--- a/common/utils/Cargo.toml
+++ b/common/utils/Cargo.toml
@@ -10,4 +10,6 @@ serde_yaml.workspace = true
serde = { workspace = true, features = ["derive"] }
project-root = "0.2.2"
anyhow.workspace=true
-once_cell.workspace = true
\ No newline at end of file
+once_cell.workspace = true
+local-ip-address = "0.5.1"
+port-selector = "0.1.6"
\ No newline at end of file
diff --git a/common/utils/src/host_util.rs b/common/utils/src/host_util.rs
new file mode 100644
index 0000000..0b029ef
--- /dev/null
+++ b/common/utils/src/host_util.rs
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use std::net::IpAddr;
+
+use port_selector::is_free;
+
+pub use port_selector::Port;
+
+// get local ip for linux/macos/windows
+#[allow(dead_code)]
+pub(crate) fn local_ip() -> IpAddr {
+ local_ip_address::local_ip().unwrap()
+}
+
+#[allow(dead_code)]
+pub(crate) fn is_free_port(port: Port) -> bool {
+ is_free(port)
+}
+
+// scan from the give port
+#[allow(dead_code)]
+pub(crate) fn scan_free_port(port: Port) -> Port {
+ for selected_port in port..65535 {
+ if is_free_port(selected_port) {
+ return selected_port;
+ } else {
+ continue;
+ }
+ }
+ port
+}
+
+#[cfg(test)]
+mod tests {
+ use local_ip_address::list_afinet_netifas;
+
+ use super::*;
+
+ #[test]
+ fn test_local_ip() {
+ let ip = local_ip();
+ println!("ip: {}", ip);
+ }
+
+ #[test]
+ fn test_local_addresses() {
+ let network_interfaces = list_afinet_netifas().unwrap();
+ for (name, ip) in network_interfaces.iter() {
+ println!("{}:\t{:?}", name, ip);
+ }
+ }
+
+ #[test]
+ fn test_scan_free_port() {
+ let free_port = scan_free_port(7890);
+ println!("{}", free_port);
+ }
+}
diff --git a/common/utils/src/lib.rs b/common/utils/src/lib.rs
index 47dbe62..7a0a45b 100644
--- a/common/utils/src/lib.rs
+++ b/common/utils/src/lib.rs
@@ -14,5 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+pub mod host_util;
pub mod path_util;
pub mod yaml_util;
diff --git a/config/src/protocol.rs b/config/src/protocol.rs
index 4a47ac9..86ff053 100644
--- a/config/src/protocol.rs
+++ b/config/src/protocol.rs
@@ -77,7 +77,7 @@ impl ProtocolRetrieve for ProtocolConfig {
} else {
let result = self.get_protocol(protocol_key);
if let Some(..) = result {
- panic!("default triple protocol dose not defined.")
+ panic!("default triple base dose not defined.")
} else {
result.unwrap()
}
diff --git a/dubbo/Cargo.toml b/dubbo/Cargo.toml
index 5700e81..3b66523 100644
--- a/dubbo/Cargo.toml
+++ b/dubbo/Cargo.toml
@@ -36,6 +36,7 @@ aws-smithy-http = "0.54.1"
itertools.workspace = true
urlencoding.workspace = true
lazy_static.workspace = true
+base.workspace=true
dubbo-config = { path = "../config", version = "0.3.0" }
diff --git a/dubbo/src/cluster/directory.rs b/dubbo/src/cluster/directory.rs
index 2879de4..d92bb20 100644
--- a/dubbo/src/cluster/directory.rs
+++ b/dubbo/src/cluster/directory.rs
@@ -23,10 +23,10 @@ use std::{
};
use crate::{
- common::url::Url,
invocation::{Invocation, RpcInvocation},
registry::{memory_registry::MemoryNotifyListener, BoxRegistry, RegistryWrapper},
};
+use base::Url;
/// Directory.
///
diff --git a/dubbo/src/cluster/loadbalance/impls/random.rs b/dubbo/src/cluster/loadbalance/impls/random.rs
index a5ca7df..ddfcd39 100644
--- a/dubbo/src/cluster/loadbalance/impls/random.rs
+++ b/dubbo/src/cluster/loadbalance/impls/random.rs
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+use base::Url;
use std::{
fmt::{Debug, Formatter},
sync::Arc,
@@ -22,7 +23,6 @@ use std::{
use crate::{
cluster::loadbalance::types::{LoadBalance, Metadata},
codegen::RpcInvocation,
- common::url::Url,
};
pub struct RandomLoadBalance {
diff --git a/dubbo/src/cluster/loadbalance/impls/roundrobin.rs b/dubbo/src/cluster/loadbalance/impls/roundrobin.rs
index cd951bb..0c59ed4 100644
--- a/dubbo/src/cluster/loadbalance/impls/roundrobin.rs
+++ b/dubbo/src/cluster/loadbalance/impls/roundrobin.rs
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+use base::Url;
use std::{
collections::HashMap,
fmt::{Debug, Formatter},
@@ -26,7 +27,6 @@ use std::{
use crate::{
cluster::loadbalance::types::{LoadBalance, Metadata},
codegen::RpcInvocation,
- common::url::Url,
};
pub struct RoundRobinLoadBalance {
diff --git a/dubbo/src/cluster/loadbalance/types.rs b/dubbo/src/cluster/loadbalance/types.rs
index ac31176..fd48ed9 100644
--- a/dubbo/src/cluster/loadbalance/types.rs
+++ b/dubbo/src/cluster/loadbalance/types.rs
@@ -15,9 +15,10 @@
* limitations under the License.
*/
+use base::Url;
use std::{fmt::Debug, sync::Arc};
-use crate::{codegen::RpcInvocation, common::url::Url};
+use crate::codegen::RpcInvocation;
pub type BoxLoadBalance = Box<dyn LoadBalance + Send + Sync>;
diff --git a/dubbo/src/cluster/support/cluster_invoker.rs b/dubbo/src/cluster/support/cluster_invoker.rs
index e00b849..67c9839 100644
--- a/dubbo/src/cluster/support/cluster_invoker.rs
+++ b/dubbo/src/cluster/support/cluster_invoker.rs
@@ -18,6 +18,7 @@
use aws_smithy_http::body::SdkBody;
use std::{str::FromStr, sync::Arc};
+use base::Url;
use http::{uri::PathAndQuery, Request};
use crate::{
@@ -26,7 +27,6 @@ use crate::{
support::DEFAULT_LOADBALANCE,
},
codegen::{Directory, RegistryDirectory, TripleClient},
- common::url::Url,
invocation::RpcInvocation,
};
diff --git a/dubbo/src/framework.rs b/dubbo/src/framework.rs
index 71e11d2..546e2d0 100644
--- a/dubbo/src/framework.rs
+++ b/dubbo/src/framework.rs
@@ -22,11 +22,11 @@ use std::{
sync::{Arc, Mutex},
};
+use base::Url;
use futures::{future, Future};
use tracing::{debug, info};
use crate::{
- common::url::Url,
protocol::{BoxExporter, Protocol},
registry::{
protocol::RegistryProtocol,
@@ -94,7 +94,7 @@ impl Dubbo {
info!("protocol_url: {:?}", protocol_url);
Url::from_url(&protocol_url)
} else {
- return Err(format!("protocol {:?} not exists", service_config.protocol).into());
+ return Err(format!("base {:?} not exists", service_config.protocol).into());
};
info!("url: {:?}", url);
if url.is_none() {
@@ -126,7 +126,7 @@ impl Dubbo {
let mut async_vec: Vec<Pin<Box<dyn Future<Output = BoxExporter> + Send>>> = Vec::new();
for (name, items) in self.protocols.iter() {
for url in items.iter() {
- info!("protocol: {:?}, service url: {:?}", name, url);
+ info!("base: {:?}, service url: {:?}", name, url);
let exporter = mem_reg.clone().export(url.to_owned());
async_vec.push(exporter);
//TODO multiple registry
diff --git a/dubbo/src/lib.rs b/dubbo/src/lib.rs
index 2174365..63c09d3 100644
--- a/dubbo/src/lib.rs
+++ b/dubbo/src/lib.rs
@@ -17,7 +17,6 @@
pub mod cluster;
pub mod codegen;
-pub mod common;
pub mod context;
pub mod filter;
mod framework;
diff --git a/dubbo/src/protocol/mod.rs b/dubbo/src/protocol/mod.rs
index 2c8ad8f..886308d 100644
--- a/dubbo/src/protocol/mod.rs
+++ b/dubbo/src/protocol/mod.rs
@@ -25,7 +25,7 @@ use async_trait::async_trait;
use aws_smithy_http::body::SdkBody;
use tower_service::Service;
-use crate::common::url::Url;
+use base::Url;
pub mod server_desc;
pub mod triple;
diff --git a/dubbo/src/protocol/triple/triple_invoker.rs b/dubbo/src/protocol/triple/triple_invoker.rs
index 2bcc2d3..d7d54ee 100644
--- a/dubbo/src/protocol/triple/triple_invoker.rs
+++ b/dubbo/src/protocol/triple/triple_invoker.rs
@@ -16,10 +16,11 @@
*/
use aws_smithy_http::body::SdkBody;
+use base::Url;
use std::fmt::{Debug, Formatter};
use tower_service::Service;
-use crate::{common::url::Url, protocol::Invoker, triple::client::builder::ClientBoxService};
+use crate::{protocol::Invoker, triple::client::builder::ClientBoxService};
pub struct TripleInvoker {
url: Url,
diff --git a/dubbo/src/protocol/triple/triple_protocol.rs b/dubbo/src/protocol/triple/triple_protocol.rs
index c6ade9b..fd1d369 100644
--- a/dubbo/src/protocol/triple/triple_protocol.rs
+++ b/dubbo/src/protocol/triple/triple_protocol.rs
@@ -18,14 +18,12 @@
use std::{boxed::Box, collections::HashMap};
use async_trait::async_trait;
+use base::Url;
use super::{
triple_exporter::TripleExporter, triple_invoker::TripleInvoker, triple_server::TripleServer,
};
-use crate::{
- common::url::Url,
- protocol::{BoxExporter, Protocol},
-};
+use crate::protocol::{BoxExporter, Protocol};
#[derive(Clone)]
pub struct TripleProtocol {
diff --git a/dubbo/src/protocol/triple/triple_server.rs b/dubbo/src/protocol/triple/triple_server.rs
index d297a88..1db27fa 100644
--- a/dubbo/src/protocol/triple/triple_server.rs
+++ b/dubbo/src/protocol/triple/triple_server.rs
@@ -15,7 +15,8 @@
* limitations under the License.
*/
-use crate::{common::url::Url, triple::server::builder::ServerBuilder};
+use crate::triple::server::builder::ServerBuilder;
+use base::Url;
#[derive(Default, Clone)]
pub struct TripleServer {
diff --git a/dubbo/src/registry/memory_registry.rs b/dubbo/src/registry/memory_registry.rs
index b41a0e0..7f900f6 100644
--- a/dubbo/src/registry/memory_registry.rs
+++ b/dubbo/src/registry/memory_registry.rs
@@ -23,7 +23,7 @@ use std::{
};
use tracing::debug;
-use crate::common::url::Url;
+use base::Url;
use super::{NotifyListener, Registry, RegistryNotifyListener};
@@ -69,7 +69,7 @@ impl Registry for MemoryRegistry {
Ok(())
}
- fn unregister(&mut self, url: crate::common::url::Url) -> Result<(), crate::StdError> {
+ fn unregister(&mut self, url: base::Url) -> Result<(), crate::StdError> {
let registry_group = match url.get_param(REGISTRY_GROUP_KEY) {
Some(key) => key,
None => "dubbo".to_string(),
@@ -88,7 +88,7 @@ impl Registry for MemoryRegistry {
fn subscribe(
&self,
- url: crate::common::url::Url,
+ url: base::Url,
listener: RegistryNotifyListener,
) -> Result<(), crate::StdError> {
todo!()
@@ -96,7 +96,7 @@ impl Registry for MemoryRegistry {
fn unsubscribe(
&self,
- url: crate::common::url::Url,
+ url: base::Url,
listener: RegistryNotifyListener,
) -> Result<(), crate::StdError> {
todo!()
diff --git a/dubbo/src/registry/mod.rs b/dubbo/src/registry/mod.rs
index 4483588..c5e2674 100644
--- a/dubbo/src/registry/mod.rs
+++ b/dubbo/src/registry/mod.rs
@@ -26,7 +26,7 @@ use std::{
sync::Arc,
};
-use crate::common::url::Url;
+use base::Url;
pub type RegistryNotifyListener = Arc<dyn NotifyListener + Send + Sync + 'static>;
pub trait Registry {
diff --git a/dubbo/src/registry/protocol.rs b/dubbo/src/registry/protocol.rs
index 04b8ac9..d28e43d 100644
--- a/dubbo/src/registry/protocol.rs
+++ b/dubbo/src/registry/protocol.rs
@@ -15,6 +15,7 @@
* limitations under the License.
*/
+use base::Url;
use std::{
collections::HashMap,
fmt::{Debug, Formatter},
@@ -23,7 +24,6 @@ use std::{
use super::{memory_registry::MemoryRegistry, BoxRegistry};
use crate::{
- common::url::Url,
protocol::{
triple::{triple_exporter::TripleExporter, triple_protocol::TripleProtocol},
BoxExporter, BoxInvoker, Protocol,
@@ -116,7 +116,7 @@ impl Protocol for RegistryProtocol {
return pro.export(url).await;
}
_ => {
- tracing::error!("protocol {:?} not implemented", url.scheme);
+ tracing::error!("base {:?} not implemented", url.scheme);
Box::new(TripleExporter::new())
}
}
diff --git a/dubbo/src/registry/types.rs b/dubbo/src/registry/types.rs
index 8b97864..a55e72c 100644
--- a/dubbo/src/registry/types.rs
+++ b/dubbo/src/registry/types.rs
@@ -20,11 +20,11 @@ use std::{
sync::{Arc, Mutex},
};
+use base::Url;
use itertools::Itertools;
use tracing::info;
use crate::{
- common::url::Url,
registry::{BoxRegistry, Registry},
StdError,
};
diff --git a/dubbo/src/triple/server/builder.rs b/dubbo/src/triple/server/builder.rs
index af3cc6d..e82ff05 100644
--- a/dubbo/src/triple/server/builder.rs
+++ b/dubbo/src/triple/server/builder.rs
@@ -20,11 +20,12 @@ use std::{
str::FromStr,
};
+use base::Url;
use http::{Request, Response, Uri};
use hyper::body::Body;
use tower_service::Service;
-use crate::{common::url::Url, triple::transport::DubboServer, BoxBody};
+use crate::{triple::transport::DubboServer, BoxBody};
#[derive(Clone, Default, Debug)]
pub struct ServerBuilder {
diff --git a/dubbo/src/triple/transport/connection.rs b/dubbo/src/triple/transport/connection.rs
index c99b399..360149b 100644
--- a/dubbo/src/triple/transport/connection.rs
+++ b/dubbo/src/triple/transport/connection.rs
@@ -85,7 +85,7 @@ where
let mut connector = Connect::new(get_connector(self.connector), builder);
let uri = self.host.clone();
let fut = async move {
- debug!("send rpc call to {}", uri);
+ debug!("send base call to {}", uri);
let mut con = connector.call(uri).await.unwrap();
con.call(req)
diff --git a/examples/echo/src/generated/grpc.examples.echo.rs b/examples/echo/src/generated/grpc.examples.echo.rs
index 16fb163..ccb385c 100644
--- a/examples/echo/src/generated/grpc.examples.echo.rs
+++ b/examples/echo/src/generated/grpc.examples.echo.rs
@@ -1,12 +1,12 @@
-// @generated by apache/dubbo-rust.
-
/// EchoRequest is the request for echo.
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct EchoRequest {
#[prost(string, tag = "1")]
pub message: ::prost::alloc::string::String,
}
/// EchoResponse is the response for echo.
+#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct EchoResponse {
#[prost(string, tag = "1")]
diff --git a/examples/greeter/Cargo.toml b/examples/greeter/Cargo.toml
index 91cbefe..a8c6cac 100644
--- a/examples/greeter/Cargo.toml
+++ b/examples/greeter/Cargo.toml
@@ -33,6 +33,7 @@ dubbo = { path = "../../dubbo", version = "0.3.0" }
dubbo-config = { path = "../../config", version = "0.3.0" }
registry-zookeeper.workspace = true
registry-nacos.workspace = true
+base.workspace = true
[build-dependencies]
dubbo-build = { path = "../../dubbo-build", version = "0.3.0" }
diff --git a/examples/greeter/src/greeter/client.rs b/examples/greeter/src/greeter/client.rs
index 4b5437a..4591fd9 100644
--- a/examples/greeter/src/greeter/client.rs
+++ b/examples/greeter/src/greeter/client.rs
@@ -22,8 +22,9 @@ pub mod protos {
use std::env;
-use dubbo::{codegen::*, common::url::Url};
+use dubbo::codegen::*;
+use base::Url;
use futures_util::StreamExt;
use protos::{greeter_client::GreeterClient, GreeterRequest};
use registry_nacos::NacosRegistry;
diff --git a/protocol/README.md b/protocol/README.md
index c6b9a8c..c0fd296 100644
--- a/protocol/README.md
+++ b/protocol/README.md
@@ -1,4 +1,6 @@
+```markdown
/protocol
/protocol # define protocol abstract layer
/dubbo2 # for dubbo2 protocol, hessian2 codec as default
- /triple # for triple protocol
\ No newline at end of file
+ /triple # for triple protocol
+```
\ No newline at end of file
diff --git a/protocol/protocol/Cargo.toml b/protocol/base/Cargo.toml
similarity index 79%
rename from protocol/protocol/Cargo.toml
rename to protocol/base/Cargo.toml
index 77c8260..379c342 100644
--- a/protocol/protocol/Cargo.toml
+++ b/protocol/base/Cargo.toml
@@ -6,3 +6,5 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
+dashmap.workspace = true
+base.workspace = true
\ No newline at end of file
diff --git a/protocol/protocol/LICENSE b/protocol/base/LICENSE
similarity index 100%
rename from protocol/protocol/LICENSE
rename to protocol/base/LICENSE
diff --git a/protocol/triple/src/lib.rs b/protocol/base/src/error.rs
similarity index 66%
copy from protocol/triple/src/lib.rs
copy to protocol/base/src/error.rs
index 9d8d4b0..0ad0c26 100644
--- a/protocol/triple/src/lib.rs
+++ b/protocol/base/src/error.rs
@@ -15,17 +15,23 @@
* limitations under the License.
*/
-pub fn add(left: usize, right: usize) -> usize {
- left + right
-}
+use std::{
+ error::Error,
+ fmt::{Debug, Display, Formatter},
+};
-#[cfg(test)]
-mod tests {
- use super::*;
+pub struct InvokerError(String);
- #[test]
- fn it_works() {
- let result = add(2, 2);
- assert_eq!(result, 4);
+impl Debug for InvokerError {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ f.write_str(self.0.as_str())
}
}
+
+impl Display for InvokerError {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ f.write_str(self.0.as_str())
+ }
+}
+
+impl Error for InvokerError {}
diff --git a/protocol/triple/src/lib.rs b/protocol/base/src/invocation.rs
similarity index 73%
copy from protocol/triple/src/lib.rs
copy to protocol/base/src/invocation.rs
index 9d8d4b0..d77cad4 100644
--- a/protocol/triple/src/lib.rs
+++ b/protocol/base/src/invocation.rs
@@ -15,17 +15,13 @@
* limitations under the License.
*/
-pub fn add(left: usize, right: usize) -> usize {
- left + right
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
+use std::{any::Any, sync::Arc};
- #[test]
- fn it_works() {
- let result = add(2, 2);
- assert_eq!(result, 4);
- }
+pub trait Invocation {
+ fn get_method_name(&self) -> String;
+ fn get_parameter_types(&self) -> Vec<String>;
+ fn get_arguments(&self) -> Vec<String>;
+ fn get_reply(&self) -> Arc<dyn Any>;
}
+
+pub type BoxInvocation = Arc<dyn Invocation + Send + Sync>;
diff --git a/protocol/base/src/invoker.rs b/protocol/base/src/invoker.rs
new file mode 100644
index 0000000..62bc8bf
--- /dev/null
+++ b/protocol/base/src/invoker.rs
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use crate::{
+ invocation::BoxInvocation,
+ output::{BoxOutput, RPCOutput},
+};
+use base::{Node, Url};
+use std::{
+ fmt::{Display, Formatter},
+ sync::{
+ atomic::{AtomicBool, Ordering},
+ Arc,
+ },
+};
+
+pub struct BaseInvoker {
+ url: Arc<Url>,
+ available: AtomicBool,
+ destroyed: AtomicBool,
+}
+
+pub trait Invoker {
+ type Output;
+ fn invoke(&self, invocation: BoxInvocation) -> Self::Output;
+}
+
+impl Invoker for BaseInvoker {
+ type Output = BoxOutput<String>;
+ fn invoke(&self, _invocation: BoxInvocation) -> Self::Output {
+ Arc::new(RPCOutput::default())
+ }
+}
+
+impl Node for BaseInvoker {
+ fn get_url(&self) -> Arc<Url> {
+ self.url.clone()
+ }
+
+ fn is_available(&self) -> bool {
+ self.available.load(Ordering::SeqCst)
+ }
+
+ fn destroy(&self) {
+ self.destroyed.store(true, Ordering::SeqCst);
+ self.available.store(false, Ordering::SeqCst)
+ }
+ fn is_destroyed(&self) -> bool {
+ self.destroyed.load(Ordering::SeqCst)
+ }
+}
+
+impl Display for BaseInvoker {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("Invoker")
+ .field("protocol", &self.url.scheme)
+ .field("host", &self.url.ip)
+ .field("path", &self.url.location)
+ .finish()
+ }
+}
+
+impl BaseInvoker {
+ pub fn new(url: Url) -> Self {
+ Self {
+ url: Arc::new(url),
+ available: AtomicBool::new(true),
+ destroyed: AtomicBool::new(false),
+ }
+ }
+}
diff --git a/dubbo/src/common/mod.rs b/protocol/base/src/lib.rs
similarity index 92%
rename from dubbo/src/common/mod.rs
rename to protocol/base/src/lib.rs
index 2328421..6928742 100644
--- a/dubbo/src/common/mod.rs
+++ b/protocol/base/src/lib.rs
@@ -15,5 +15,7 @@
* limitations under the License.
*/
-pub mod consts;
-pub mod url;
+pub mod error;
+pub mod invocation;
+pub mod invoker;
+pub mod output;
diff --git a/protocol/base/src/output.rs b/protocol/base/src/output.rs
new file mode 100644
index 0000000..e0d2583
--- /dev/null
+++ b/protocol/base/src/output.rs
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use std::{any::Any, fmt::Debug, sync::Arc};
+
+use dashmap::DashMap;
+
+use crate::error::InvokerError;
+
+pub type AttachmentsMap = DashMap<String, String>;
+
+pub struct RPCOutput<R: Any + Debug> {
+ error: Option<Arc<InvokerError>>,
+ result: Option<Arc<R>>,
+ attachments: AttachmentsMap,
+}
+
+// role of Output is same to Result, because of preload std::result::Result
+pub trait Output<R> {
+ fn set_error(&mut self, error: Arc<InvokerError>);
+ fn error(&self) -> Option<Arc<InvokerError>>;
+ fn set(&mut self, result: R);
+ fn get(&self) -> Option<Arc<R>>;
+ fn set_attachments(&mut self, attachments: AttachmentsMap);
+ fn add_attachment(&mut self, key: &str, value: &str);
+ fn get_attachment_or_default(&self, key: &str, default_value: &str) -> String;
+}
+
+pub type BoxOutput<R> = Arc<dyn Output<R> + Send + Sync + 'static>;
+
+impl<R> Default for RPCOutput<R>
+where
+ R: Any + Debug,
+{
+ fn default() -> Self {
+ RPCOutput {
+ error: None,
+ result: None,
+ attachments: AttachmentsMap::new(),
+ }
+ }
+}
+
+impl<R> Output<R> for RPCOutput<R>
+where
+ R: Any + Debug,
+{
+ fn set_error(&mut self, error: Arc<InvokerError>) {
+ self.error = Some(error);
+ }
+
+ fn error(&self) -> Option<Arc<InvokerError>> {
+ self.error.clone()
+ }
+
+ fn set(&mut self, result: R)
+ where
+ R: Any + Debug,
+ {
+ self.result = Some(Arc::new(result))
+ }
+
+ fn get(&self) -> Option<Arc<R>> {
+ self.result.clone()
+ }
+
+ fn set_attachments(&mut self, attachments: AttachmentsMap) {
+ self.attachments = attachments;
+ }
+
+ fn add_attachment(&mut self, key: &str, value: &str) {
+ self.attachments.insert(key.to_string(), value.to_string());
+ }
+
+ fn get_attachment_or_default(&self, key: &str, default_value: &str) -> String {
+ self.attachments
+ .contains_key(key)
+ .then(|| self.attachments.get(key).unwrap().clone())
+ .unwrap_or(default_value.to_string())
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_result() {
+ let mut result: RPCOutput<String> = RPCOutput::default();
+ result.set("r".to_string());
+ assert_eq!(result.get().unwrap().as_str(), "r");
+ result.add_attachment("hello", "world");
+ let string = result.get_attachment_or_default("hello", "test");
+ println!("{}", string);
+ }
+}
diff --git a/protocol/triple/Cargo.toml b/protocol/triple/Cargo.toml
index 43aa6c5..b342a3b 100644
--- a/protocol/triple/Cargo.toml
+++ b/protocol/triple/Cargo.toml
@@ -7,4 +7,5 @@ edition = "2021"
[dependencies]
remoting-net.workspace = true
-protocol.workspace = true
\ No newline at end of file
+protocol.workspace = true
+base.workspace = true
diff --git a/protocol/triple/src/lib.rs b/protocol/triple/src/lib.rs
index 9d8d4b0..d7d432b 100644
--- a/protocol/triple/src/lib.rs
+++ b/protocol/triple/src/lib.rs
@@ -15,6 +15,8 @@
* limitations under the License.
*/
+pub mod triple_invoker;
+
pub fn add(left: usize, right: usize) -> usize {
left + right
}
diff --git a/dubbo/src/protocol/triple/triple_server.rs b/protocol/triple/src/triple_invoker.rs
similarity index 57%
copy from dubbo/src/protocol/triple/triple_server.rs
copy to protocol/triple/src/triple_invoker.rs
index d297a88..4a2e74f 100644
--- a/dubbo/src/protocol/triple/triple_server.rs
+++ b/protocol/triple/src/triple_invoker.rs
@@ -14,23 +14,39 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+use base::{Node, Url};
+use protocol::{
+ invocation::BoxInvocation,
+ invoker::{BaseInvoker, Invoker},
+};
+use std::sync::Arc;
-use crate::{common::url::Url, triple::server::builder::ServerBuilder};
+pub struct TripleInvoker {
+ base: BaseInvoker,
+}
+
+impl Invoker for TripleInvoker {
+ type Output = ();
-#[derive(Default, Clone)]
-pub struct TripleServer {
- builder: ServerBuilder,
+ fn invoke(&self, _invocation: BoxInvocation) -> Self::Output {
+ todo!()
+ }
}
-impl TripleServer {
- pub fn new() -> TripleServer {
- Self {
- builder: ServerBuilder::new(),
- }
+impl Node for TripleInvoker {
+ fn get_url(&self) -> Arc<Url> {
+ self.base.get_url()
+ }
+
+ fn is_available(&self) -> bool {
+ self.base.is_available()
+ }
+
+ fn destroy(&self) {
+ todo!()
}
- pub async fn serve(mut self, url: Url) {
- self.builder = ServerBuilder::from(url);
- self.builder.build().serve().await.unwrap()
+ fn is_destroyed(&self) -> bool {
+ self.base.is_destroyed()
}
}
diff --git a/registry/nacos/Cargo.toml b/registry/nacos/Cargo.toml
index 1a3a686..d30827b 100644
--- a/registry/nacos/Cargo.toml
+++ b/registry/nacos/Cargo.toml
@@ -15,6 +15,7 @@ serde_json.workspace = true
serde = { workspace = true, features = ["derive"] }
anyhow.workspace = true
logger.workspace = true
+base.workspace = true
[dev-dependencies]
tracing-subscriber = "0.3.16"
tracing = "0.1"
diff --git a/registry/nacos/src/lib.rs b/registry/nacos/src/lib.rs
index 38e710f..cd6515b 100644
--- a/registry/nacos/src/lib.rs
+++ b/registry/nacos/src/lib.rs
@@ -16,16 +16,14 @@
*/
mod utils;
+use base::Url;
use std::{
collections::{HashMap, HashSet},
sync::{Arc, Mutex},
};
use anyhow::anyhow;
-use dubbo::{
- common::url::Url,
- registry::{NotifyListener, Registry, RegistryNotifyListener, ServiceEvent},
-};
+use dubbo::registry::{NotifyListener, Registry, RegistryNotifyListener, ServiceEvent};
use logger::tracing::{error, info, warn};
use nacos_sdk::api::naming::{NamingService, NamingServiceBuilder, ServiceInstance};
diff --git a/registry/nacos/src/utils/mod.rs b/registry/nacos/src/utils/mod.rs
index 2cca6a2..f506732 100644
--- a/registry/nacos/src/utils/mod.rs
+++ b/registry/nacos/src/utils/mod.rs
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-use dubbo::common::url::Url;
+use base::Url;
use nacos_sdk::api::props::ClientProps;
const APP_NAME_KEY: &str = "AppName";
diff --git a/registry/zookeeper/Cargo.toml b/registry/zookeeper/Cargo.toml
index c4a4e59..c7fb82e 100644
--- a/registry/zookeeper/Cargo.toml
+++ b/registry/zookeeper/Cargo.toml
@@ -15,3 +15,4 @@ serde_json.workspace = true
serde = { workspace = true, features = ["derive"] }
urlencoding.workspace = true
logger.workspace = true
+base.workspace = true
diff --git a/registry/zookeeper/src/lib.rs b/registry/zookeeper/src/lib.rs
index 9f7447b..4f11d3a 100644
--- a/registry/zookeeper/src/lib.rs
+++ b/registry/zookeeper/src/lib.rs
@@ -24,6 +24,10 @@ use std::{
time::Duration,
};
+use base::{
+ constants::{DUBBO_KEY, LOCALHOST_IP, PROVIDERS_KEY},
+ Url,
+};
use logger::tracing::{debug, error, info};
use serde::{Deserialize, Serialize};
#[allow(unused_imports)]
@@ -32,10 +36,6 @@ use zookeeper::{Acl, CreateMode, WatchedEvent, WatchedEventType, Watcher, ZooKee
use dubbo::{
cluster::support::cluster_invoker::ClusterInvoker,
codegen::BoxRegistry,
- common::{
- consts::{DUBBO_KEY, LOCALHOST_IP, PROVIDERS_KEY},
- url::Url,
- },
registry::{
integration::ClusterRegistryIntegration, memory_registry::MemoryRegistry, NotifyListener,
Registry, RegistryNotifyListener, ServiceEvent,
@@ -383,7 +383,7 @@ mod tests {
use zookeeper::{Acl, CreateMode, WatchedEvent, Watcher};
- use crate::zookeeper_registry::ZookeeperRegistry;
+ use crate::ZookeeperRegistry;
struct TestZkWatcher {
pub watcher: Arc<Option<TestZkWatcher>>,
diff --git a/remoting/net/benches/transport_benchmark/main.rs b/remoting/net/benches/transport_benchmark/main.rs
index 2944f98..352e856 100644
--- a/remoting/net/benches/transport_benchmark/main.rs
+++ b/remoting/net/benches/transport_benchmark/main.rs
@@ -14,3 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+fn main() {
+ todo!()
+}
diff --git a/remoting/xds/Cargo.toml b/remoting/xds/Cargo.toml
index 8bf8a3b..eb8c027 100644
--- a/remoting/xds/Cargo.toml
+++ b/remoting/xds/Cargo.toml
@@ -1,5 +1,5 @@
[package]
-name = "xds"
+name = "remoting-xds"
version = "0.3.0"
edition = "2021"
license = "Apache-2.0"