Posted to notifications@skywalking.apache.org by GitBox <gi...@apache.org> on 2022/01/10 05:15:55 UTC

[GitHub] [skywalking-rust] Shikugawa commented on a change in pull request #9: self-contained example & gracefully shutdown reporter

Shikugawa commented on a change in pull request #9:
URL: https://github.com/apache/skywalking-rust/pull/9#discussion_r780896095



##########
File path: examples/simple_trace_report.rs
##########
@@ -1,14 +1,67 @@
+use std::error::Error;

Review comment:
       Could we add a new file that contains the self-contained example?

##########
File path: src/reporter/grpc.rs
##########
@@ -51,23 +53,51 @@ impl Reporter {
     /// use skywalking_rust::reporter::grpc::Reporter;
     ///
     /// #[tokio::main]
-    /// async fn main (){
-    ///     let tx = Reporter::start("localhost:12800").await;
+    /// async fn main () -> Result<(), Box<dyn Error>> {
+    ///     let reporter = Reporter::start("localhost:12800").await;
     ///     let mut context = TracingContext::default("service", "instance");
-    ///     tx.send(context).await;
+    ///     reporter.sender().send(context).await?;
+    ///     reporter.shutdown().await?;
     /// }
     /// ```
-    pub async fn start(address: &str) -> ContextReporter {
+    pub async fn start(address: impl Into<String>) -> Self {
         let (tx, mut rx): (mpsc::Sender<TracingContext>, mpsc::Receiver<TracingContext>) =
             mpsc::channel(CHANNEL_BUF_SIZE);
-        let mut reporter = ReporterClient::connect(address.to_string()).await.unwrap();
+        let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
+
+        let mut reporter = ReporterClient::connect(address.into()).await.unwrap();
         tokio::spawn(async move {
+            loop {
+                tokio::select! {
+                    message = rx.recv() => {
+                        if let Some(message) = message {
+                            flush(&mut reporter, message.convert_segment_object()).await.unwrap();
+                        } else {
+                            break;
+                        }
+                    },
+                    _ = shutdown_rx.recv() => {
+                        break;
+                    }
+                }
+            }
+            rx.close();
             while let Some(message) = rx.recv().await {

Review comment:
       This `while let` drain loop may not be required anymore.
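
       For context on what the loop after `rx.close()` does, here is a minimal std-only sketch of the "stop on shutdown, then drain" pattern. It is not the crate's code: `run_reporter` is a hypothetical stand-in, `std::sync::mpsc` with polling replaces `tokio::sync::mpsc` and `tokio::select!`, and pushing into a `Vec` stands in for `flush`. Under those assumptions, it shows that messages queued before the shutdown signal are only delivered if something drains the channel afterwards.

```rust
use std::sync::mpsc::{self, Receiver, TryRecvError};
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for the spawned reporter task.
/// Returns every message it "flushed" so the behaviour can be inspected.
fn run_reporter(rx: Receiver<String>, shutdown_rx: Receiver<()>) -> Vec<String> {
    let mut flushed = Vec::new();

    // Main loop: poll the shutdown channel first, then the message channel.
    // (The PR uses `tokio::select!`; polling is only an approximation of it.)
    loop {
        if shutdown_rx.try_recv().is_ok() {
            break;
        }
        match rx.try_recv() {
            Ok(message) => flushed.push(message),
            // Nothing queued yet: back off briefly instead of spinning.
            Err(TryRecvError::Empty) => thread::sleep(Duration::from_millis(1)),
            // All senders dropped and the queue is empty: nothing left to do.
            Err(TryRecvError::Disconnected) => return flushed,
        }
    }

    // Drain step: deliver whatever was already queued when shutdown arrived.
    // Without this loop those messages would be silently dropped.
    while let Ok(message) = rx.try_recv() {
        flushed.push(message);
    }
    flushed
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let (shutdown_tx, shutdown_rx) = mpsc::channel();

    // Queue three messages, then request shutdown before the reporter runs,
    // so the shutdown check wins on the first iteration.
    for i in 0..3 {
        tx.send(format!("segment-{}", i)).unwrap();
    }
    shutdown_tx.send(()).unwrap();

    let flushed = run_reporter(rx, shutdown_rx);
    assert_eq!(flushed.len(), 3);
    println!("flushed {} segments after shutdown", flushed.len());
}
```

       In this sketch all three segments are flushed by the drain step alone. Whether the equivalent loop is still needed in the PR depends on whether dropping queued contexts at shutdown is acceptable.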



