You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/04/02 11:20:02 UTC

[rocketmq-client-rust] branch develop updated: WIP:

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-rust.git


The following commit(s) were added to refs/heads/develop by this push:
     new b209bb0  WIP:
b209bb0 is described below

commit b209bb0703f1cfdee6ebeee951045ace7d5c4fa8
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Sat Apr 2 11:19:53 2022 +0000

    WIP:
---
 src/client.rs | 88 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
 src/lib.rs    |  2 +-
 2 files changed, 87 insertions(+), 3 deletions(-)

diff --git a/src/client.rs b/src/client.rs
index 3c1fd6a..813de08 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -1,14 +1,65 @@
 use crate::pb::{
     messaging_service_client::MessagingServiceClient, QueryRouteRequest, QueryRouteResponse,
+    SendMessageRequest, SendMessageResponse,
 };
 use tonic::{
+    metadata::MetadataMap,
     transport::{Channel, ClientTlsConfig},
     Request, Response,
 };
 
+/// A set of credential values, as returned by [`CredentialProvider::get_credentials`].
+pub struct Credentials {
+    /// Access-key half of the credential pair.
+    access_key: String,
+    /// Secret half of the credential pair.
+    access_secret: String,
+    /// Optional session token; `None` when the provider does not supply one.
+    session_token: Option<String>,
+}
+
+/// A source of [`Credentials`].
+pub trait CredentialProvider {
+    /// Returns an owned [`Credentials`] value.
+    fn get_credentials(&self) -> Credentials;
+}
+
+/// A [`CredentialProvider`] that stores one fixed access key and secret.
+pub struct StaticCredentialProvider {
+    /// Access key supplied at construction time.
+    access_key: String,
+    /// Access secret supplied at construction time.
+    access_secret: String,
+}
+
+impl StaticCredentialProvider {
+    /// Builds a provider that stores the given key and secret unchanged.
+    pub fn new(access_key: String, access_secret: String) -> Self {
+        Self { access_key, access_secret }
+    }
+}
+
+impl CredentialProvider for StaticCredentialProvider {
+    /// Returns a copy of the stored key/secret pair.
+    ///
+    /// This provider holds no session token, so `session_token` is always `None`.
+    fn get_credentials(&self) -> Credentials {
+        Credentials {
+            access_key: self.access_key.clone(),
+            // Must be cloned from `access_secret`; cloning `access_key` here would
+            // hand out the key as the secret.
+            access_secret: self.access_secret.clone(),
+            session_token: None,
+        }
+    }
+}
+
+/// Bundle of client settings.
+///
+/// `Default` is derived, so a default value has empty strings, zero durations,
+/// no credential provider, and `tracing` set to `false`.
+#[derive(Default)]
+struct ClientConfig {
+    region: String,
+    service_name: String,
+    resource_namespace: String,
+    /// Optional provider, boxed so any [`CredentialProvider`] implementation fits.
+    credential_provider: Option<Box<dyn CredentialProvider>>,
+    tenant_id: String,
+    // NOTE(review): presumably the per-request I/O deadline — confirm once it is used.
+    io_timeout: std::time::Duration,
+    // NOTE(review): presumably the deadline for long-polling calls — confirm once it is used.
+    long_polling_timeout: std::time::Duration,
+    group: String,
+    client_id: String,
+    tracing: bool,
+}
+
 pub struct RpcClient {
+    /// Generated messaging-service client stub over a tonic `Channel`.
     stub: MessagingServiceClient<Channel>,
+    // NOTE(review): presumably the address `stub` is connected to — confirm against the constructor.
     peer_address: String,
+    // Draft field, not enabled yet. NOTE(review): `Rc` is not `Send`; `Arc` is likely
+    // needed if this client is moved across tasks or threads — confirm before enabling.
+    // client_config: std::rc::Rc<ClientConfig>,
 }
 
 impl RpcClient {
@@ -27,19 +78,46 @@ impl RpcClient {
         })
     }
 
+    /// Hook for attaching metadata to an outgoing request.
+    ///
+    /// Currently a stub: the body is empty, so `meta` is left unchanged.
+    fn add_metadata(meta: &mut MetadataMap) {}
+
     pub async fn query_route(
         &mut self,
         request: QueryRouteRequest,
     ) -> Result<Response<QueryRouteResponse>, Box<dyn std::error::Error>> {
-        let req = Request::new(request);
+        let mut req = Request::new(request);
+        // Pass the request's metadata map through the metadata hook before sending.
+        RpcClient::add_metadata(req.metadata_mut());
+        // `?` converts a failure of the stub call into the boxed error type.
         Ok(self.stub.query_route(req).await?)
     }
+
+    /// Sends `request` through the gRPC stub and returns the server's response.
+    ///
+    /// # Errors
+    ///
+    /// Any error returned by the stub call is boxed and propagated to the caller.
+    pub async fn send_message(
+        &mut self,
+        request: SendMessageRequest,
+    ) -> Result<Response<SendMessageResponse>, Box<dyn std::error::Error>> {
+        let mut req = Request::new(request);
+        // Run the same metadata hook as `query_route` so every RPC is decorated uniformly.
+        RpcClient::add_metadata(req.metadata_mut());
+        Ok(self.stub.send_message(req).await?)
+    }
+}
+
+/// Field-less manager type; construct it with `ClientManager::default()`.
+#[derive(Default)]
+pub struct ClientManager {}
+
+impl ClientManager {
+    /// Spawns a task that waits for three ticks of a 100 ms interval, printing
+    /// `Tick` after each one, and returns once that task has finished.
+    pub async fn start(&self) {
+        let mut interval = tokio::time::interval(std::time::Duration::from_millis(100));
+        let handle = tokio::spawn(async move {
+            // The loop counter is never read, so it is left unnamed.
+            for _ in 0..3 {
+                interval.tick().await;
+                println!("Tick");
+            }
+        });
+        // The join result is bound but never inspected, so a panic inside the
+        // task does not propagate to the caller.
+        let _result = handle.await;
+    }
 }
 
 #[cfg(test)]
 mod test {
     use super::*;
-    use crate::pb::{Resource, Code};
+    use crate::pb::{Code, Resource};
 
     #[tokio::test]
     async fn test_connect() {
@@ -79,4 +157,10 @@ mod test {
         let route_response = reply.into_inner();
         assert_eq!(route_response.status.unwrap().code, Code::Ok as i32);
     }
+
+    #[tokio::test]
+    async fn test_periodic_task() {
+        // Smoke test: passes as long as `start` runs to completion.
+        ClientManager::default().start().await;
+    }
 }
diff --git a/src/lib.rs b/src/lib.rs
index ef0b606..57769fd 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,3 +1,3 @@
-pub mod pb;
 pub mod client;
+pub mod pb;
 pub mod server;