You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2023/02/01 04:40:39 UTC
[skywalking-rust] branch master updated: Refactor management report and keep alive api. (#53)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-rust.git
The following commit(s) were added to refs/heads/master by this push:
new 7d4b379 Refactor management report and keep alive api. (#53)
7d4b379 is described below
commit 7d4b3797a9a3c76047b53bb213a5449a9c07f978
Author: jmjoy <jm...@apache.org>
AuthorDate: Wed Feb 1 12:40:33 2023 +0800
Refactor management report and keep alive api. (#53)
---
.github/workflows/codecov.yaml | 2 +
examples/simple_management_report.rs | 17 +++---
src/management/manager.rs | 101 +++++++++++++++++++++++++++++------
tests/management.rs | 90 ++++++++++++++++++++++++++-----
4 files changed, 173 insertions(+), 37 deletions(-)
diff --git a/.github/workflows/codecov.yaml b/.github/workflows/codecov.yaml
index 052ecaf..791d418 100644
--- a/.github/workflows/codecov.yaml
+++ b/.github/workflows/codecov.yaml
@@ -38,4 +38,6 @@ jobs:
toolchain: stable
override: true
- uses: actions-rs/tarpaulin@v0.1
+ with:
+ version: 0.22.0
- uses: codecov/codecov-action@v2.1.0
diff --git a/examples/simple_management_report.rs b/examples/simple_management_report.rs
index 107aa95..71a186a 100644
--- a/examples/simple_management_report.rs
+++ b/examples/simple_management_report.rs
@@ -40,13 +40,16 @@ async fn main() -> Result<(), Box<dyn Error>> {
let manager = Manager::new("service", "instance", reporter);
- // Report instance properties.
- let mut props = Properties::default();
- props.insert_os_info();
- manager.report_properties(props);
-
- // Keep alive
- manager.keep_alive(Duration::from_secs(10));
+ // Report instance properties and keep alive.
+ manager.report_and_keep_alive(
+ || {
+ let mut props = Properties::default();
+ props.insert_os_info();
+ props
+ },
+ Duration::from_secs(30),
+ 10,
+ );
handle.await?;
diff --git a/src/management/manager.rs b/src/management/manager.rs
index 9641c35..67f73c3 100644
--- a/src/management/manager.rs
+++ b/src/management/manager.rs
@@ -64,40 +64,107 @@ impl Manager {
/// Report instance properties.
pub fn report_properties(&self, properties: Properties) {
- let props = properties
- .convert_to_instance_properties(self.service_name.clone(), self.instance_name.clone());
- self.reporter.report(CollectItem::Instance(Box::new(props)));
+ Self::reporter_report_properties(
+ &self.reporter,
+ self.service_name.clone(),
+ self.instance_name.clone(),
+ properties,
+ );
}
- /// Do keep alive (heartbeat), with the interval, will be run in background.
- pub fn keep_alive(&self, interval: Duration) -> KeepAlive {
+ fn reporter_report_properties(
+ reporter: &Arc<DynReport>,
+ service_name: String,
+ instance_name: String,
+ properties: Properties,
+ ) {
+ let props = properties.convert_to_instance_properties(service_name, instance_name);
+ reporter.report(CollectItem::Instance(Box::new(props)));
+ }
+
+ /// Do keep alive once.
+ pub fn keep_alive(&self) {
+ Self::reporter_keep_alive(
+ &self.reporter,
+ self.service_name.clone(),
+ self.instance_name.clone(),
+ );
+ }
+
+ fn reporter_keep_alive(reporter: &Arc<DynReport>, service_name: String, instance_name: String) {
+ reporter.report(CollectItem::Ping(Box::new(
+ crate::skywalking_proto::v3::InstancePingPkg {
+ service: service_name,
+ service_instance: instance_name,
+ layer: Default::default(),
+ },
+ )));
+ }
+
+ /// Continuously report instance properties and keep alive. Run in
+ /// background.
+ ///
+ /// Parameter `heartbeat_period` represents agent heartbeat report period.
+ ///
+ /// Parameter `properties_report_period_factor` represents agent sends the
+ /// instance properties to the backend every `heartbeat_period` *
+ /// `properties_report_period_factor` seconds.
+ pub fn report_and_keep_alive(
+ &self,
+ properties: impl Fn() -> Properties + Send + 'static,
+ heartbeat_period: Duration,
+ properties_report_period_factor: usize,
+ ) -> ReportAndKeepAlive {
let service_name = self.service_name.clone();
let instance_name = self.instance_name.clone();
let reporter = self.reporter.clone();
+
let handle = spawn(async move {
- let mut ticker = time::interval(interval);
+ let mut counter = 0;
+
+ let mut ticker = time::interval(heartbeat_period);
loop {
ticker.tick().await;
- reporter.report(CollectItem::Ping(Box::new(
- crate::skywalking_proto::v3::InstancePingPkg {
- service: service_name.clone(),
- service_instance: instance_name.clone(),
- layer: Default::default(),
- },
- )));
+ if counter == 0 {
+ Self::reporter_report_properties(
+ &reporter,
+ service_name.clone(),
+ instance_name.clone(),
+ properties(),
+ );
+ } else {
+ Self::reporter_keep_alive(
+ &reporter,
+ service_name.clone(),
+ instance_name.clone(),
+ );
+ }
+
+ counter += 1;
+
+ if counter >= properties_report_period_factor {
+ counter = 0;
+ }
}
});
- KeepAlive { handle }
+ ReportAndKeepAlive { handle }
}
}
-/// Handle of [Manager::keep_alive].
-pub struct KeepAlive {
+/// Handle of [Manager::report_and_keep_alive].
+pub struct ReportAndKeepAlive {
handle: JoinHandle<()>,
}
-impl Future for KeepAlive {
+impl ReportAndKeepAlive {
+ /// Get the inner tokio join handle.
+ pub fn handle(&self) -> &JoinHandle<()> {
+ &self.handle
+ }
+}
+
+impl Future for ReportAndKeepAlive {
type Output = Result<(), JoinError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
diff --git a/tests/management.rs b/tests/management.rs
index 5b10419..681da8e 100644
--- a/tests/management.rs
+++ b/tests/management.rs
@@ -31,13 +31,23 @@ use tokio::time::sleep;
async fn management() {
let reporter = Arc::new(MockReporter::default());
let manager = Manager::new("service_name", "instance_name", reporter.clone());
- manager.keep_alive(Duration::from_secs(60));
+ let handling = manager.report_and_keep_alive(
+ || {
+ let mut props = Properties::new();
+ props.insert_os_info();
+ props
+ },
+ Duration::from_millis(100),
+ 3,
+ );
- {
- let mut props = Properties::new();
- props.insert_os_info();
- manager.report_properties(props);
+ sleep(Duration::from_secs(1)).await;
+
+ handling.handle().abort();
+ sleep(Duration::from_secs(1)).await;
+
+ {
let actual_props = reporter.pop_ins_props();
assert_eq!(actual_props.service, "service_name".to_owned());
assert_eq!(actual_props.service_instance, "instance_name".to_owned());
@@ -56,7 +66,6 @@ async fn management() {
}
{
- sleep(Duration::from_secs(1)).await;
assert_eq!(
reporter.pop_ping(),
InstancePingPkg {
@@ -66,25 +75,72 @@ async fn management() {
}
);
}
+
+ {
+ reporter.pop_ping();
+ }
+
+ {
+ reporter.pop_ins_props();
+ }
+
+ {
+ reporter.pop_ping();
+ }
+
+ {
+ reporter.pop_ping();
+ }
}
fn kvs_get_value<'a>(kvs: &'a [KeyStringValuePair], key: &str) -> &'a str {
&kvs.iter().find(|kv| kv.key == key).unwrap().value
}
+#[derive(Debug)]
+enum Item {
+ Properties(InstanceProperties),
+ PingPkg(InstancePingPkg),
+}
+
+impl Item {
+ fn unwrap_properties(self) -> InstanceProperties {
+ match self {
+ Item::Properties(props) => props,
+ Item::PingPkg(_) => panic!("isn't properties"),
+ }
+ }
+
+ fn unwrap_ping_pkg(self) -> InstancePingPkg {
+ match self {
+ Item::Properties(_) => panic!("isn't ping pkg"),
+ Item::PingPkg(p) => p,
+ }
+ }
+}
+
#[derive(Default, Clone)]
struct MockReporter {
- props_items: Arc<Mutex<LinkedList<InstanceProperties>>>,
- ping_items: Arc<Mutex<LinkedList<InstancePingPkg>>>,
+ items: Arc<Mutex<LinkedList<Item>>>,
}
impl MockReporter {
fn pop_ins_props(&self) -> InstanceProperties {
- self.props_items.try_lock().unwrap().pop_back().unwrap()
+ self.items
+ .try_lock()
+ .unwrap()
+ .pop_front()
+ .unwrap()
+ .unwrap_properties()
}
fn pop_ping(&self) -> InstancePingPkg {
- self.ping_items.try_lock().unwrap().pop_back().unwrap()
+ self.items
+ .try_lock()
+ .unwrap()
+ .pop_front()
+ .unwrap()
+ .unwrap_ping_pkg()
}
}
@@ -92,12 +148,20 @@ impl Report for MockReporter {
fn report(&self, item: CollectItem) {
match item {
CollectItem::Instance(data) => {
- self.props_items.try_lock().unwrap().push_back(*data);
+ self.items
+ .try_lock()
+ .unwrap()
+ .push_back(Item::Properties(*data));
}
CollectItem::Ping(data) => {
- self.ping_items.try_lock().unwrap().push_back(*data);
+ self.items
+ .try_lock()
+ .unwrap()
+ .push_back(Item::PingPkg(*data));
+ }
+ _ => {
+ unreachable!("unknown collect item type");
}
- _ => {}
}
}
}