You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2023/02/15 05:38:28 UTC
[skywalking-rust] branch master updated: Use stream and completed for a bulk to collect for grpc reporter. (#54)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-rust.git
The following commit(s) were added to refs/heads/master by this push:
new c39722c Use stream and completed for a bulk to collect for grpc reporter. (#54)
c39722c is described below
commit c39722c870bb79ff159ce441d61b6d564a71c0d0
Author: jmjoy <jm...@apache.org>
AuthorDate: Wed Feb 15 13:38:22 2023 +0800
Use stream and completed for a bulk to collect for grpc reporter. (#54)
---
Cargo.toml | 2 +-
examples/simple_log_report.rs | 2 +-
src/reporter/grpc.rs | 405 ++++++++++++++++++++++++++++--------------
3 files changed, 277 insertions(+), 132 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 5b66f22..423e8cf 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -40,7 +40,6 @@ vendored = ["protobuf-src"]
mock = [] # For internal integration testing only, do not use.
[dependencies]
-async-stream = "0.3.3"
base64 = "0.13.0"
bytes = "1.2.1"
cfg-if = "1.0.0"
@@ -56,6 +55,7 @@ serde = { version = "1.0.143", features = ["derive"] }
systemstat = { version = "0.2.0", optional = true }
thiserror = "1.0.32"
tokio = { version = "1.20.1", features = ["parking_lot"] }
+tokio-stream = { version = "0.1.9", features = ["time"] }
tonic = { version = "0.8.0", features = ["codegen"] }
tracing = "0.1.36"
uuid = { version = "1.1.2", features = ["serde", "v4"] }
diff --git a/examples/simple_log_report.rs b/examples/simple_log_report.rs
index 0466e9a..02763e9 100644
--- a/examples/simple_log_report.rs
+++ b/examples/simple_log_report.rs
@@ -37,7 +37,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
.reporting()
.await
.with_graceful_shutdown(future::ready(()))
- .start()
+ .spawn()
.await?;
Ok(())
diff --git a/src/reporter/grpc.rs b/src/reporter/grpc.rs
index 68307ee..576adb7 100644
--- a/src/reporter/grpc.rs
+++ b/src/reporter/grpc.rs
@@ -27,25 +27,29 @@ use crate::{
SegmentObject,
},
};
-use futures_util::stream;
+use futures_core::Stream;
+use futures_util::future::{try_join_all, TryJoinAll};
use std::{
- collections::LinkedList,
error::Error,
future::{pending, Future},
- mem::take,
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
task::{Context, Poll},
+ time::Duration,
};
use tokio::{
select,
- sync::{mpsc, Mutex},
+ sync::{
+ mpsc::{self, Receiver, Sender},
+ Mutex,
+ },
task::JoinHandle,
try_join,
};
+use tokio_stream::StreamExt;
use tonic::{
async_trait,
metadata::{Ascii, MetadataValue},
@@ -53,6 +57,19 @@ use tonic::{
transport::{self, Channel, Endpoint},
Request, Status,
};
+use tracing::error;
+
+type DynInterceptHandler = dyn Fn(Request<()>) -> Result<Request<()>, Status> + Send + Sync;
+type DynErrHandler = dyn Fn(&str, &dyn Error) + Send + Sync + 'static;
+type DynStatusHandler = dyn Fn(&str, &Status) + Send + Sync + 'static;
+
+fn default_err_handle(message: &str, err: &dyn Error) {
+ error!(?err, "{}", message);
+}
+
+fn default_status_handle(message: &str, status: &Status) {
+ error!(?status, "{}", message);
+}
/// Special purpose, used for user-defined production operations. Generally, it
/// does not need to be handled.
@@ -114,8 +131,6 @@ impl CollectItemConsume for mpsc::UnboundedReceiver<CollectItem> {
}
}
-type DynInterceptHandler = dyn Fn(Request<()>) -> Result<Request<()>, Status> + Send + Sync;
-
#[derive(Default, Clone)]
struct CustomInterceptor {
authentication: Option<Arc<String>>,
@@ -138,20 +153,23 @@ impl Interceptor for CustomInterceptor {
}
}
-struct Inner<P, C> {
- producer: P,
- consumer: Mutex<Option<C>>,
+struct State {
is_reporting: AtomicBool,
- is_closed: AtomicBool,
+ is_closing: AtomicBool,
}
-/// Alias of dyn [Error] callback.
-pub type DynErrHandle = dyn Fn(Box<dyn Error>) + Send + Sync + 'static;
+impl State {
+ fn is_closing(&self) -> bool {
+ self.is_closing.load(Ordering::Relaxed)
+ }
+}
/// Reporter which will report to Skywalking OAP server via grpc protocol.
pub struct GrpcReporter<P, C> {
- inner: Arc<Inner<P, C>>,
- err_handle: Arc<Option<Box<DynErrHandle>>>,
+ state: Arc<State>,
+ producer: Arc<P>,
+ consumer: Arc<Mutex<Option<C>>>,
+ err_handle: Arc<DynErrHandler>,
channel: Channel,
interceptor: CustomInterceptor,
}
@@ -178,13 +196,13 @@ impl<P: CollectItemProduce, C: CollectItemConsume> GrpcReporter<P, C> {
/// usually you can use [GrpcReporter::connect] and [GrpcReporter::new].
pub fn new_with_pc(channel: Channel, producer: P, consumer: C) -> Self {
Self {
- inner: Arc::new(Inner {
- producer,
- consumer: Mutex::new(Some(consumer)),
+ state: Arc::new(State {
is_reporting: Default::default(),
- is_closed: Default::default(),
+ is_closing: Default::default(),
}),
- err_handle: Default::default(),
+ producer: Arc::new(producer),
+ consumer: Arc::new(Mutex::new(Some(consumer))),
+ err_handle: Arc::new(default_err_handle),
channel,
interceptor: Default::default(),
}
@@ -193,9 +211,9 @@ impl<P: CollectItemProduce, C: CollectItemConsume> GrpcReporter<P, C> {
/// Set error handle. By default, the error will not be handle.
pub fn with_err_handle(
mut self,
- handle: impl Fn(Box<dyn Error>) + Send + Sync + 'static,
+ handle: impl Fn(&str, &dyn Error) + Send + Sync + 'static,
) -> Self {
- self.err_handle = Arc::new(Some(Box::new(handle)));
+ self.err_handle = Arc::new(handle);
self
}
@@ -220,40 +238,63 @@ impl<P: CollectItemProduce, C: CollectItemConsume> GrpcReporter<P, C> {
/// # Panics
///
/// Panic if call more than once.
- pub async fn reporting(&self) -> Reporting<P, C> {
- if self.inner.is_reporting.swap(true, Ordering::Relaxed) {
+ pub async fn reporting(&self) -> Reporting<C> {
+ if self.state.is_reporting.swap(true, Ordering::Relaxed) {
panic!("reporting already called");
}
- Reporting {
- rb: ReporterAndBuffer {
- inner: Arc::clone(&self.inner),
- status_handle: None,
+ let (trace_sender, trace_receiver) = mpsc::channel(255);
+ let (log_sender, log_receiver) = mpsc::channel(255);
+ let (meter_sender, meter_receiver) = mpsc::channel(255);
- trace_buffer: Default::default(),
- log_buffer: Default::default(),
- meter_buffer: Default::default(),
+ let status_handle = Arc::new(default_status_handle);
+ Reporting {
+ report_sender: ReportSender {
+ state: Arc::clone(&self.state),
+ inner_report_sender: InnerReportSender {
+ status_handle: Arc::new(default_status_handle),
+ err_handle: self.err_handle.clone(),
+ trace_sender,
+ log_sender,
+ meter_sender,
+
+ #[cfg(feature = "management")]
+ management_client: ManagementServiceClient::with_interceptor(
+ self.channel.clone(),
+ self.interceptor.clone(),
+ ),
+ },
+ shutdown_signal: Box::pin(pending()),
+ consumer: self.consumer.lock().await.take().unwrap(),
+ },
+
+ trace_receive_reporter: TraceReceiveReporter {
trace_client: TraceSegmentReportServiceClient::with_interceptor(
self.channel.clone(),
self.interceptor.clone(),
),
+ trace_receiver,
+ status_handle: status_handle.clone(),
+ },
+
+ log_receive_reporter: LogReceiveReporter {
log_client: LogReportServiceClient::with_interceptor(
self.channel.clone(),
self.interceptor.clone(),
),
+ log_receiver,
+ status_handle: status_handle.clone(),
+ },
+
+ meter_receive_reporter: MeterReceiveReporter {
meter_client: MeterReportServiceClient::with_interceptor(
self.channel.clone(),
self.interceptor.clone(),
),
- #[cfg(feature = "management")]
- management_client: ManagementServiceClient::with_interceptor(
- self.channel.clone(),
- self.interceptor.clone(),
- ),
+ meter_receiver,
+ status_handle,
},
- shutdown_signal: Box::pin(pending()),
- consumer: self.inner.consumer.lock().await.take().unwrap(),
}
}
}
@@ -261,7 +302,9 @@ impl<P: CollectItemProduce, C: CollectItemConsume> GrpcReporter<P, C> {
impl<P, C> Clone for GrpcReporter<P, C> {
fn clone(&self) -> Self {
Self {
- inner: self.inner.clone(),
+ state: self.state.clone(),
+ producer: self.producer.clone(),
+ consumer: self.consumer.clone(),
err_handle: self.err_handle.clone(),
channel: self.channel.clone(),
interceptor: self.interceptor.clone(),
@@ -271,44 +314,43 @@ impl<P, C> Clone for GrpcReporter<P, C> {
impl<P: CollectItemProduce, C: CollectItemConsume> Report for GrpcReporter<P, C> {
fn report(&self, item: CollectItem) {
- if !self.inner.is_closed.load(Ordering::Relaxed) {
- if let Err(e) = self.inner.producer.produce(item) {
- if let Some(handle) = self.err_handle.as_deref() {
- handle(e);
- }
+ if !self.state.is_closing() {
+ if let Err(e) = self.producer.produce(item) {
+ (self.err_handle)("report collect item failed", &*e);
}
}
}
}
-struct ReporterAndBuffer<P, C> {
- inner: Arc<Inner<P, C>>,
- status_handle: Option<Box<dyn Fn(tonic::Status) + Send + 'static>>,
+struct InnerReportSender {
+ status_handle: Arc<DynStatusHandler>,
+ err_handle: Arc<DynErrHandler>,
- trace_buffer: LinkedList<SegmentObject>,
- log_buffer: LinkedList<LogData>,
- meter_buffer: LinkedList<MeterData>,
+ trace_sender: Sender<SegmentObject>,
+ log_sender: Sender<LogData>,
+ meter_sender: Sender<MeterData>,
- trace_client: TraceSegmentReportServiceClient<InterceptedService<Channel, CustomInterceptor>>,
- log_client: LogReportServiceClient<InterceptedService<Channel, CustomInterceptor>>,
- meter_client: MeterReportServiceClient<InterceptedService<Channel, CustomInterceptor>>,
#[cfg(feature = "management")]
- #[cfg_attr(docsrs, doc(cfg(feature = "management")))]
management_client: ManagementServiceClient<InterceptedService<Channel, CustomInterceptor>>,
}
-impl<P: CollectItemProduce, C: CollectItemConsume> ReporterAndBuffer<P, C> {
+impl InnerReportSender {
async fn report(&mut self, item: CollectItem) {
- // TODO Implement batch collect in future.
match item {
CollectItem::Trace(item) => {
- self.trace_buffer.push_back(*item);
+ if let Err(e) = self.trace_sender.try_send(*item) {
+ (self.err_handle)("report trace segment failed", &e as &dyn Error);
+ }
}
CollectItem::Log(item) => {
- self.log_buffer.push_back(*item);
+ if let Err(e) = self.log_sender.try_send(*item) {
+ (self.err_handle)("report log data failed", &e as &dyn Error);
+ }
}
CollectItem::Meter(item) => {
- self.meter_buffer.push_back(*item);
+ if let Err(e) = self.meter_sender.try_send(*item) {
+ (self.err_handle)("report meter data failed", &e as &dyn Error);
+ }
}
#[cfg(feature = "management")]
CollectItem::Instance(item) => {
@@ -317,97 +359,44 @@ impl<P: CollectItemProduce, C: CollectItemConsume> ReporterAndBuffer<P, C> {
.report_instance_properties(*item)
.await
{
- if let Some(status_handle) = &self.status_handle {
- status_handle(e);
- }
+ (self.status_handle)("Report instance properties failed", &e);
}
}
#[cfg(feature = "management")]
CollectItem::Ping(item) => {
if let Err(e) = self.management_client.keep_alive(*item).await {
- if let Some(status_handle) = &self.status_handle {
- status_handle(e);
- }
- }
- }
- }
-
- if !self.trace_buffer.is_empty() {
- let buffer = take(&mut self.trace_buffer);
- if let Err(e) = self.trace_client.collect(stream::iter(buffer)).await {
- if let Some(status_handle) = &self.status_handle {
- status_handle(e);
- }
- }
- }
- if !self.log_buffer.is_empty() {
- let buffer = take(&mut self.log_buffer);
- if let Err(e) = self.log_client.collect(stream::iter(buffer)).await {
- if let Some(status_handle) = &self.status_handle {
- status_handle(e);
- }
- }
- }
-
- if !self.meter_buffer.is_empty() {
- let buffer = take(&mut self.meter_buffer);
- if let Err(e) = self.meter_client.collect(stream::iter(buffer)).await {
- if let Some(status_handle) = &self.status_handle {
- status_handle(e);
+ (self.status_handle)("Ping failed", &e);
}
}
}
}
}
-/// Handle of [GrpcReporter::reporting].
-pub struct Reporting<P, C> {
- rb: ReporterAndBuffer<P, C>,
+struct ReportSender<C> {
+ state: Arc<State>,
+ inner_report_sender: InnerReportSender,
consumer: C,
shutdown_signal: Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>,
}
-impl<P: CollectItemProduce, C: CollectItemConsume> Reporting<P, C> {
- /// Quit when shutdown_signal received.
- ///
- /// Accept a `shutdown_signal` argument as a graceful shutdown signal.
- pub fn with_graceful_shutdown(
- mut self,
- shutdown_signal: impl Future<Output = ()> + Send + Sync + 'static,
- ) -> Self {
- self.shutdown_signal = Box::pin(shutdown_signal);
- self
- }
-
- /// Set the failed status handle. By default, the status will not be handle.
- pub fn with_status_handle(mut self, handle: impl Fn(tonic::Status) + Send + 'static) -> Self {
- self.rb.status_handle = Some(Box::new(handle));
- self
- }
-
- /// Spawn the reporting in background.
- pub fn spawn(self) -> ReportingJoinHandle {
- ReportingJoinHandle {
- handle: tokio::spawn(self.start()),
- }
- }
-
- /// Start the consume and report task.
- pub async fn start(self) -> crate::Result<()> {
+impl<C: CollectItemConsume> ReportSender<C> {
+ async fn start(self) -> crate::Result<()> {
let (shutdown_tx, mut shutdown_rx) = mpsc::unbounded_channel();
- let Reporting {
- mut rb,
- mut consumer,
+ let ReportSender {
+ state,
+ mut inner_report_sender,
+ consumer: mut collect_item_consumer,
shutdown_signal,
+ ..
} = self;
let work_fut = async move {
loop {
select! {
- item = consumer.consume() => {
+ item = collect_item_consumer.consume() => {
match item {
Ok(Some(item)) => {
- rb.report(item).await;
+ inner_report_sender.report(item).await;
}
Ok(None) => break,
Err(err) => return Err(crate::Error::Other(err)),
@@ -417,13 +406,13 @@ impl<P: CollectItemProduce, C: CollectItemConsume> Reporting<P, C> {
}
}
- rb.inner.is_closed.store(true, Ordering::Relaxed);
+ state.is_closing.store(true, Ordering::Relaxed);
// Flush.
loop {
- match consumer.try_consume().await {
+ match collect_item_consumer.try_consume().await {
Ok(Some(item)) => {
- rb.report(item).await;
+ inner_report_sender.report(item).await;
}
Ok(None) => break,
Err(err) => return Err(err.into()),
@@ -447,15 +436,171 @@ impl<P: CollectItemProduce, C: CollectItemConsume> Reporting<P, C> {
}
}
+/// Handle of [GrpcReporter::reporting].
+pub struct Reporting<C> {
+ report_sender: ReportSender<C>,
+ trace_receive_reporter: TraceReceiveReporter,
+ log_receive_reporter: LogReceiveReporter,
+ meter_receive_reporter: MeterReceiveReporter,
+}
+
+impl<C: CollectItemConsume> Reporting<C> {
+ /// Quit when shutdown_signal received.
+ ///
+ /// Accept a `shutdown_signal` argument as a graceful shutdown signal.
+ pub fn with_graceful_shutdown(
+ mut self,
+ shutdown_signal: impl Future<Output = ()> + Send + Sync + 'static,
+ ) -> Self {
+ self.report_sender.shutdown_signal = Box::pin(shutdown_signal);
+ self
+ }
+
+ /// Set the failed status handle. By default, the status will not be handle.
+ pub fn with_status_handle(
+ mut self,
+ handle: impl Fn(&str, &Status) + Send + Sync + 'static,
+ ) -> Self {
+ let handle = Arc::new(handle);
+ self.report_sender.inner_report_sender.status_handle = handle.clone();
+ self.trace_receive_reporter.status_handle = handle.clone();
+ self.log_receive_reporter.status_handle = handle.clone();
+ self.meter_receive_reporter.status_handle = handle;
+ self
+ }
+
+ /// Spawn the reporting in background.
+ pub fn spawn(self) -> ReportingJoinHandle {
+ ReportingJoinHandle {
+ handles: try_join_all(vec![
+ tokio::spawn(self.report_sender.start()),
+ tokio::spawn(self.trace_receive_reporter.start()),
+ tokio::spawn(self.log_receive_reporter.start()),
+ tokio::spawn(self.meter_receive_reporter.start()),
+ ]),
+ }
+ }
+}
+
/// Handle of [Reporting::spawn].
pub struct ReportingJoinHandle {
- handle: JoinHandle<crate::Result<()>>,
+ handles: TryJoinAll<JoinHandle<crate::Result<()>>>,
}
impl Future for ReportingJoinHandle {
type Output = crate::Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- Pin::new(&mut self.handle).poll(cx).map(|r| r?)
+ Pin::new(&mut self.handles).poll(cx).map(|rs| {
+ let rs = rs?;
+ for r in rs {
+ r?;
+ }
+ Ok(())
+ })
+ }
+}
+
+struct TraceReceiveReporter {
+ trace_client: TraceSegmentReportServiceClient<InterceptedService<Channel, CustomInterceptor>>,
+ trace_receiver: Receiver<SegmentObject>,
+ status_handle: Arc<DynStatusHandler>,
+}
+
+impl TraceReceiveReporter {
+ async fn start(mut self) -> crate::Result<()> {
+ let rf = ReceiveFrom::new(self.trace_receiver);
+ while let Some(stream) = rf.stream() {
+ if let Err(err) = self.trace_client.collect(stream).await {
+ (self.status_handle)("Collect trace segment by stream failed", &err);
+ }
+ }
+ Ok(())
+ }
+}
+
+struct LogReceiveReporter {
+ log_client: LogReportServiceClient<InterceptedService<Channel, CustomInterceptor>>,
+ log_receiver: Receiver<LogData>,
+ status_handle: Arc<DynStatusHandler>,
+}
+
+impl LogReceiveReporter {
+ async fn start(mut self) -> crate::Result<()> {
+ let rf = ReceiveFrom::new(self.log_receiver);
+ while let Some(stream) = rf.stream() {
+ if let Err(err) = self.log_client.collect(stream).await {
+ (self.status_handle)("Collect log data by stream failed", &err);
+ }
+ }
+ Ok(())
+ }
+}
+
+struct MeterReceiveReporter {
+ meter_client: MeterReportServiceClient<InterceptedService<Channel, CustomInterceptor>>,
+ meter_receiver: Receiver<MeterData>,
+ status_handle: Arc<DynStatusHandler>,
+}
+
+impl MeterReceiveReporter {
+ async fn start(mut self) -> crate::Result<()> {
+ let rf = ReceiveFrom::new(self.meter_receiver);
+ while let Some(stream) = rf.stream() {
+ if let Err(err) = self.meter_client.collect(stream).await {
+ (self.status_handle)("Collect meter data by stream failed", &err);
+ }
+ }
+ Ok(())
+ }
+}
+
+struct ReceiveFrom<I> {
+ receiver: Arc<Mutex<Receiver<I>>>,
+ is_closed: Arc<AtomicBool>,
+}
+
+impl<I> ReceiveFrom<I> {
+ fn new(receiver: Receiver<I>) -> Self {
+ Self {
+ receiver: Arc::new(Mutex::new(receiver)),
+ is_closed: Default::default(),
+ }
+ }
+
+ fn stream(&self) -> Option<impl Stream<Item = I>> {
+ if self.is_closed.load(Ordering::Relaxed) {
+ return None;
+ }
+
+ let is_closed = self.is_closed.clone();
+ let receiver = self.receiver.clone();
+
+ Some(
+ ReceiveFromStream {
+ receiver,
+ is_closed,
+ }
+ .timeout(Duration::from_secs(30))
+ .map_while(|item| item.ok()),
+ )
+ }
+}
+
+struct ReceiveFromStream<I> {
+ receiver: Arc<Mutex<Receiver<I>>>,
+ is_closed: Arc<AtomicBool>,
+}
+
+impl<I> Stream for ReceiveFromStream<I> {
+ type Item = I;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ self.receiver.try_lock().unwrap().poll_recv(cx).map(|item| {
+ if item.is_none() {
+ self.is_closed.store(true, Ordering::Relaxed);
+ }
+ item
+ })
}
}