You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2023/01/03 03:57:45 UTC

[skywalking-rust] branch master updated: Add authentication and custom intercept support. (#50)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-rust.git


The following commit(s) were added to refs/heads/master by this push:
     new cbf1079  Add authentication and custom intercept support. (#50)
cbf1079 is described below

commit cbf107909a9e1a7611b9513360c35a08b0c39ada
Author: jmjoy <jm...@apache.org>
AuthorDate: Tue Jan 3 11:57:40 2023 +0800

    Add authentication and custom intercept support. (#50)
---
 README.md                      |   2 +
 src/reporter/grpc.rs           | 126 ++++++++++++++++++++++++++---------------
 src/reporter/print.rs          |   1 +
 src/skywalking_proto/v3/mod.rs |   1 +
 src/trace/trace_context.rs     |   8 +--
 5 files changed, 88 insertions(+), 50 deletions(-)

diff --git a/README.md b/README.md
index af00725..3ebf868 100644
--- a/README.md
+++ b/README.md
@@ -144,6 +144,8 @@ async fn handle_metric(mut metricer: Metricer) {
 async fn main() -> Result<(), Box<dyn Error>> {
     // Connect to skywalking oap server.
     let reporter = GrpcReporter::connect("http://0.0.0.0:11800").await?;
+    // Optional authentication, based on backend setting.
+    let reporter = reporter.with_authentication("<TOKEN>");
 
     // Spawn the reporting in background, with listening the graceful shutdown signal.
     let handle = reporter
diff --git a/src/reporter/grpc.rs b/src/reporter/grpc.rs
index 2ac1f00..68307ee 100644
--- a/src/reporter/grpc.rs
+++ b/src/reporter/grpc.rs
@@ -48,7 +48,10 @@ use tokio::{
 };
 use tonic::{
     async_trait,
+    metadata::{Ascii, MetadataValue},
+    service::{interceptor::InterceptedService, Interceptor},
     transport::{self, Channel, Endpoint},
+    Request, Status,
 };
 
 /// Special purpose, used for user-defined production operations. Generally, it
@@ -111,13 +114,31 @@ impl CollectItemConsume for mpsc::UnboundedReceiver<CollectItem> {
     }
 }
 
+type DynInterceptHandler = dyn Fn(Request<()>) -> Result<Request<()>, Status> + Send + Sync;
+
+#[derive(Default, Clone)]
+struct CustomInterceptor {
+    authentication: Option<Arc<String>>,
+    custom_intercept: Option<Arc<DynInterceptHandler>>,
+}
+
+impl Interceptor for CustomInterceptor {
+    fn call(&mut self, mut request: tonic::Request<()>) -> Result<tonic::Request<()>, Status> {
+        if let Some(authentication) = &self.authentication {
+            if let Ok(authentication) = authentication.parse::<MetadataValue<Ascii>>() {
+                request
+                    .metadata_mut()
+                    .insert("authentication", authentication);
+            }
+        }
+        if let Some(custom_intercept) = &self.custom_intercept {
+            request = custom_intercept(request)?;
+        }
+        Ok(request)
+    }
+}
+
 struct Inner<P, C> {
-    trace_client: Mutex<TraceSegmentReportServiceClient<Channel>>,
-    log_client: Mutex<LogReportServiceClient<Channel>>,
-    meter_client: Mutex<MeterReportServiceClient<Channel>>,
-    #[cfg(feature = "management")]
-    #[cfg_attr(docsrs, doc(cfg(feature = "management")))]
-    management_client: Mutex<ManagementServiceClient<Channel>>,
     producer: P,
     consumer: Mutex<Option<C>>,
     is_reporting: AtomicBool,
@@ -131,6 +152,8 @@ pub type DynErrHandle = dyn Fn(Box<dyn Error>) + Send + Sync + 'static;
 pub struct GrpcReporter<P, C> {
     inner: Arc<Inner<P, C>>,
     err_handle: Arc<Option<Box<DynErrHandle>>>,
+    channel: Channel,
+    interceptor: CustomInterceptor,
 }
 
 impl GrpcReporter<mpsc::UnboundedSender<CollectItem>, mpsc::UnboundedReceiver<CollectItem>> {
@@ -156,17 +179,14 @@ impl<P: CollectItemProduce, C: CollectItemConsume> GrpcReporter<P, C> {
     pub fn new_with_pc(channel: Channel, producer: P, consumer: C) -> Self {
         Self {
             inner: Arc::new(Inner {
-                trace_client: Mutex::new(TraceSegmentReportServiceClient::new(channel.clone())),
-                log_client: Mutex::new(LogReportServiceClient::new(channel.clone())),
-                #[cfg(feature = "management")]
-                management_client: Mutex::new(ManagementServiceClient::new(channel.clone())),
-                meter_client: Mutex::new(MeterReportServiceClient::new(channel)),
                 producer,
                 consumer: Mutex::new(Some(consumer)),
                 is_reporting: Default::default(),
                 is_closed: Default::default(),
             }),
             err_handle: Default::default(),
+            channel,
+            interceptor: Default::default(),
         }
     }
 
@@ -179,6 +199,22 @@ impl<P: CollectItemProduce, C: CollectItemConsume> GrpcReporter<P, C> {
         self
     }
 
+    /// Set the authentication header value. By default, the authentication is
+    /// not set.
+    pub fn with_authentication(mut self, authentication: impl Into<String>) -> Self {
+        self.interceptor.authentication = Some(Arc::new(authentication.into()));
+        self
+    }
+
+    /// Set the custom intercept. By default, the custom intercept is not set.
+    pub fn with_custom_intercept(
+        mut self,
+        custom_intercept: impl Fn(Request<()>) -> Result<Request<()>, Status> + Send + Sync + 'static,
+    ) -> Self {
+        self.interceptor.custom_intercept = Some(Arc::new(custom_intercept));
+        self
+    }
+
     /// Start to reporting.
     ///
     /// # Panics
@@ -193,9 +229,28 @@ impl<P: CollectItemProduce, C: CollectItemConsume> GrpcReporter<P, C> {
             rb: ReporterAndBuffer {
                 inner: Arc::clone(&self.inner),
                 status_handle: None,
+
                 trace_buffer: Default::default(),
                 log_buffer: Default::default(),
                 meter_buffer: Default::default(),
+
+                trace_client: TraceSegmentReportServiceClient::with_interceptor(
+                    self.channel.clone(),
+                    self.interceptor.clone(),
+                ),
+                log_client: LogReportServiceClient::with_interceptor(
+                    self.channel.clone(),
+                    self.interceptor.clone(),
+                ),
+                meter_client: MeterReportServiceClient::with_interceptor(
+                    self.channel.clone(),
+                    self.interceptor.clone(),
+                ),
+                #[cfg(feature = "management")]
+                management_client: ManagementServiceClient::with_interceptor(
+                    self.channel.clone(),
+                    self.interceptor.clone(),
+                ),
             },
             shutdown_signal: Box::pin(pending()),
             consumer: self.inner.consumer.lock().await.take().unwrap(),
@@ -208,6 +263,8 @@ impl<P, C> Clone for GrpcReporter<P, C> {
         Self {
             inner: self.inner.clone(),
             err_handle: self.err_handle.clone(),
+            channel: self.channel.clone(),
+            interceptor: self.interceptor.clone(),
         }
     }
 }
@@ -227,9 +284,17 @@ impl<P: CollectItemProduce, C: CollectItemConsume> Report for GrpcReporter<P, C>
 struct ReporterAndBuffer<P, C> {
     inner: Arc<Inner<P, C>>,
     status_handle: Option<Box<dyn Fn(tonic::Status) + Send + 'static>>,
+
     trace_buffer: LinkedList<SegmentObject>,
     log_buffer: LinkedList<LogData>,
     meter_buffer: LinkedList<MeterData>,
+
+    trace_client: TraceSegmentReportServiceClient<InterceptedService<Channel, CustomInterceptor>>,
+    log_client: LogReportServiceClient<InterceptedService<Channel, CustomInterceptor>>,
+    meter_client: MeterReportServiceClient<InterceptedService<Channel, CustomInterceptor>>,
+    #[cfg(feature = "management")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "management")))]
+    management_client: ManagementServiceClient<InterceptedService<Channel, CustomInterceptor>>,
 }
 
 impl<P: CollectItemProduce, C: CollectItemConsume> ReporterAndBuffer<P, C> {
@@ -248,10 +313,7 @@ impl<P: CollectItemProduce, C: CollectItemConsume> ReporterAndBuffer<P, C> {
             #[cfg(feature = "management")]
             CollectItem::Instance(item) => {
                 if let Err(e) = self
-                    .inner
                     .management_client
-                    .lock()
-                    .await
                     .report_instance_properties(*item)
                     .await
                 {
@@ -262,14 +324,7 @@ impl<P: CollectItemProduce, C: CollectItemConsume> ReporterAndBuffer<P, C> {
             }
             #[cfg(feature = "management")]
             CollectItem::Ping(item) => {
-                if let Err(e) = self
-                    .inner
-                    .management_client
-                    .lock()
-                    .await
-                    .keep_alive(*item)
-                    .await
-                {
+                if let Err(e) = self.management_client.keep_alive(*item).await {
                     if let Some(status_handle) = &self.status_handle {
                         status_handle(e);
                     }
@@ -279,14 +334,7 @@ impl<P: CollectItemProduce, C: CollectItemConsume> ReporterAndBuffer<P, C> {
 
         if !self.trace_buffer.is_empty() {
             let buffer = take(&mut self.trace_buffer);
-            if let Err(e) = self
-                .inner
-                .trace_client
-                .lock()
-                .await
-                .collect(stream::iter(buffer))
-                .await
-            {
+            if let Err(e) = self.trace_client.collect(stream::iter(buffer)).await {
                 if let Some(status_handle) = &self.status_handle {
                     status_handle(e);
                 }
@@ -294,14 +342,7 @@ impl<P: CollectItemProduce, C: CollectItemConsume> ReporterAndBuffer<P, C> {
         }
         if !self.log_buffer.is_empty() {
             let buffer = take(&mut self.log_buffer);
-            if let Err(e) = self
-                .inner
-                .log_client
-                .lock()
-                .await
-                .collect(stream::iter(buffer))
-                .await
-            {
+            if let Err(e) = self.log_client.collect(stream::iter(buffer)).await {
                 if let Some(status_handle) = &self.status_handle {
                     status_handle(e);
                 }
@@ -310,14 +351,7 @@ impl<P: CollectItemProduce, C: CollectItemConsume> ReporterAndBuffer<P, C> {
 
         if !self.meter_buffer.is_empty() {
             let buffer = take(&mut self.meter_buffer);
-            if let Err(e) = self
-                .inner
-                .meter_client
-                .lock()
-                .await
-                .collect(stream::iter(buffer))
-                .await
-            {
+            if let Err(e) = self.meter_client.collect(stream::iter(buffer)).await {
                 if let Some(status_handle) = &self.status_handle {
                     status_handle(e);
                 }
diff --git a/src/reporter/print.rs b/src/reporter/print.rs
index 5b3fde0..006327e 100644
--- a/src/reporter/print.rs
+++ b/src/reporter/print.rs
@@ -40,6 +40,7 @@ impl PrintReporter {
 }
 
 impl Report for PrintReporter {
+    #[allow(clippy::print_stdout)]
     fn report(&self, items: CollectItem) {
         match items {
             CollectItem::Trace(data) => {
diff --git a/src/skywalking_proto/v3/mod.rs b/src/skywalking_proto/v3/mod.rs
index 32d517d..0086949 100644
--- a/src/skywalking_proto/v3/mod.rs
+++ b/src/skywalking_proto/v3/mod.rs
@@ -17,6 +17,7 @@
 //! Generated code of `skywalking.v3`, by `tonic`.
 
 #![allow(missing_docs)]
+#![allow(rustdoc::invalid_html_tags)]
 
 use crate::common::system_time::{fetch_time, TimePeriod};
 
diff --git a/src/trace/trace_context.rs b/src/trace/trace_context.rs
index b5161f4..e9612c2 100644
--- a/src/trace/trace_context.rs
+++ b/src/trace/trace_context.rs
@@ -52,15 +52,15 @@ impl SpanStack {
     }
 
     pub(crate) fn with_finalized_mut<T>(&self, f: impl FnOnce(&mut Vec<SpanObject>) -> T) -> T {
-        f(&mut *self.finalized.try_write().expect(LOCK_MSG))
+        f(&mut self.finalized.try_write().expect(LOCK_MSG))
     }
 
     pub(crate) fn with_active<T>(&self, f: impl FnOnce(&Vec<SpanObject>) -> T) -> T {
-        f(&*self.active.try_read().expect(LOCK_MSG))
+        f(&self.active.try_read().expect(LOCK_MSG))
     }
 
     pub(crate) fn with_active_mut<T>(&self, f: impl FnOnce(&mut Vec<SpanObject>) -> T) -> T {
-        f(&mut *self.active.try_write().expect(LOCK_MSG))
+        f(&mut self.active.try_write().expect(LOCK_MSG))
     }
 
     fn pop_active(&self, index: usize) -> Option<SpanObject> {
@@ -184,7 +184,7 @@ impl TracingContext {
     }
 
     fn with_spans_mut<T>(&mut self, f: impl FnOnce(&mut Vec<SpanObject>) -> T) -> T {
-        f(&mut *self.span_stack.finalized.try_write().expect(LOCK_MSG))
+        f(&mut self.span_stack.finalized.try_write().expect(LOCK_MSG))
     }
 
     pub(crate) fn with_active_span_stack<T>(&self, f: impl FnOnce(&Vec<SpanObject>) -> T) -> T {