You are viewing a plain-text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/07/13 11:47:02 UTC
[rocketmq-clients] branch rust_dev updated: WIP
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch rust_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/rust_dev by this push:
new 8fea2c0 WIP
8fea2c0 is described below
commit 8fea2c037522c25a1e541f7a56cb9261293369f7
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Wed Jul 13 19:46:47 2022 +0800
WIP
---
rust/src/client.rs | 67 ++++++++++++++++++++++++++++++++--------------------
rust/src/producer.rs | 9 +++----
2 files changed, 46 insertions(+), 30 deletions(-)
diff --git a/rust/src/client.rs b/rust/src/client.rs
index 9760fcc..948f88a 100644
--- a/rust/src/client.rs
+++ b/rust/src/client.rs
@@ -156,13 +156,12 @@ impl SessionManager {
}
};
- let access_point = Client::access_point();
let request = QueryRouteRequest {
topic: Some(Resource {
name: topic.to_owned(),
- resource_namespace: client.arn().to_owned(),
+ resource_namespace: client.arn.clone(),
}),
- endpoints: Some(access_point),
+ endpoints: Some(client.access_point.clone()),
};
let mut request = tonic::Request::new(request);
@@ -208,6 +207,7 @@ pub(crate) struct Client {
route_table: Mutex<HashMap<String /* topic */, RouteStatus>>,
arn: String,
id: String,
+ access_point: pb::Endpoints,
}
static CLIENT_ID_SEQUENCE: AtomicUsize = AtomicUsize::new(0);
@@ -232,29 +232,41 @@ impl Client {
)
}
- pub(crate) fn new(logger: Logger) -> Self {
+ pub(crate) fn new(
+ logger: Logger,
+ access_url: impl std::net::ToSocketAddrs,
+ ) -> Result<Self, error::ClientError> {
let id = Self::client_id();
- Client {
+ let mut access_point = pb::Endpoints {
+ scheme: pb::AddressScheme::IPv4 as i32,
+ addresses: vec![],
+ };
+
+ for socket_addr in access_url
+ .to_socket_addrs()
+ .map_err(|e| error::ClientError::ClientInternal)?
+ {
+ if socket_addr.is_ipv4() {
+ access_point.scheme = pb::AddressScheme::IPv4 as i32;
+ } else {
+ access_point.scheme = pb::AddressScheme::IPv6 as i32;
+ }
+
+ let addr = pb::Address {
+ host: socket_addr.ip().to_string(),
+ port: socket_addr.port() as i32,
+ };
+ access_point.addresses.push(addr);
+ }
+
+ Ok(Client {
session_manager: SessionManager::new(logger.new(o!("component" => "session_manager"))),
logger,
route_table: Mutex::new(HashMap::new()),
arn: String::from(""),
id,
- }
- }
-
- fn access_point() -> pb::Endpoints {
- return pb::Endpoints {
- scheme: pb::AddressScheme::IPv4 as i32,
- addresses: vec![pb::Address {
- host: String::from("127.0.0.1"),
- port: 8081,
- }],
- };
- }
-
- fn arn(&self) -> &str {
- &self.arn
+ access_point,
+ })
}
async fn query_route(
@@ -297,9 +309,9 @@ impl Client {
}
}
- let endpoint = "https://127.0.0.1:8081";
let client = Arc::new(*&self);
let client_weak = Arc::downgrade(&client);
+ let endpoint = "https://127.0.0.1:8081";
match self
.session_manager
.route(endpoint, topic, client_weak)
@@ -413,12 +425,14 @@ mod tests {
let mut session = Session::new(endpoint.to_owned(), &logger).await?;
let topic = "cpp_sdk_standard";
+ let client = Client::new(logger.clone(), "localhost:8081")?;
+
let request = pb::QueryRouteRequest {
topic: Some(pb::Resource {
name: topic.to_owned(),
- resource_namespace: String::from(""),
+ resource_namespace: client.arn.clone(),
}),
- endpoints: Some(Client::access_point()),
+ endpoints: Some(client.access_point.clone()),
};
let request = tonic::Request::new(request);
@@ -427,10 +441,11 @@ mod tests {
}
#[tokio::test]
- async fn test_client_query_route() {
- let _endpoint = "http://localhost:8081";
- let client = Client::new(create_logger());
+ async fn test_client_query_route() -> Result<(), error::ClientError> {
+ let access_point = "localhost:8081";
+ let client = Client::new(create_logger(), access_point)?;
let topic = "cpp_sdk_standard";
let _route = client.query_route(topic, true).await.unwrap();
+ Ok(())
}
}
diff --git a/rust/src/producer.rs b/rust/src/producer.rs
index ade2d3a..685eb82 100644
--- a/rust/src/producer.rs
+++ b/rust/src/producer.rs
@@ -1,23 +1,24 @@
use slog::Logger;
-use crate::client;
+use crate::{client, error};
struct Producer {
client: client::Client,
}
impl Producer {
- pub async fn new<T>(logger: Logger, topics: T) -> Self
+ pub async fn new<T>(logger: Logger, topics: T) -> Result<Self, error::ClientError>
where
T: IntoIterator,
T::Item: AsRef<str>,
{
- let mut client = client::Client::new(logger);
+ let access_point = "localhost:8081";
+ let client = client::Client::new(logger, access_point)?;
for _topic in topics.into_iter() {
// client.subscribe(topic.as_ref()).await;
}
- Producer { client }
+ Ok(Producer { client })
}
pub fn start(&mut self) {}