You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by GitBox <gi...@apache.org> on 2022/01/10 06:53:12 UTC

[GitHub] [skywalking-rust] tisonkun commented on a change in pull request #9: self-contained example & gracefully shutdown reporter

tisonkun commented on a change in pull request #9:
URL: https://github.com/apache/skywalking-rust/pull/9#discussion_r780924381



##########
File path: src/reporter/grpc.rs
##########
@@ -51,23 +53,51 @@ impl Reporter {
     /// use skywalking_rust::reporter::grpc::Reporter;
     ///
     /// #[tokio::main]
-    /// async fn main (){
-    ///     let tx = Reporter::start("localhost:12800").await;
+    /// async fn main () -> Result<(), Box<dyn Error>> {
+    ///     let reporter = Reporter::start("localhost:12800").await;
     ///     let mut context = TracingContext::default("service", "instance");
-    ///     tx.send(context).await;
+    ///     reporter.sender().send(context).await?;
+    ///     reporter.shutdown().await?;
     /// }
     /// ```
-    pub async fn start(address: &str) -> ContextReporter {
+    pub async fn start(address: impl Into<String>) -> Self {
         let (tx, mut rx): (mpsc::Sender<TracingContext>, mpsc::Receiver<TracingContext>) =
             mpsc::channel(CHANNEL_BUF_SIZE);
-        let mut reporter = ReporterClient::connect(address.to_string()).await.unwrap();
+        let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
+
+        let mut reporter = ReporterClient::connect(address.into()).await.unwrap();
         tokio::spawn(async move {
+            loop {
+                tokio::select! {
+                    message = rx.recv() => {
+                        if let Some(message) = message {
+                            flush(&mut reporter, message.convert_segment_object()).await.unwrap();
+                        } else {
+                            break;
+                        }
+                    },
+                    _ = shutdown_rx.recv() => {
+                        break;
+                    }
+                }
+            }
+            rx.close();
             while let Some(message) = rx.recv().await {

Review comment:
       No. If a select chooses shutdown first, there can be outstanding messages to be processed. We close the `rx` so that there is no more inflight message, but the outstanding ones should be processed.
   
   See also https://docs.rs/tokio/1.15.0/tokio/sync/mpsc/struct.Receiver.html#method.close for example.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org