You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2023/02/15 05:38:28 UTC

[skywalking-rust] branch master updated: Use stream and completed for a bulk to collect for grpc reporter. (#54)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-rust.git


The following commit(s) were added to refs/heads/master by this push:
     new c39722c  Use stream and completed for a bulk to collect for grpc reporter. (#54)
c39722c is described below

commit c39722c870bb79ff159ce441d61b6d564a71c0d0
Author: jmjoy <jm...@apache.org>
AuthorDate: Wed Feb 15 13:38:22 2023 +0800

    Use stream and completed for a bulk to collect for grpc reporter. (#54)
---
 Cargo.toml                    |   2 +-
 examples/simple_log_report.rs |   2 +-
 src/reporter/grpc.rs          | 405 ++++++++++++++++++++++++++++--------------
 3 files changed, 277 insertions(+), 132 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 5b66f22..423e8cf 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -40,7 +40,6 @@ vendored = ["protobuf-src"]
 mock = []  # For internal integration testing only, do not use.
 
 [dependencies]
-async-stream = "0.3.3"
 base64 = "0.13.0"
 bytes = "1.2.1"
 cfg-if = "1.0.0"
@@ -56,6 +55,7 @@ serde = { version = "1.0.143", features = ["derive"] }
 systemstat = { version = "0.2.0", optional = true }
 thiserror = "1.0.32"
 tokio = { version = "1.20.1", features = ["parking_lot"] }
+tokio-stream = { version = "0.1.9", features = ["time"] }
 tonic = { version = "0.8.0", features = ["codegen"] }
 tracing = "0.1.36"
 uuid = { version = "1.1.2", features = ["serde", "v4"] }
diff --git a/examples/simple_log_report.rs b/examples/simple_log_report.rs
index 0466e9a..02763e9 100644
--- a/examples/simple_log_report.rs
+++ b/examples/simple_log_report.rs
@@ -37,7 +37,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
         .reporting()
         .await
         .with_graceful_shutdown(future::ready(()))
-        .start()
+        .spawn()
         .await?;
 
     Ok(())
diff --git a/src/reporter/grpc.rs b/src/reporter/grpc.rs
index 68307ee..576adb7 100644
--- a/src/reporter/grpc.rs
+++ b/src/reporter/grpc.rs
@@ -27,25 +27,29 @@ use crate::{
         SegmentObject,
     },
 };
-use futures_util::stream;
+use futures_core::Stream;
+use futures_util::future::{try_join_all, TryJoinAll};
 use std::{
-    collections::LinkedList,
     error::Error,
     future::{pending, Future},
-    mem::take,
     pin::Pin,
     sync::{
         atomic::{AtomicBool, Ordering},
         Arc,
     },
     task::{Context, Poll},
+    time::Duration,
 };
 use tokio::{
     select,
-    sync::{mpsc, Mutex},
+    sync::{
+        mpsc::{self, Receiver, Sender},
+        Mutex,
+    },
     task::JoinHandle,
     try_join,
 };
+use tokio_stream::StreamExt;
 use tonic::{
     async_trait,
     metadata::{Ascii, MetadataValue},
@@ -53,6 +57,19 @@ use tonic::{
     transport::{self, Channel, Endpoint},
     Request, Status,
 };
+use tracing::error;
+
+type DynInterceptHandler = dyn Fn(Request<()>) -> Result<Request<()>, Status> + Send + Sync;
+type DynErrHandler = dyn Fn(&str, &dyn Error) + Send + Sync + 'static;
+type DynStatusHandler = dyn Fn(&str, &Status) + Send + Sync + 'static;
+
+fn default_err_handle(message: &str, err: &dyn Error) {
+    error!(?err, "{}", message);
+}
+
+fn default_status_handle(message: &str, status: &Status) {
+    error!(?status, "{}", message);
+}
 
 /// Special purpose, used for user-defined production operations. Generally, it
 /// does not need to be handled.
@@ -114,8 +131,6 @@ impl CollectItemConsume for mpsc::UnboundedReceiver<CollectItem> {
     }
 }
 
-type DynInterceptHandler = dyn Fn(Request<()>) -> Result<Request<()>, Status> + Send + Sync;
-
 #[derive(Default, Clone)]
 struct CustomInterceptor {
     authentication: Option<Arc<String>>,
@@ -138,20 +153,23 @@ impl Interceptor for CustomInterceptor {
     }
 }
 
-struct Inner<P, C> {
-    producer: P,
-    consumer: Mutex<Option<C>>,
+struct State {
     is_reporting: AtomicBool,
-    is_closed: AtomicBool,
+    is_closing: AtomicBool,
 }
 
-/// Alias of dyn [Error] callback.
-pub type DynErrHandle = dyn Fn(Box<dyn Error>) + Send + Sync + 'static;
+impl State {
+    fn is_closing(&self) -> bool {
+        self.is_closing.load(Ordering::Relaxed)
+    }
+}
 
 /// Reporter which will report to Skywalking OAP server via grpc protocol.
 pub struct GrpcReporter<P, C> {
-    inner: Arc<Inner<P, C>>,
-    err_handle: Arc<Option<Box<DynErrHandle>>>,
+    state: Arc<State>,
+    producer: Arc<P>,
+    consumer: Arc<Mutex<Option<C>>>,
+    err_handle: Arc<DynErrHandler>,
     channel: Channel,
     interceptor: CustomInterceptor,
 }
@@ -178,13 +196,13 @@ impl<P: CollectItemProduce, C: CollectItemConsume> GrpcReporter<P, C> {
     /// usually you can use [GrpcReporter::connect] and [GrpcReporter::new].
     pub fn new_with_pc(channel: Channel, producer: P, consumer: C) -> Self {
         Self {
-            inner: Arc::new(Inner {
-                producer,
-                consumer: Mutex::new(Some(consumer)),
+            state: Arc::new(State {
                 is_reporting: Default::default(),
-                is_closed: Default::default(),
+                is_closing: Default::default(),
             }),
-            err_handle: Default::default(),
+            producer: Arc::new(producer),
+            consumer: Arc::new(Mutex::new(Some(consumer))),
+            err_handle: Arc::new(default_err_handle),
             channel,
             interceptor: Default::default(),
         }
@@ -193,9 +211,9 @@ impl<P: CollectItemProduce, C: CollectItemConsume> GrpcReporter<P, C> {
     /// Set error handle. By default, the error will not be handle.
     pub fn with_err_handle(
         mut self,
-        handle: impl Fn(Box<dyn Error>) + Send + Sync + 'static,
+        handle: impl Fn(&str, &dyn Error) + Send + Sync + 'static,
     ) -> Self {
-        self.err_handle = Arc::new(Some(Box::new(handle)));
+        self.err_handle = Arc::new(handle);
         self
     }
 
@@ -220,40 +238,63 @@ impl<P: CollectItemProduce, C: CollectItemConsume> GrpcReporter<P, C> {
     /// # Panics
     ///
     /// Panic if call more than once.
-    pub async fn reporting(&self) -> Reporting<P, C> {
-        if self.inner.is_reporting.swap(true, Ordering::Relaxed) {
+    pub async fn reporting(&self) -> Reporting<C> {
+        if self.state.is_reporting.swap(true, Ordering::Relaxed) {
             panic!("reporting already called");
         }
 
-        Reporting {
-            rb: ReporterAndBuffer {
-                inner: Arc::clone(&self.inner),
-                status_handle: None,
+        let (trace_sender, trace_receiver) = mpsc::channel(255);
+        let (log_sender, log_receiver) = mpsc::channel(255);
+        let (meter_sender, meter_receiver) = mpsc::channel(255);
 
-                trace_buffer: Default::default(),
-                log_buffer: Default::default(),
-                meter_buffer: Default::default(),
+        let status_handle = Arc::new(default_status_handle);
 
+        Reporting {
+            report_sender: ReportSender {
+                state: Arc::clone(&self.state),
+                inner_report_sender: InnerReportSender {
+                    status_handle: Arc::new(default_status_handle),
+                    err_handle: self.err_handle.clone(),
+                    trace_sender,
+                    log_sender,
+                    meter_sender,
+
+                    #[cfg(feature = "management")]
+                    management_client: ManagementServiceClient::with_interceptor(
+                        self.channel.clone(),
+                        self.interceptor.clone(),
+                    ),
+                },
+                shutdown_signal: Box::pin(pending()),
+                consumer: self.consumer.lock().await.take().unwrap(),
+            },
+
+            trace_receive_reporter: TraceReceiveReporter {
                 trace_client: TraceSegmentReportServiceClient::with_interceptor(
                     self.channel.clone(),
                     self.interceptor.clone(),
                 ),
+                trace_receiver,
+                status_handle: status_handle.clone(),
+            },
+
+            log_receive_reporter: LogReceiveReporter {
                 log_client: LogReportServiceClient::with_interceptor(
                     self.channel.clone(),
                     self.interceptor.clone(),
                 ),
+                log_receiver,
+                status_handle: status_handle.clone(),
+            },
+
+            meter_receive_reporter: MeterReceiveReporter {
                 meter_client: MeterReportServiceClient::with_interceptor(
                     self.channel.clone(),
                     self.interceptor.clone(),
                 ),
-                #[cfg(feature = "management")]
-                management_client: ManagementServiceClient::with_interceptor(
-                    self.channel.clone(),
-                    self.interceptor.clone(),
-                ),
+                meter_receiver,
+                status_handle,
             },
-            shutdown_signal: Box::pin(pending()),
-            consumer: self.inner.consumer.lock().await.take().unwrap(),
         }
     }
 }
@@ -261,7 +302,9 @@ impl<P: CollectItemProduce, C: CollectItemConsume> GrpcReporter<P, C> {
 impl<P, C> Clone for GrpcReporter<P, C> {
     fn clone(&self) -> Self {
         Self {
-            inner: self.inner.clone(),
+            state: self.state.clone(),
+            producer: self.producer.clone(),
+            consumer: self.consumer.clone(),
             err_handle: self.err_handle.clone(),
             channel: self.channel.clone(),
             interceptor: self.interceptor.clone(),
@@ -271,44 +314,43 @@ impl<P, C> Clone for GrpcReporter<P, C> {
 
 impl<P: CollectItemProduce, C: CollectItemConsume> Report for GrpcReporter<P, C> {
     fn report(&self, item: CollectItem) {
-        if !self.inner.is_closed.load(Ordering::Relaxed) {
-            if let Err(e) = self.inner.producer.produce(item) {
-                if let Some(handle) = self.err_handle.as_deref() {
-                    handle(e);
-                }
+        if !self.state.is_closing() {
+            if let Err(e) = self.producer.produce(item) {
+                (self.err_handle)("report collect item failed", &*e);
             }
         }
     }
 }
 
-struct ReporterAndBuffer<P, C> {
-    inner: Arc<Inner<P, C>>,
-    status_handle: Option<Box<dyn Fn(tonic::Status) + Send + 'static>>,
+struct InnerReportSender {
+    status_handle: Arc<DynStatusHandler>,
+    err_handle: Arc<DynErrHandler>,
 
-    trace_buffer: LinkedList<SegmentObject>,
-    log_buffer: LinkedList<LogData>,
-    meter_buffer: LinkedList<MeterData>,
+    trace_sender: Sender<SegmentObject>,
+    log_sender: Sender<LogData>,
+    meter_sender: Sender<MeterData>,
 
-    trace_client: TraceSegmentReportServiceClient<InterceptedService<Channel, CustomInterceptor>>,
-    log_client: LogReportServiceClient<InterceptedService<Channel, CustomInterceptor>>,
-    meter_client: MeterReportServiceClient<InterceptedService<Channel, CustomInterceptor>>,
     #[cfg(feature = "management")]
-    #[cfg_attr(docsrs, doc(cfg(feature = "management")))]
     management_client: ManagementServiceClient<InterceptedService<Channel, CustomInterceptor>>,
 }
 
-impl<P: CollectItemProduce, C: CollectItemConsume> ReporterAndBuffer<P, C> {
+impl InnerReportSender {
     async fn report(&mut self, item: CollectItem) {
-        // TODO Implement batch collect in future.
         match item {
             CollectItem::Trace(item) => {
-                self.trace_buffer.push_back(*item);
+                if let Err(e) = self.trace_sender.try_send(*item) {
+                    (self.err_handle)("report trace segment failed", &e as &dyn Error);
+                }
             }
             CollectItem::Log(item) => {
-                self.log_buffer.push_back(*item);
+                if let Err(e) = self.log_sender.try_send(*item) {
+                    (self.err_handle)("report log data failed", &e as &dyn Error);
+                }
             }
             CollectItem::Meter(item) => {
-                self.meter_buffer.push_back(*item);
+                if let Err(e) = self.meter_sender.try_send(*item) {
+                    (self.err_handle)("report meter data failed", &e as &dyn Error);
+                }
             }
             #[cfg(feature = "management")]
             CollectItem::Instance(item) => {
@@ -317,97 +359,44 @@ impl<P: CollectItemProduce, C: CollectItemConsume> ReporterAndBuffer<P, C> {
                     .report_instance_properties(*item)
                     .await
                 {
-                    if let Some(status_handle) = &self.status_handle {
-                        status_handle(e);
-                    }
+                    (self.status_handle)("Report instance properties failed", &e);
                 }
             }
             #[cfg(feature = "management")]
             CollectItem::Ping(item) => {
                 if let Err(e) = self.management_client.keep_alive(*item).await {
-                    if let Some(status_handle) = &self.status_handle {
-                        status_handle(e);
-                    }
-                }
-            }
-        }
-
-        if !self.trace_buffer.is_empty() {
-            let buffer = take(&mut self.trace_buffer);
-            if let Err(e) = self.trace_client.collect(stream::iter(buffer)).await {
-                if let Some(status_handle) = &self.status_handle {
-                    status_handle(e);
-                }
-            }
-        }
-        if !self.log_buffer.is_empty() {
-            let buffer = take(&mut self.log_buffer);
-            if let Err(e) = self.log_client.collect(stream::iter(buffer)).await {
-                if let Some(status_handle) = &self.status_handle {
-                    status_handle(e);
-                }
-            }
-        }
-
-        if !self.meter_buffer.is_empty() {
-            let buffer = take(&mut self.meter_buffer);
-            if let Err(e) = self.meter_client.collect(stream::iter(buffer)).await {
-                if let Some(status_handle) = &self.status_handle {
-                    status_handle(e);
+                    (self.status_handle)("Ping failed", &e);
                 }
             }
         }
     }
 }
 
-/// Handle of [GrpcReporter::reporting].
-pub struct Reporting<P, C> {
-    rb: ReporterAndBuffer<P, C>,
+struct ReportSender<C> {
+    state: Arc<State>,
+    inner_report_sender: InnerReportSender,
     consumer: C,
     shutdown_signal: Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>,
 }
 
-impl<P: CollectItemProduce, C: CollectItemConsume> Reporting<P, C> {
-    /// Quit when shutdown_signal received.
-    ///
-    /// Accept a `shutdown_signal` argument as a graceful shutdown signal.
-    pub fn with_graceful_shutdown(
-        mut self,
-        shutdown_signal: impl Future<Output = ()> + Send + Sync + 'static,
-    ) -> Self {
-        self.shutdown_signal = Box::pin(shutdown_signal);
-        self
-    }
-
-    /// Set the failed status handle. By default, the status will not be handle.
-    pub fn with_status_handle(mut self, handle: impl Fn(tonic::Status) + Send + 'static) -> Self {
-        self.rb.status_handle = Some(Box::new(handle));
-        self
-    }
-
-    /// Spawn the reporting in background.
-    pub fn spawn(self) -> ReportingJoinHandle {
-        ReportingJoinHandle {
-            handle: tokio::spawn(self.start()),
-        }
-    }
-
-    /// Start the consume and report task.
-    pub async fn start(self) -> crate::Result<()> {
+impl<C: CollectItemConsume> ReportSender<C> {
+    async fn start(self) -> crate::Result<()> {
         let (shutdown_tx, mut shutdown_rx) = mpsc::unbounded_channel();
-        let Reporting {
-            mut rb,
-            mut consumer,
+        let ReportSender {
+            state,
+            mut inner_report_sender,
+            consumer: mut collect_item_consumer,
             shutdown_signal,
+            ..
         } = self;
 
         let work_fut = async move {
             loop {
                 select! {
-                    item = consumer.consume() => {
+                    item = collect_item_consumer.consume() => {
                         match item {
                             Ok(Some(item)) => {
-                                rb.report(item).await;
+                                inner_report_sender.report(item).await;
                             }
                             Ok(None) => break,
                             Err(err) => return Err(crate::Error::Other(err)),
@@ -417,13 +406,13 @@ impl<P: CollectItemProduce, C: CollectItemConsume> Reporting<P, C> {
                 }
             }
 
-            rb.inner.is_closed.store(true, Ordering::Relaxed);
+            state.is_closing.store(true, Ordering::Relaxed);
 
             // Flush.
             loop {
-                match consumer.try_consume().await {
+                match collect_item_consumer.try_consume().await {
                     Ok(Some(item)) => {
-                        rb.report(item).await;
+                        inner_report_sender.report(item).await;
                     }
                     Ok(None) => break,
                     Err(err) => return Err(err.into()),
@@ -447,15 +436,171 @@ impl<P: CollectItemProduce, C: CollectItemConsume> Reporting<P, C> {
     }
 }
 
+/// Handle of [GrpcReporter::reporting].
+pub struct Reporting<C> {
+    report_sender: ReportSender<C>,
+    trace_receive_reporter: TraceReceiveReporter,
+    log_receive_reporter: LogReceiveReporter,
+    meter_receive_reporter: MeterReceiveReporter,
+}
+
+impl<C: CollectItemConsume> Reporting<C> {
+    /// Quit when shutdown_signal received.
+    ///
+    /// Accept a `shutdown_signal` argument as a graceful shutdown signal.
+    pub fn with_graceful_shutdown(
+        mut self,
+        shutdown_signal: impl Future<Output = ()> + Send + Sync + 'static,
+    ) -> Self {
+        self.report_sender.shutdown_signal = Box::pin(shutdown_signal);
+        self
+    }
+
+    /// Set the failed status handle. By default, the status is logged with `tracing::error!`.
+    pub fn with_status_handle(
+        mut self,
+        handle: impl Fn(&str, &Status) + Send + Sync + 'static,
+    ) -> Self {
+        let handle = Arc::new(handle);
+        self.report_sender.inner_report_sender.status_handle = handle.clone();
+        self.trace_receive_reporter.status_handle = handle.clone();
+        self.log_receive_reporter.status_handle = handle.clone();
+        self.meter_receive_reporter.status_handle = handle;
+        self
+    }
+
+    /// Spawn the reporting in background.
+    pub fn spawn(self) -> ReportingJoinHandle {
+        ReportingJoinHandle {
+            handles: try_join_all(vec![
+                tokio::spawn(self.report_sender.start()),
+                tokio::spawn(self.trace_receive_reporter.start()),
+                tokio::spawn(self.log_receive_reporter.start()),
+                tokio::spawn(self.meter_receive_reporter.start()),
+            ]),
+        }
+    }
+}
+
 /// Handle of [Reporting::spawn].
 pub struct ReportingJoinHandle {
-    handle: JoinHandle<crate::Result<()>>,
+    handles: TryJoinAll<JoinHandle<crate::Result<()>>>,
 }
 
 impl Future for ReportingJoinHandle {
     type Output = crate::Result<()>;
 
     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
-        Pin::new(&mut self.handle).poll(cx).map(|r| r?)
+        Pin::new(&mut self.handles).poll(cx).map(|rs| {
+            let rs = rs?;
+            for r in rs {
+                r?;
+            }
+            Ok(())
+        })
+    }
+}
+
+struct TraceReceiveReporter {
+    trace_client: TraceSegmentReportServiceClient<InterceptedService<Channel, CustomInterceptor>>,
+    trace_receiver: Receiver<SegmentObject>,
+    status_handle: Arc<DynStatusHandler>,
+}
+
+impl TraceReceiveReporter {
+    async fn start(mut self) -> crate::Result<()> {
+        let rf = ReceiveFrom::new(self.trace_receiver);
+        while let Some(stream) = rf.stream() {
+            if let Err(err) = self.trace_client.collect(stream).await {
+                (self.status_handle)("Collect trace segment by stream failed", &err);
+            }
+        }
+        Ok(())
+    }
+}
+
+struct LogReceiveReporter {
+    log_client: LogReportServiceClient<InterceptedService<Channel, CustomInterceptor>>,
+    log_receiver: Receiver<LogData>,
+    status_handle: Arc<DynStatusHandler>,
+}
+
+impl LogReceiveReporter {
+    async fn start(mut self) -> crate::Result<()> {
+        let rf = ReceiveFrom::new(self.log_receiver);
+        while let Some(stream) = rf.stream() {
+            if let Err(err) = self.log_client.collect(stream).await {
+                (self.status_handle)("Collect log data by stream failed", &err);
+            }
+        }
+        Ok(())
+    }
+}
+
+struct MeterReceiveReporter {
+    meter_client: MeterReportServiceClient<InterceptedService<Channel, CustomInterceptor>>,
+    meter_receiver: Receiver<MeterData>,
+    status_handle: Arc<DynStatusHandler>,
+}
+
+impl MeterReceiveReporter {
+    async fn start(mut self) -> crate::Result<()> {
+        let rf = ReceiveFrom::new(self.meter_receiver);
+        while let Some(stream) = rf.stream() {
+            if let Err(err) = self.meter_client.collect(stream).await {
+                (self.status_handle)("Collect meter data by stream failed", &err);
+            }
+        }
+        Ok(())
+    }
+}
+
+struct ReceiveFrom<I> {
+    receiver: Arc<Mutex<Receiver<I>>>,
+    is_closed: Arc<AtomicBool>,
+}
+
+impl<I> ReceiveFrom<I> {
+    fn new(receiver: Receiver<I>) -> Self {
+        Self {
+            receiver: Arc::new(Mutex::new(receiver)),
+            is_closed: Default::default(),
+        }
+    }
+
+    fn stream(&self) -> Option<impl Stream<Item = I>> {
+        if self.is_closed.load(Ordering::Relaxed) {
+            return None;
+        }
+
+        let is_closed = self.is_closed.clone();
+        let receiver = self.receiver.clone();
+
+        Some(
+            ReceiveFromStream {
+                receiver,
+                is_closed,
+            }
+            .timeout(Duration::from_secs(30))
+            .map_while(|item| item.ok()),
+        )
+    }
+}
+
+struct ReceiveFromStream<I> {
+    receiver: Arc<Mutex<Receiver<I>>>,
+    is_closed: Arc<AtomicBool>,
+}
+
+impl<I> Stream for ReceiveFromStream<I> {
+    type Item = I;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        self.receiver.try_lock().unwrap().poll_recv(cx).map(|item| {
+            if item.is_none() {
+                self.is_closed.store(true, Ordering::Relaxed);
+            }
+            item
+        })
     }
 }