You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/04/02 11:20:02 UTC
[rocketmq-client-rust] branch develop updated: WIP:
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-rust.git
The following commit(s) were added to refs/heads/develop by this push:
new b209bb0 WIP:
b209bb0 is described below
commit b209bb0703f1cfdee6ebeee951045ace7d5c4fa8
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Sat Apr 2 11:19:53 2022 +0000
WIP:
---
src/client.rs | 88 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
src/lib.rs | 2 +-
2 files changed, 87 insertions(+), 3 deletions(-)
diff --git a/src/client.rs b/src/client.rs
index 3c1fd6a..813de08 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -1,14 +1,65 @@
use crate::pb::{
messaging_service_client::MessagingServiceClient, QueryRouteRequest, QueryRouteResponse,
+ SendMessageRequest, SendMessageResponse,
};
use tonic::{
+ metadata::MetadataMap,
transport::{Channel, ClientTlsConfig},
Request, Response,
};
+pub struct Credentials {
+ access_key: String,
+ access_secret: String,
+ session_token: Option<String>,
+}
+
+pub trait CredentialProvider {
+ fn get_credentials(&self) -> Credentials;
+}
+
+pub struct StaticCredentialProvider {
+ access_key: String,
+ access_secret: String,
+}
+
+impl StaticCredentialProvider {
+ pub fn new(access_key: String, access_secret: String) -> Self {
+ Self {
+ access_key,
+ access_secret,
+ }
+ }
+}
+
+impl CredentialProvider for StaticCredentialProvider {
+ fn get_credentials(&self) -> Credentials {
+ Credentials {
+ access_key: self.access_key.clone(),
+ access_secret: self.access_secret.clone(), // the secret comes from its own field, not the key
+ session_token: None, // this provider stores no session token
+ }
+ }
+}
+
+#[derive(Default)]
+struct ClientConfig {
+ region: String,
+ service_name: String,
+ resource_namespace: String,
+ credential_provider: Option<Box<dyn CredentialProvider>>,
+ tenant_id: String,
+ io_timeout: std::time::Duration,
+ long_polling_timeout: std::time::Duration,
+ group: String,
+ client_id: String,
+ tracing: bool,
+}
+
pub struct RpcClient {
stub: MessagingServiceClient<Channel>,
peer_address: String,
+ // client_config: std::rc::Rc<ClientConfig>,
}
impl RpcClient {
@@ -27,19 +78,46 @@ impl RpcClient {
})
}
+ fn add_metadata(meta: &mut MetadataMap) {}
+
pub async fn query_route(
&mut self,
request: QueryRouteRequest,
) -> Result<Response<QueryRouteResponse>, Box<dyn std::error::Error>> {
- let req = Request::new(request);
+ let mut req = Request::new(request);
+ RpcClient::add_metadata(req.metadata_mut());
Ok(self.stub.query_route(req).await?)
}
+
+ pub async fn send_message(
+ &mut self,
+ request: SendMessageRequest,
+ ) -> Result<Response<SendMessageResponse>, Box<dyn std::error::Error>> {
+ let mut req = Request::new(request);
+ Ok(self.stub.send_message(req).await?)
+ }
+}
+
+#[derive(Default)]
+pub struct ClientManager {}
+
+impl ClientManager {
+ pub async fn start(&self) { // ticks a 100 ms interval three times on a spawned task, then returns
+ let mut interval = tokio::time::interval(std::time::Duration::from_millis(100));
+ let handle = tokio::spawn(async move {
+ for _ in 0..3 { // the index is never read; only the number of ticks matters
+ interval.tick().await;
+ println!("Tick");
+ }
+ });
+ let _result = handle.await; // wait for the task; the join result is bound but not inspected
+ }
}
#[cfg(test)]
mod test {
use super::*;
- use crate::pb::{Resource, Code};
+ use crate::pb::{Code, Resource};
#[tokio::test]
async fn test_connect() {
@@ -79,4 +157,10 @@ mod test {
let route_response = reply.into_inner();
assert_eq!(route_response.status.unwrap().code, Code::Ok as i32);
}
+
+ #[tokio::test]
+ async fn test_periodic_task() {
+ let client_manager = ClientManager::default();
+ client_manager.start().await;
+ }
}
diff --git a/src/lib.rs b/src/lib.rs
index ef0b606..57769fd 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,3 +1,3 @@
-pub mod pb;
pub mod client;
+pub mod pb;
pub mod server;