You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2023/01/03 03:57:45 UTC
[skywalking-rust] branch master updated: Add authentication and custom intercept support. (#50)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-rust.git
The following commit(s) were added to refs/heads/master by this push:
new cbf1079 Add authentication and custom intercept support. (#50)
cbf1079 is described below
commit cbf107909a9e1a7611b9513360c35a08b0c39ada
Author: jmjoy <jm...@apache.org>
AuthorDate: Tue Jan 3 11:57:40 2023 +0800
Add authentication and custom intercept support. (#50)
---
README.md | 2 +
src/reporter/grpc.rs | 126 ++++++++++++++++++++++++++---------------
src/reporter/print.rs | 1 +
src/skywalking_proto/v3/mod.rs | 1 +
src/trace/trace_context.rs | 8 +--
5 files changed, 88 insertions(+), 50 deletions(-)
diff --git a/README.md b/README.md
index af00725..3ebf868 100644
--- a/README.md
+++ b/README.md
@@ -144,6 +144,8 @@ async fn handle_metric(mut metricer: Metricer) {
async fn main() -> Result<(), Box<dyn Error>> {
// Connect to skywalking oap server.
let reporter = GrpcReporter::connect("http://0.0.0.0:11800").await?;
+ // Optional authentication, based on backend setting.
+ let reporter = reporter.with_authentication("<TOKEN>");
// Spawn the reporting in background, with listening the graceful shutdown signal.
let handle = reporter
diff --git a/src/reporter/grpc.rs b/src/reporter/grpc.rs
index 2ac1f00..68307ee 100644
--- a/src/reporter/grpc.rs
+++ b/src/reporter/grpc.rs
@@ -48,7 +48,10 @@ use tokio::{
};
use tonic::{
async_trait,
+ metadata::{Ascii, MetadataValue},
+ service::{interceptor::InterceptedService, Interceptor},
transport::{self, Channel, Endpoint},
+ Request, Status,
};
/// Special purpose, used for user-defined production operations. Generally, it
@@ -111,13 +114,31 @@ impl CollectItemConsume for mpsc::UnboundedReceiver<CollectItem> {
}
}
+type DynInterceptHandler = dyn Fn(Request<()>) -> Result<Request<()>, Status> + Send + Sync;
+
+#[derive(Default, Clone)]
+struct CustomInterceptor {
+ authentication: Option<Arc<String>>,
+ custom_intercept: Option<Arc<DynInterceptHandler>>,
+}
+
+impl Interceptor for CustomInterceptor {
+ fn call(&mut self, mut request: tonic::Request<()>) -> Result<tonic::Request<()>, Status> {
+ if let Some(authentication) = &self.authentication {
+ if let Ok(authentication) = authentication.parse::<MetadataValue<Ascii>>() {
+ request
+ .metadata_mut()
+ .insert("authentication", authentication);
+ }
+ }
+ if let Some(custom_intercept) = &self.custom_intercept {
+ request = custom_intercept(request)?;
+ }
+ Ok(request)
+ }
+}
+
struct Inner<P, C> {
- trace_client: Mutex<TraceSegmentReportServiceClient<Channel>>,
- log_client: Mutex<LogReportServiceClient<Channel>>,
- meter_client: Mutex<MeterReportServiceClient<Channel>>,
- #[cfg(feature = "management")]
- #[cfg_attr(docsrs, doc(cfg(feature = "management")))]
- management_client: Mutex<ManagementServiceClient<Channel>>,
producer: P,
consumer: Mutex<Option<C>>,
is_reporting: AtomicBool,
@@ -131,6 +152,8 @@ pub type DynErrHandle = dyn Fn(Box<dyn Error>) + Send + Sync + 'static;
pub struct GrpcReporter<P, C> {
inner: Arc<Inner<P, C>>,
err_handle: Arc<Option<Box<DynErrHandle>>>,
+ channel: Channel,
+ interceptor: CustomInterceptor,
}
impl GrpcReporter<mpsc::UnboundedSender<CollectItem>, mpsc::UnboundedReceiver<CollectItem>> {
@@ -156,17 +179,14 @@ impl<P: CollectItemProduce, C: CollectItemConsume> GrpcReporter<P, C> {
pub fn new_with_pc(channel: Channel, producer: P, consumer: C) -> Self {
Self {
inner: Arc::new(Inner {
- trace_client: Mutex::new(TraceSegmentReportServiceClient::new(channel.clone())),
- log_client: Mutex::new(LogReportServiceClient::new(channel.clone())),
- #[cfg(feature = "management")]
- management_client: Mutex::new(ManagementServiceClient::new(channel.clone())),
- meter_client: Mutex::new(MeterReportServiceClient::new(channel)),
producer,
consumer: Mutex::new(Some(consumer)),
is_reporting: Default::default(),
is_closed: Default::default(),
}),
err_handle: Default::default(),
+ channel,
+ interceptor: Default::default(),
}
}
@@ -179,6 +199,22 @@ impl<P: CollectItemProduce, C: CollectItemConsume> GrpcReporter<P, C> {
self
}
+ /// Set the authentication header value. By default, the authentication is
+ /// not set.
+ pub fn with_authentication(mut self, authentication: impl Into<String>) -> Self {
+ self.interceptor.authentication = Some(Arc::new(authentication.into()));
+ self
+ }
+
+ /// Set the custom intercept. By default, the custom intercept is not set.
+ pub fn with_custom_intercept(
+ mut self,
+ custom_intercept: impl Fn(Request<()>) -> Result<Request<()>, Status> + Send + Sync + 'static,
+ ) -> Self {
+ self.interceptor.custom_intercept = Some(Arc::new(custom_intercept));
+ self
+ }
+
/// Start to reporting.
///
/// # Panics
@@ -193,9 +229,28 @@ impl<P: CollectItemProduce, C: CollectItemConsume> GrpcReporter<P, C> {
rb: ReporterAndBuffer {
inner: Arc::clone(&self.inner),
status_handle: None,
+
trace_buffer: Default::default(),
log_buffer: Default::default(),
meter_buffer: Default::default(),
+
+ trace_client: TraceSegmentReportServiceClient::with_interceptor(
+ self.channel.clone(),
+ self.interceptor.clone(),
+ ),
+ log_client: LogReportServiceClient::with_interceptor(
+ self.channel.clone(),
+ self.interceptor.clone(),
+ ),
+ meter_client: MeterReportServiceClient::with_interceptor(
+ self.channel.clone(),
+ self.interceptor.clone(),
+ ),
+ #[cfg(feature = "management")]
+ management_client: ManagementServiceClient::with_interceptor(
+ self.channel.clone(),
+ self.interceptor.clone(),
+ ),
},
shutdown_signal: Box::pin(pending()),
consumer: self.inner.consumer.lock().await.take().unwrap(),
@@ -208,6 +263,8 @@ impl<P, C> Clone for GrpcReporter<P, C> {
Self {
inner: self.inner.clone(),
err_handle: self.err_handle.clone(),
+ channel: self.channel.clone(),
+ interceptor: self.interceptor.clone(),
}
}
}
@@ -227,9 +284,17 @@ impl<P: CollectItemProduce, C: CollectItemConsume> Report for GrpcReporter<P, C>
struct ReporterAndBuffer<P, C> {
inner: Arc<Inner<P, C>>,
status_handle: Option<Box<dyn Fn(tonic::Status) + Send + 'static>>,
+
trace_buffer: LinkedList<SegmentObject>,
log_buffer: LinkedList<LogData>,
meter_buffer: LinkedList<MeterData>,
+
+ trace_client: TraceSegmentReportServiceClient<InterceptedService<Channel, CustomInterceptor>>,
+ log_client: LogReportServiceClient<InterceptedService<Channel, CustomInterceptor>>,
+ meter_client: MeterReportServiceClient<InterceptedService<Channel, CustomInterceptor>>,
+ #[cfg(feature = "management")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "management")))]
+ management_client: ManagementServiceClient<InterceptedService<Channel, CustomInterceptor>>,
}
impl<P: CollectItemProduce, C: CollectItemConsume> ReporterAndBuffer<P, C> {
@@ -248,10 +313,7 @@ impl<P: CollectItemProduce, C: CollectItemConsume> ReporterAndBuffer<P, C> {
#[cfg(feature = "management")]
CollectItem::Instance(item) => {
if let Err(e) = self
- .inner
.management_client
- .lock()
- .await
.report_instance_properties(*item)
.await
{
@@ -262,14 +324,7 @@ impl<P: CollectItemProduce, C: CollectItemConsume> ReporterAndBuffer<P, C> {
}
#[cfg(feature = "management")]
CollectItem::Ping(item) => {
- if let Err(e) = self
- .inner
- .management_client
- .lock()
- .await
- .keep_alive(*item)
- .await
- {
+ if let Err(e) = self.management_client.keep_alive(*item).await {
if let Some(status_handle) = &self.status_handle {
status_handle(e);
}
@@ -279,14 +334,7 @@ impl<P: CollectItemProduce, C: CollectItemConsume> ReporterAndBuffer<P, C> {
if !self.trace_buffer.is_empty() {
let buffer = take(&mut self.trace_buffer);
- if let Err(e) = self
- .inner
- .trace_client
- .lock()
- .await
- .collect(stream::iter(buffer))
- .await
- {
+ if let Err(e) = self.trace_client.collect(stream::iter(buffer)).await {
if let Some(status_handle) = &self.status_handle {
status_handle(e);
}
@@ -294,14 +342,7 @@ impl<P: CollectItemProduce, C: CollectItemConsume> ReporterAndBuffer<P, C> {
}
if !self.log_buffer.is_empty() {
let buffer = take(&mut self.log_buffer);
- if let Err(e) = self
- .inner
- .log_client
- .lock()
- .await
- .collect(stream::iter(buffer))
- .await
- {
+ if let Err(e) = self.log_client.collect(stream::iter(buffer)).await {
if let Some(status_handle) = &self.status_handle {
status_handle(e);
}
@@ -310,14 +351,7 @@ impl<P: CollectItemProduce, C: CollectItemConsume> ReporterAndBuffer<P, C> {
if !self.meter_buffer.is_empty() {
let buffer = take(&mut self.meter_buffer);
- if let Err(e) = self
- .inner
- .meter_client
- .lock()
- .await
- .collect(stream::iter(buffer))
- .await
- {
+ if let Err(e) = self.meter_client.collect(stream::iter(buffer)).await {
if let Some(status_handle) = &self.status_handle {
status_handle(e);
}
diff --git a/src/reporter/print.rs b/src/reporter/print.rs
index 5b3fde0..006327e 100644
--- a/src/reporter/print.rs
+++ b/src/reporter/print.rs
@@ -40,6 +40,7 @@ impl PrintReporter {
}
impl Report for PrintReporter {
+ #[allow(clippy::print_stdout)]
fn report(&self, items: CollectItem) {
match items {
CollectItem::Trace(data) => {
diff --git a/src/skywalking_proto/v3/mod.rs b/src/skywalking_proto/v3/mod.rs
index 32d517d..0086949 100644
--- a/src/skywalking_proto/v3/mod.rs
+++ b/src/skywalking_proto/v3/mod.rs
@@ -17,6 +17,7 @@
//! Generated code of `skywalking.v3`, by `tonic`.
#![allow(missing_docs)]
+#![allow(rustdoc::invalid_html_tags)]
use crate::common::system_time::{fetch_time, TimePeriod};
diff --git a/src/trace/trace_context.rs b/src/trace/trace_context.rs
index b5161f4..e9612c2 100644
--- a/src/trace/trace_context.rs
+++ b/src/trace/trace_context.rs
@@ -52,15 +52,15 @@ impl SpanStack {
}
pub(crate) fn with_finalized_mut<T>(&self, f: impl FnOnce(&mut Vec<SpanObject>) -> T) -> T {
- f(&mut *self.finalized.try_write().expect(LOCK_MSG))
+ f(&mut self.finalized.try_write().expect(LOCK_MSG))
}
pub(crate) fn with_active<T>(&self, f: impl FnOnce(&Vec<SpanObject>) -> T) -> T {
- f(&*self.active.try_read().expect(LOCK_MSG))
+ f(&self.active.try_read().expect(LOCK_MSG))
}
pub(crate) fn with_active_mut<T>(&self, f: impl FnOnce(&mut Vec<SpanObject>) -> T) -> T {
- f(&mut *self.active.try_write().expect(LOCK_MSG))
+ f(&mut self.active.try_write().expect(LOCK_MSG))
}
fn pop_active(&self, index: usize) -> Option<SpanObject> {
@@ -184,7 +184,7 @@ impl TracingContext {
}
fn with_spans_mut<T>(&mut self, f: impl FnOnce(&mut Vec<SpanObject>) -> T) -> T {
- f(&mut *self.span_stack.finalized.try_write().expect(LOCK_MSG))
+ f(&mut self.span_stack.finalized.try_write().expect(LOCK_MSG))
}
pub(crate) fn with_active_span_stack<T>(&self, f: impl FnOnce(&Vec<SpanObject>) -> T) -> T {