You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/07/13 11:47:02 UTC

[rocketmq-clients] branch rust_dev updated: WIP

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch rust_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/rust_dev by this push:
     new 8fea2c0  WIP
8fea2c0 is described below

commit 8fea2c037522c25a1e541f7a56cb9261293369f7
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Wed Jul 13 19:46:47 2022 +0800

    WIP
---
 rust/src/client.rs   | 67 ++++++++++++++++++++++++++++++++--------------------
 rust/src/producer.rs |  9 +++----
 2 files changed, 46 insertions(+), 30 deletions(-)

diff --git a/rust/src/client.rs b/rust/src/client.rs
index 9760fcc..948f88a 100644
--- a/rust/src/client.rs
+++ b/rust/src/client.rs
@@ -156,13 +156,12 @@ impl SessionManager {
             }
         };
 
-        let access_point = Client::access_point();
         let request = QueryRouteRequest {
             topic: Some(Resource {
                 name: topic.to_owned(),
-                resource_namespace: client.arn().to_owned(),
+                resource_namespace: client.arn.clone(),
             }),
-            endpoints: Some(access_point),
+            endpoints: Some(client.access_point.clone()),
         };
 
         let mut request = tonic::Request::new(request);
@@ -208,6 +207,7 @@ pub(crate) struct Client {
     route_table: Mutex<HashMap<String /* topic */, RouteStatus>>,
     arn: String,
     id: String,
+    access_point: pb::Endpoints,
 }
 
 static CLIENT_ID_SEQUENCE: AtomicUsize = AtomicUsize::new(0);
@@ -232,29 +232,41 @@ impl Client {
         )
     }
 
-    pub(crate) fn new(logger: Logger) -> Self {
+    pub(crate) fn new(
+        logger: Logger,
+        access_url: impl std::net::ToSocketAddrs,
+    ) -> Result<Self, error::ClientError> {
         let id = Self::client_id();
-        Client {
+        let mut access_point = pb::Endpoints {
+            scheme: pb::AddressScheme::IPv4 as i32,
+            addresses: vec![],
+        };
+
+        for socket_addr in access_url
+            .to_socket_addrs()
+            .map_err(|e| error::ClientError::ClientInternal)?
+        {
+            if socket_addr.is_ipv4() {
+                access_point.scheme = pb::AddressScheme::IPv4 as i32;
+            } else {
+                access_point.scheme = pb::AddressScheme::IPv6 as i32;
+            }
+
+            let addr = pb::Address {
+                host: socket_addr.ip().to_string(),
+                port: socket_addr.port() as i32,
+            };
+            access_point.addresses.push(addr);
+        }
+
+        Ok(Client {
             session_manager: SessionManager::new(logger.new(o!("component" => "session_manager"))),
             logger,
             route_table: Mutex::new(HashMap::new()),
             arn: String::from(""),
             id,
-        }
-    }
-
-    fn access_point() -> pb::Endpoints {
-        return pb::Endpoints {
-            scheme: pb::AddressScheme::IPv4 as i32,
-            addresses: vec![pb::Address {
-                host: String::from("127.0.0.1"),
-                port: 8081,
-            }],
-        };
-    }
-
-    fn arn(&self) -> &str {
-        &self.arn
+            access_point,
+        })
     }
 
     async fn query_route(
@@ -297,9 +309,9 @@ impl Client {
             }
         }
 
-        let endpoint = "https://127.0.0.1:8081";
         let client = Arc::new(*&self);
         let client_weak = Arc::downgrade(&client);
+        let endpoint = "https://127.0.0.1:8081";
         match self
             .session_manager
             .route(endpoint, topic, client_weak)
@@ -413,12 +425,14 @@ mod tests {
         let mut session = Session::new(endpoint.to_owned(), &logger).await?;
         let topic = "cpp_sdk_standard";
 
+        let client = Client::new(logger.clone(), "localhost:8081")?;
+
         let request = pb::QueryRouteRequest {
             topic: Some(pb::Resource {
                 name: topic.to_owned(),
-                resource_namespace: String::from(""),
+                resource_namespace: client.arn.clone(),
             }),
-            endpoints: Some(Client::access_point()),
+            endpoints: Some(client.access_point.clone()),
         };
 
         let request = tonic::Request::new(request);
@@ -427,10 +441,11 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn test_client_query_route() {
-        let _endpoint = "http://localhost:8081";
-        let client = Client::new(create_logger());
+    async fn test_client_query_route() -> Result<(), error::ClientError> {
+        let access_point = "localhost:8081";
+        let client = Client::new(create_logger(), access_point)?;
         let topic = "cpp_sdk_standard";
         let _route = client.query_route(topic, true).await.unwrap();
+        Ok(())
     }
 }
diff --git a/rust/src/producer.rs b/rust/src/producer.rs
index ade2d3a..685eb82 100644
--- a/rust/src/producer.rs
+++ b/rust/src/producer.rs
@@ -1,23 +1,24 @@
 use slog::Logger;
 
-use crate::client;
+use crate::{client, error};
 
 struct Producer {
     client: client::Client,
 }
 
 impl Producer {
-    pub async fn new<T>(logger: Logger, topics: T) -> Self
+    pub async fn new<T>(logger: Logger, topics: T) -> Result<Self, error::ClientError>
     where
         T: IntoIterator,
         T::Item: AsRef<str>,
     {
-        let mut client = client::Client::new(logger);
+        let access_point = "localhost:8081";
+        let client = client::Client::new(logger, access_point)?;
         for _topic in topics.into_iter() {
             // client.subscribe(topic.as_ref()).await;
         }
 
-        Producer { client }
+        Ok(Producer { client })
     }
 
     pub fn start(&mut self) {}