You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2023/02/01 04:40:39 UTC

[skywalking-rust] branch master updated: Refactor management report and keep alive api. (#53)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-rust.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d4b379  Refactor management report and keep alive api. (#53)
7d4b379 is described below

commit 7d4b3797a9a3c76047b53bb213a5449a9c07f978
Author: jmjoy <jm...@apache.org>
AuthorDate: Wed Feb 1 12:40:33 2023 +0800

    Refactor management report and keep alive api. (#53)
---
 .github/workflows/codecov.yaml       |   2 +
 examples/simple_management_report.rs |  17 +++---
 src/management/manager.rs            | 101 +++++++++++++++++++++++++++++------
 tests/management.rs                  |  90 ++++++++++++++++++++++++++-----
 4 files changed, 173 insertions(+), 37 deletions(-)

diff --git a/.github/workflows/codecov.yaml b/.github/workflows/codecov.yaml
index 052ecaf..791d418 100644
--- a/.github/workflows/codecov.yaml
+++ b/.github/workflows/codecov.yaml
@@ -38,4 +38,6 @@ jobs:
           toolchain: stable
           override: true
       - uses: actions-rs/tarpaulin@v0.1
+        with:
+          version: 0.22.0
       - uses: codecov/codecov-action@v2.1.0
diff --git a/examples/simple_management_report.rs b/examples/simple_management_report.rs
index 107aa95..71a186a 100644
--- a/examples/simple_management_report.rs
+++ b/examples/simple_management_report.rs
@@ -40,13 +40,16 @@ async fn main() -> Result<(), Box<dyn Error>> {
 
     let manager = Manager::new("service", "instance", reporter);
 
-    // Report instance properties.
-    let mut props = Properties::default();
-    props.insert_os_info();
-    manager.report_properties(props);
-
-    // Keep alive
-    manager.keep_alive(Duration::from_secs(10));
+    // Report instance properties and keep alive.
+    manager.report_and_keep_alive(
+        || {
+            let mut props = Properties::default();
+            props.insert_os_info();
+            props
+        },
+        Duration::from_secs(30),
+        10,
+    );
 
     handle.await?;
 
diff --git a/src/management/manager.rs b/src/management/manager.rs
index 9641c35..67f73c3 100644
--- a/src/management/manager.rs
+++ b/src/management/manager.rs
@@ -64,40 +64,107 @@ impl Manager {
 
     /// Report instance properties.
     pub fn report_properties(&self, properties: Properties) {
-        let props = properties
-            .convert_to_instance_properties(self.service_name.clone(), self.instance_name.clone());
-        self.reporter.report(CollectItem::Instance(Box::new(props)));
+        Self::reporter_report_properties(
+            &self.reporter,
+            self.service_name.clone(),
+            self.instance_name.clone(),
+            properties,
+        );
     }
 
-    /// Do keep alive (heartbeat), with the interval, will be run in background.
-    pub fn keep_alive(&self, interval: Duration) -> KeepAlive {
+    fn reporter_report_properties(
+        reporter: &Arc<DynReport>,
+        service_name: String,
+        instance_name: String,
+        properties: Properties,
+    ) {
+        let props = properties.convert_to_instance_properties(service_name, instance_name);
+        reporter.report(CollectItem::Instance(Box::new(props)));
+    }
+
+    /// Do keep alive once.
+    pub fn keep_alive(&self) {
+        Self::reporter_keep_alive(
+            &self.reporter,
+            self.service_name.clone(),
+            self.instance_name.clone(),
+        );
+    }
+
+    fn reporter_keep_alive(reporter: &Arc<DynReport>, service_name: String, instance_name: String) {
+        reporter.report(CollectItem::Ping(Box::new(
+            crate::skywalking_proto::v3::InstancePingPkg {
+                service: service_name,
+                service_instance: instance_name,
+                layer: Default::default(),
+            },
+        )));
+    }
+
+    /// Continuously report instance properties and keep alive. Run in
+    /// background.
+    ///
+    /// Parameter `heartbeat_period` represents agent heartbeat report period.
+    ///
+    /// Parameter `properties_report_period_factor` represents agent sends the
+    /// instance properties to the backend every `heartbeat_period` *
+    /// `properties_report_period_factor` seconds.
+    pub fn report_and_keep_alive(
+        &self,
+        properties: impl Fn() -> Properties + Send + 'static,
+        heartbeat_period: Duration,
+        properties_report_period_factor: usize,
+    ) -> ReportAndKeepAlive {
         let service_name = self.service_name.clone();
         let instance_name = self.instance_name.clone();
         let reporter = self.reporter.clone();
+
         let handle = spawn(async move {
-            let mut ticker = time::interval(interval);
+            let mut counter = 0;
+
+            let mut ticker = time::interval(heartbeat_period);
             loop {
                 ticker.tick().await;
 
-                reporter.report(CollectItem::Ping(Box::new(
-                    crate::skywalking_proto::v3::InstancePingPkg {
-                        service: service_name.clone(),
-                        service_instance: instance_name.clone(),
-                        layer: Default::default(),
-                    },
-                )));
+                if counter == 0 {
+                    Self::reporter_report_properties(
+                        &reporter,
+                        service_name.clone(),
+                        instance_name.clone(),
+                        properties(),
+                    );
+                } else {
+                    Self::reporter_keep_alive(
+                        &reporter,
+                        service_name.clone(),
+                        instance_name.clone(),
+                    );
+                }
+
+                counter += 1;
+
+                if counter >= properties_report_period_factor {
+                    counter = 0;
+                }
             }
         });
-        KeepAlive { handle }
+        ReportAndKeepAlive { handle }
     }
 }
 
-/// Handle of [Manager::keep_alive].
-pub struct KeepAlive {
+/// Handle of [Manager::report_and_keep_alive].
+pub struct ReportAndKeepAlive {
     handle: JoinHandle<()>,
 }
 
-impl Future for KeepAlive {
+impl ReportAndKeepAlive {
+    /// Get the inner tokio join handle.
+    pub fn handle(&self) -> &JoinHandle<()> {
+        &self.handle
+    }
+}
+
+impl Future for ReportAndKeepAlive {
     type Output = Result<(), JoinError>;
 
     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
diff --git a/tests/management.rs b/tests/management.rs
index 5b10419..681da8e 100644
--- a/tests/management.rs
+++ b/tests/management.rs
@@ -31,13 +31,23 @@ use tokio::time::sleep;
 async fn management() {
     let reporter = Arc::new(MockReporter::default());
     let manager = Manager::new("service_name", "instance_name", reporter.clone());
-    manager.keep_alive(Duration::from_secs(60));
+    let handling = manager.report_and_keep_alive(
+        || {
+            let mut props = Properties::new();
+            props.insert_os_info();
+            props
+        },
+        Duration::from_millis(100),
+        3,
+    );
 
-    {
-        let mut props = Properties::new();
-        props.insert_os_info();
-        manager.report_properties(props);
+    sleep(Duration::from_secs(1)).await;
+
+    handling.handle().abort();
 
+    sleep(Duration::from_secs(1)).await;
+
+    {
         let actual_props = reporter.pop_ins_props();
         assert_eq!(actual_props.service, "service_name".to_owned());
         assert_eq!(actual_props.service_instance, "instance_name".to_owned());
@@ -56,7 +66,6 @@ async fn management() {
     }
 
     {
-        sleep(Duration::from_secs(1)).await;
         assert_eq!(
             reporter.pop_ping(),
             InstancePingPkg {
@@ -66,25 +75,72 @@ async fn management() {
             }
         );
     }
+
+    {
+        reporter.pop_ping();
+    }
+
+    {
+        reporter.pop_ins_props();
+    }
+
+    {
+        reporter.pop_ping();
+    }
+
+    {
+        reporter.pop_ping();
+    }
 }
 
 fn kvs_get_value<'a>(kvs: &'a [KeyStringValuePair], key: &str) -> &'a str {
     &kvs.iter().find(|kv| kv.key == key).unwrap().value
 }
 
+#[derive(Debug)]
+enum Item {
+    Properties(InstanceProperties),
+    PingPkg(InstancePingPkg),
+}
+
+impl Item {
+    fn unwrap_properties(self) -> InstanceProperties {
+        match self {
+            Item::Properties(props) => props,
+            Item::PingPkg(_) => panic!("isn't properties"),
+        }
+    }
+
+    fn unwrap_ping_pkg(self) -> InstancePingPkg {
+        match self {
+            Item::Properties(_) => panic!("isn't ping pkg"),
+            Item::PingPkg(p) => p,
+        }
+    }
+}
+
 #[derive(Default, Clone)]
 struct MockReporter {
-    props_items: Arc<Mutex<LinkedList<InstanceProperties>>>,
-    ping_items: Arc<Mutex<LinkedList<InstancePingPkg>>>,
+    items: Arc<Mutex<LinkedList<Item>>>,
 }
 
 impl MockReporter {
     fn pop_ins_props(&self) -> InstanceProperties {
-        self.props_items.try_lock().unwrap().pop_back().unwrap()
+        self.items
+            .try_lock()
+            .unwrap()
+            .pop_front()
+            .unwrap()
+            .unwrap_properties()
     }
 
     fn pop_ping(&self) -> InstancePingPkg {
-        self.ping_items.try_lock().unwrap().pop_back().unwrap()
+        self.items
+            .try_lock()
+            .unwrap()
+            .pop_front()
+            .unwrap()
+            .unwrap_ping_pkg()
     }
 }
 
@@ -92,12 +148,20 @@ impl Report for MockReporter {
     fn report(&self, item: CollectItem) {
         match item {
             CollectItem::Instance(data) => {
-                self.props_items.try_lock().unwrap().push_back(*data);
+                self.items
+                    .try_lock()
+                    .unwrap()
+                    .push_back(Item::Properties(*data));
             }
             CollectItem::Ping(data) => {
-                self.ping_items.try_lock().unwrap().push_back(*data);
+                self.items
+                    .try_lock()
+                    .unwrap()
+                    .push_back(Item::PingPkg(*data));
+            }
+            _ => {
+                unreachable!("unknown collect item type");
             }
-            _ => {}
         }
     }
 }