You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2022/07/10 14:49:18 UTC
[skywalking-rust] branch master updated: Add tracer. (#26)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-rust.git
The following commit(s) were added to refs/heads/master by this push:
new 699247d Add tracer. (#26)
699247d is described below
commit 699247dfbb804e8541dd6174a2c2f238862709ec
Author: jmjoy <91...@qq.com>
AuthorDate: Sun Jul 10 22:49:14 2022 +0800
Add tracer. (#26)
---
.github/workflows/ci.yaml | 7 ++
Cargo.toml | 5 +-
README.md | 42 ++++++++----
e2e/Cargo.lock | 26 +++++--
e2e/src/main.rs | 78 +++++++++++----------
examples/simple_trace_report.rs | 50 +++++++++++---
src/context/mod.rs | 1 +
src/context/tracer.rs | 147 ++++++++++++++++++++++++++++++++++++++++
src/error/mod.rs | 13 +++-
src/lib.rs | 3 +
src/reporter/grpc.rs | 117 +++++++++-----------------------
src/reporter/mod.rs | 12 ++++
12 files changed, 351 insertions(+), 150 deletions(-)
diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index cd63125..ecdc8cd 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -25,6 +25,11 @@ on:
tags:
- 'v*'
+env:
+ CARGO_TERM_COLOR: always
+ RUST_BACKTRACE: "1"
+ RUSTFLAGS: "-D warnings"
+
jobs:
CI:
runs-on: ubuntu-latest
@@ -48,3 +53,5 @@ jobs:
run: cargo clippy --workspace --tests -- -D warnings
- name: Run tests
run: cargo test --workspace
+ - name: Run docs
+ run: cargo rustdoc --all-features -- -D warnings
diff --git a/Cargo.toml b/Cargo.toml
index 4cc8d14..aa7b9d7 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -30,11 +30,14 @@ repository = "https://github.com/apache/skywalking-rust"
async-stream = "0.3.3"
base64 = "0.13.0"
bytes = "1.1.0"
+futures-core = "0.3.21"
+futures-util = "0.3.21"
prost = "0.10.4"
prost-derive = "0.10.1"
thiserror = "1.0.31"
tokio = { version = "1.18.2", features = ["full"] }
-tonic = "0.7.2"
+tonic = { version = "0.7.2", features = ["codegen"] }
+tracing = "0.1.35"
uuid = { version = "1.1.0", features = ["serde", "v4"] }
[build-dependencies]
diff --git a/README.md b/README.md
index 079be17..58cec12 100644
--- a/README.md
+++ b/README.md
@@ -33,24 +33,27 @@ context after the span finished.
# Example
-```rust
-use skywalking::context::trace_context::TracingContext;
-use skywalking::reporter::grpc::Reporter;
-use tokio;
+```rust, no_run
+use skywalking::context::tracer::Tracer;
+use skywalking::reporter::grpc::GrpcReporter;
+use std::error::Error;
+use std::sync::Arc;
+use tokio::signal;
+
+async fn handle_request(tracer: Arc<Tracer<GrpcReporter>>) {
+ let mut ctx = tracer.create_trace_context();
-async fn handle_request(reporter: ContextReporter) {
- let mut ctx = TracingContext::default("svc", "ins");
{
// Generate an Entry Span when a request
// is received. An Entry Span is generated only once per context.
- let span = ctx.create_entry_span("operation1").unwrap();
+ let span = ctx.create_entry_span("op1").unwrap();
// Something...
{
// Generates an Exit Span when executing an RPC.
- let span2 = ctx.create_exit_span("operation2").unwrap();
-
+ let span2 = ctx.create_exit_span("op2", "remote_peer").unwrap();
+
// Something...
ctx.finalize_span(span2);
@@ -58,21 +61,32 @@ async fn handle_request(reporter: ContextReporter) {
ctx.finalize_span(span);
}
- reporter.send(context).await;
+
+ tracer.finalize_context(ctx);
}
#[tokio::main]
-async fn main() {
- let tx = Reporter::start("http://0.0.0.0:11800").await;
+async fn main() -> Result<(), Box<dyn Error>> {
+ let reporter = GrpcReporter::connect("http://0.0.0.0:11800").await?;
+ let tracer = Arc::new(Tracer::new("service", "instance", reporter));
+
+ tokio::spawn(handle_request(tracer.clone()));
- // Start server...
+ // Start to report.
+ let handle = tracer.reporting(async move {
+ let _ = signal::ctrl_c().await;
+ });
+
+ handle.await?;
+
+ Ok(())
}
```
# How to compile?
If you have `skywalking-(VERSION).crate`, you can unpack it as follows:
-```
+```shell
tar -xvzf skywalking-(VERSION).crate
```
diff --git a/e2e/Cargo.lock b/e2e/Cargo.lock
index 1023a8b..e6c6232 100644
--- a/e2e/Cargo.lock
+++ b/e2e/Cargo.lock
@@ -231,6 +231,17 @@ version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3"
+[[package]]
+name = "futures-macro"
+version = "0.3.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
[[package]]
name = "futures-sink"
version = "0.3.21"
@@ -250,9 +261,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a"
dependencies = [
"futures-core",
+ "futures-macro",
"futures-task",
"pin-project-lite",
"pin-utils",
+ "slab",
]
[[package]]
@@ -783,12 +796,15 @@ dependencies = [
"async-stream",
"base64",
"bytes",
+ "futures-core",
+ "futures-util",
"prost",
"prost-derive",
"thiserror",
"tokio",
"tonic",
"tonic-build",
+ "tracing",
"uuid",
]
@@ -1068,9 +1084,9 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6"
[[package]]
name = "tracing"
-version = "0.1.34"
+version = "0.1.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5d0ecdcb44a79f0fe9844f0c4f33a342cbcbb5117de8001e6ba0dc2351327d09"
+checksum = "a400e31aa60b9d44a52a8ee0343b5b18566b03a8321e0d321f695cf56e940160"
dependencies = [
"cfg-if",
"log",
@@ -1092,11 +1108,11 @@ dependencies = [
[[package]]
name = "tracing-core"
-version = "0.1.26"
+version = "0.1.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f54c8ca710e81886d498c2fd3331b56c93aa248d49de2222ad2742247c60072f"
+checksum = "7b7358be39f2f274f322d2aaed611acc57f382e8eb1e5b48cb9ae30933495ce7"
dependencies = [
- "lazy_static",
+ "once_cell",
]
[[package]]
diff --git a/e2e/src/main.rs b/e2e/src/main.rs
index a0b500b..b6f7b19 100644
--- a/e2e/src/main.rs
+++ b/e2e/src/main.rs
@@ -21,23 +21,35 @@ use hyper::{Body, Client, Method, Request, Response, Server, StatusCode};
use skywalking::context::propagation::context::SKYWALKING_HTTP_CONTEXT_HEADER_KEY;
use skywalking::context::propagation::decoder::decode_propagation;
use skywalking::context::propagation::encoder::encode_propagation;
-use skywalking::context::trace_context::TracingContext;
-use skywalking::reporter::grpc::Reporter;
+use skywalking::context::tracer::Tracer;
+use skywalking::reporter::grpc::GrpcReporter;
use std::convert::Infallible;
use std::error::Error;
+use std::future::pending;
use std::net::SocketAddr;
use structopt::StructOpt;
-use tokio::sync::mpsc;
+use tokio::sync::OnceCell;
static NOT_FOUND_MSG: &str = "not found";
static SUCCESS_MSG: &str = "Success";
+static GLOBAL_TRACER: OnceCell<Tracer<GrpcReporter>> = OnceCell::const_new();
+
+fn set_global_tracer(tracer: Tracer<GrpcReporter>) { // Store `tracer` in `GLOBAL_TRACER`.
+ if GLOBAL_TRACER.set(tracer).is_err() { // `set` failed, so the cell was not updated
+ panic!("TRACER has setted")
+ }
+}
+
+fn get_global_tracer() -> &'static Tracer<GrpcReporter> { // Borrow the tracer stored in `GLOBAL_TRACER`.
+ GLOBAL_TRACER.get().expect("TRACER haven't setted") // panics when `get` returns `None`
+}
+
async fn handle_ping(
_req: Request<Body>,
client: Client<HttpConnector>,
- tx: mpsc::Sender<TracingContext>,
) -> Result<Response<Body>, Infallible> {
- let mut context = TracingContext::default("producer", "node_0");
+ let mut context = get_global_tracer().create_trace_context();
let span = context.create_entry_span("/ping").unwrap();
{
let span2 = context.create_exit_span("/pong", "consumer:8082").unwrap();
@@ -53,17 +65,16 @@ async fn handle_ping(
context.finalize_span(span2);
}
context.finalize_span(span);
- let _ = tx.send(context).await;
+ get_global_tracer().finalize_context(context);
Ok(Response::new(Body::from("hoge")))
}
async fn producer_response(
_req: Request<Body>,
client: Client<HttpConnector>,
- tx: mpsc::Sender<TracingContext>,
) -> Result<Response<Body>, Infallible> {
match (_req.method(), _req.uri().path()) {
- (&Method::GET, "/ping") => handle_ping(_req, client, tx).await,
+ (&Method::GET, "/ping") => handle_ping(_req, client).await,
(&Method::GET, "/healthCheck") => Ok(Response::builder()
.status(StatusCode::OK)
.body(Body::from(SUCCESS_MSG))
@@ -75,15 +86,14 @@ async fn producer_response(
}
}
-async fn run_producer_service(host: [u8; 4], tx: mpsc::Sender<TracingContext>) {
+async fn run_producer_service(host: [u8; 4]) {
let client = Client::new();
let make_svc = make_service_fn(|_| {
- let tx = tx.clone();
let client = client.clone();
async {
Ok::<_, Infallible>(service_fn(move |req| {
- producer_response(req, client.to_owned(), tx.to_owned())
+ producer_response(req, client.to_owned())
}))
}
});
@@ -95,29 +105,23 @@ async fn run_producer_service(host: [u8; 4], tx: mpsc::Sender<TracingContext>) {
}
}
-async fn handle_pong(
- _req: Request<Body>,
- tx: mpsc::Sender<TracingContext>,
-) -> Result<Response<Body>, Infallible> {
+async fn handle_pong(_req: Request<Body>) -> Result<Response<Body>, Infallible> {
let ctx = decode_propagation(
_req.headers()[SKYWALKING_HTTP_CONTEXT_HEADER_KEY]
.to_str()
.unwrap(),
)
.unwrap();
- let mut context = TracingContext::from_propagation_context("consumer", "node_0", ctx);
+ let mut context = get_global_tracer().create_trace_context_from_propagation(ctx);
let span = context.create_entry_span("/pong").unwrap();
context.finalize_span(span);
- let _ = tx.send(context).await;
+ get_global_tracer().finalize_context(context);
Ok(Response::new(Body::from("hoge")))
}
-async fn consumer_response(
- _req: Request<Body>,
- tx: mpsc::Sender<TracingContext>,
-) -> Result<Response<Body>, Infallible> {
+async fn consumer_response(_req: Request<Body>) -> Result<Response<Body>, Infallible> {
match (_req.method(), _req.uri().path()) {
- (&Method::GET, "/pong") => handle_pong(_req, tx).await,
+ (&Method::GET, "/pong") => handle_pong(_req).await,
(&Method::GET, "/healthCheck") => Ok(Response::builder()
.status(StatusCode::OK)
.body(Body::from(SUCCESS_MSG))
@@ -129,11 +133,9 @@ async fn consumer_response(
}
}
-async fn run_consumer_service(host: [u8; 4], tx: mpsc::Sender<TracingContext>) {
- let make_svc = make_service_fn(|_| {
- let tx = tx.clone();
- async { Ok::<_, Infallible>(service_fn(move |req| consumer_response(req, tx.to_owned()))) }
- });
+async fn run_consumer_service(host: [u8; 4]) {
+ let make_svc =
+ make_service_fn(|_| async { Ok::<_, Infallible>(service_fn(consumer_response)) });
let addr = SocketAddr::from((host, 8082));
let server = Server::bind(&addr).serve(make_svc);
@@ -153,15 +155,23 @@ struct Opt {
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let opt = Opt::from_args();
- let reporter = Reporter::start("http://collector:19876").await?;
- let tx = reporter.sender();
+ let reporter = GrpcReporter::connect("http://collector:19876").await?;
- if opt.mode == "consumer" {
- run_consumer_service([0, 0, 0, 0], tx).await;
+ let handle = if opt.mode == "consumer" {
+ set_global_tracer(Tracer::new("consumer", "node_0", reporter));
+ let handle = get_global_tracer().reporting(pending());
+ run_consumer_service([0, 0, 0, 0]).await;
+ handle
} else if opt.mode == "producer" {
- run_producer_service([0, 0, 0, 0], tx).await;
- }
+ set_global_tracer(Tracer::new("producer", "node_0", reporter));
+ let handle = get_global_tracer().reporting(pending());
+ run_producer_service([0, 0, 0, 0]).await;
+ handle
+ } else {
+ unreachable!()
+ };
+
+ handle.await?;
- reporter.shutdown().await?;
Ok(())
}
diff --git a/examples/simple_trace_report.rs b/examples/simple_trace_report.rs
index 97dc9c6..6d8d7fc 100644
--- a/examples/simple_trace_report.rs
+++ b/examples/simple_trace_report.rs
@@ -15,20 +15,50 @@
// specific language governing permissions and limitations
// under the License.
//
+use skywalking::context::tracer::Tracer;
+use skywalking::reporter::grpc::GrpcReporter;
use std::error::Error;
+use std::sync::Arc;
+use tokio::signal;
-use skywalking::context::trace_context::TracingContext;
-use skywalking::reporter::grpc::Reporter;
+async fn handle_request(tracer: Arc<Tracer<GrpcReporter>>) {
+ let mut ctx = tracer.create_trace_context();
-#[tokio::main]
-async fn main() -> Result<(), Box<dyn Error>> {
- let reporter = Reporter::start("http://0.0.0.0:11800").await?;
- let mut context = TracingContext::default("service", "instance");
{
- let span = context.create_entry_span("op1").unwrap();
- context.finalize_span(span);
+ // Generate an Entry Span when a request
+ // is received. An Entry Span is generated only once per context.
+ let span = ctx.create_entry_span("op1").unwrap();
+
+ // Something...
+
+ {
+ // Generates an Exit Span when executing an RPC.
+ let span2 = ctx.create_exit_span("op2", "remote_peer").unwrap();
+
+ // Something...
+
+ ctx.finalize_span(span2);
+ }
+
+ ctx.finalize_span(span);
}
- reporter.sender().send(context).await?;
- reporter.shutdown().await?;
+
+ tracer.finalize_context(ctx);
+}
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn Error>> {
+ let reporter = GrpcReporter::connect("http://0.0.0.0:11800").await?;
+ let tracer = Arc::new(Tracer::new("service", "instance", reporter));
+
+ tokio::spawn(handle_request(tracer.clone()));
+
+ // Start to report.
+ let handle = tracer.reporting(async move {
+ let _ = signal::ctrl_c().await;
+ });
+
+ handle.await?;
+
Ok(())
}
diff --git a/src/context/mod.rs b/src/context/mod.rs
index 3e9e727..99ddb08 100644
--- a/src/context/mod.rs
+++ b/src/context/mod.rs
@@ -17,3 +17,4 @@
pub mod propagation;
pub mod system_time;
pub mod trace_context;
+pub mod tracer;
diff --git a/src/context/tracer.rs b/src/context/tracer.rs
new file mode 100644
index 0000000..7ca4bd1
--- /dev/null
+++ b/src/context/tracer.rs
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+use super::propagation::context::PropagationContext;
+use crate::{
+ context::trace_context::TracingContext, reporter::Reporter, skywalking_proto::v3::SegmentObject,
+};
+use futures_util::stream;
+use std::future::Future;
+use std::{collections::LinkedList, sync::Arc};
+use tokio::{
+ sync::{
+ mpsc::{self, UnboundedReceiver},
+ Mutex,
+ },
+ task::JoinHandle,
+};
+
+/// Skywalking tracer: creates trace contexts for one service instance and queues finalized segments for a `Reporter`.
+pub struct Tracer<R: Reporter + Send + Sync + 'static> {
+ service_name: String, // handed to every `TracingContext` this tracer creates
+ instance_name: String, // handed to `TracingContext` together with `service_name`
+ reporter: Arc<Mutex<R>>, // cloned into the task spawned by `reporting`
+ segment_sender: mpsc::UnboundedSender<SegmentObject>, // `finalize_context` sends segments here
+ segment_receiver: Arc<Mutex<mpsc::UnboundedReceiver<SegmentObject>>>, // read by the reporting task
+}
+
+impl<R: Reporter + Send + Sync + 'static> Tracer<R> {
+ /// Create a tracer from a service name, an instance name and a reporter; also creates the internal unbounded segment channel.
+ pub fn new(service_name: impl ToString, instance_name: impl ToString, reporter: R) -> Self {
+ let (segment_sender, segment_receiver) = mpsc::unbounded_channel();
+
+ Self {
+ service_name: service_name.to_string(),
+ instance_name: instance_name.to_string(),
+ reporter: Arc::new(Mutex::new(reporter)),
+ segment_sender,
+ segment_receiver: Arc::new(Mutex::new(segment_receiver)),
+ }
+ }
+
+ /// Create a new trace context using this tracer's service and instance names.
+ pub fn create_trace_context(&self) -> TracingContext {
+ TracingContext::default(&self.service_name, &self.instance_name)
+ }
+
+ /// Create a trace context from the given propagation context, using this tracer's service and instance names.
+ pub fn create_trace_context_from_propagation(
+ &self,
+ context: PropagationContext,
+ ) -> TracingContext {
+ TracingContext::from_propagation_context(&self.service_name, &self.instance_name, context)
+ }
+
+ /// Finalize the trace context: convert it into a segment object and queue it for reporting (logs a warning if the send fails).
+ pub fn finalize_context(&self, context: TracingContext) {
+ let segment_object = context.convert_segment_object();
+ if self.segment_sender.send(segment_object).is_err() {
+ tracing::warn!("segment object channel has closed");
+ }
+ }
+
+ /// Start reporting in a spawned task and return its join handle; the task quits once `shutdown_signal` completes.
+ ///
+ /// `shutdown_signal` is a future used as the graceful shutdown signal; still-queued segments are flushed before the task ends.
+ pub fn reporting(
+ &self,
+ shutdown_signal: impl Future<Output = ()> + Send + Sync + 'static,
+ ) -> JoinHandle<()> {
+ let reporter = self.reporter.clone();
+ let segment_receiver = self.segment_receiver.clone();
+ tokio::spawn(Self::do_reporting(
+ reporter,
+ segment_receiver,
+ shutdown_signal,
+ ))
+ }
+
+ async fn do_reporting( // Runs the receive/report loop in an inner task until `shutdown_signal` completes.
+ reporter: Arc<Mutex<R>>,
+ segment_receiver: Arc<Mutex<UnboundedReceiver<SegmentObject>>>,
+ shutdown_signal: impl Future<Output = ()> + Send + Sync + 'static,
+ ) {
+ let (shutdown_tx, mut shutdown_rx) = mpsc::unbounded_channel(); // used to tell the inner task to leave its loop
+
+ let handle = tokio::spawn(async move {
+ loop {
+ let mut segment_receiver = segment_receiver.lock().await; // guard shadows the `Arc` for this iteration only
+ let mut segments = LinkedList::new();
+
+ tokio::select! {
+ segment = segment_receiver.recv() => {
+ drop(segment_receiver); // release the receiver lock before reporting
+
+ if let Some(segment) = segment {
+ // TODO Implement batch collect in future.
+ segments.push_back(segment);
+ let mut reporter = reporter.lock().await;
+ Self::report_segment_object(&mut reporter, segments).await;
+ } else {
+ break; // `recv` yielded `None`; stop looping and go to the flush
+ }
+ }
+ _ = shutdown_rx.recv() => break,
+ }
+ }
+
+ // Flush: drain whatever `try_recv` still yields and report it as one batch.
+ let mut segment_receiver = segment_receiver.lock().await;
+ let mut segments = LinkedList::new();
+ while let Ok(segment) = segment_receiver.try_recv() {
+ segments.push_back(segment);
+ }
+ let mut reporter = reporter.lock().await;
+ Self::report_segment_object(&mut reporter, segments).await; // NOTE(review): called even when nothing was drained
+ });
+
+ shutdown_signal.await;
+
+ if shutdown_tx.send(()).is_err() {
+ tracing::error!("Shutdown signal send failed");
+ }
+ if let Err(e) = handle.await { // wait for the inner task (including its flush) to finish
+ tracing::error!("Tokio handle join failed: {:?}", e);
+ }
+ }
+
+ async fn report_segment_object(reporter: &mut R, segments: LinkedList<SegmentObject>) { // Errors are logged, not returned.
+ let stream = stream::iter(segments);
+ if let Err(e) = reporter.collect(stream).await {
+ tracing::error!("Collect failed: {:?}", e);
+ }
+ }
+}
diff --git a/src/error/mod.rs b/src/error/mod.rs
index 54c0c68..28a3561 100644
--- a/src/error/mod.rs
+++ b/src/error/mod.rs
@@ -14,6 +14,8 @@
// limitations under the License.
//
+use tokio::{sync::oneshot, task::JoinError};
+
/// Skywalking Result.
pub type Result<T> = std::result::Result<T, Error>;
@@ -29,6 +31,15 @@ pub enum Error {
#[error("reporter shutdown failed: {0}")]
ReporterShutdown(String),
- #[error("tonic transport failed failed: {0}")]
+ #[error("tonic transport failed: {0}")]
TonicTransport(#[from] tonic::transport::Error),
+
+ #[error("tonic status: {0}")]
+ TonicStatus(#[from] tonic::Status),
+
+ #[error("tokio task join failed: {0}")]
+ TokioJoin(#[from] JoinError),
+
+ #[error("tokio oneshot receive failed: {0}")]
+ TokioOneshotRecv(#[from] oneshot::error::RecvError),
}
diff --git a/src/lib.rs b/src/lib.rs
index 5f2f89a..cc4ef8c 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -13,6 +13,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//
+#![warn(rust_2018_idioms)]
+#![warn(clippy::dbg_macro, clippy::print_stdout)]
+#![doc = include_str!("../README.md")]
pub mod skywalking_proto {
pub mod v3 {
diff --git a/src/reporter/grpc.rs b/src/reporter/grpc.rs
index c778947..499ccf5 100644
--- a/src/reporter/grpc.rs
+++ b/src/reporter/grpc.rs
@@ -14,96 +14,43 @@
// limitations under the License.
//
-use crate::context::trace_context::TracingContext;
-use crate::skywalking_proto::v3::trace_segment_report_service_client::TraceSegmentReportServiceClient;
-use crate::skywalking_proto::v3::SegmentObject;
-use tokio::sync::mpsc;
-use tonic::transport::Channel;
-
-pub type ReporterClient = TraceSegmentReportServiceClient<Channel>;
-
-async fn flush(client: &mut ReporterClient, context: SegmentObject) -> Result<(), tonic::Status> {
- let stream = async_stream::stream! {
- yield context;
- };
- match client.collect(stream).await {
- Ok(_) => Ok(()),
- Err(e) => Err(e),
- }
-}
-
-pub struct Reporter {
- tx: mpsc::Sender<TracingContext>,
- shutdown_tx: mpsc::Sender<()>,
+use super::Reporter;
+use crate::skywalking_proto::v3::{
+ trace_segment_report_service_client::TraceSegmentReportServiceClient, SegmentObject,
+};
+use futures_core::Stream;
+use tonic::{
+ async_trait,
+ transport::{self, Channel, Endpoint},
+};
+
+type ReporterClient = TraceSegmentReportServiceClient<Channel>;
+
+pub struct GrpcReporter {
+ client: ReporterClient,
}
-static CHANNEL_BUF_SIZE: usize = 1024;
-
-impl Reporter {
- /// Open gRPC client stream to send collected trace context.
- /// This function generates a new async task which watch to arrive new trace context.
- /// We can send collected context to push into sender.
- ///
- /// # Example
- ///
- /// ```no_run
- /// use std::error::Error;
- ///
- /// use tokio;
- ///
- /// use skywalking::context::trace_context::TracingContext;
- /// use skywalking::reporter::grpc::Reporter;
- ///
- /// #[tokio::main]
- /// async fn main () -> Result<(), Box<dyn Error>> {
- /// let reporter = Reporter::start("localhost:12800").await?;
- /// let mut context = TracingContext::default("service", "instance");
- /// reporter.sender().send(context).await?;
- /// reporter.shutdown().await?;
- /// Ok(())
- /// }
- /// ```
- pub async fn start(address: impl Into<String>) -> crate::Result<Self> {
- let (tx, mut rx): (mpsc::Sender<TracingContext>, mpsc::Receiver<TracingContext>) =
- mpsc::channel(CHANNEL_BUF_SIZE);
- let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
-
- let mut reporter = ReporterClient::connect(address.into()).await?;
- tokio::spawn(async move {
- loop {
- tokio::select! {
- message = rx.recv() => {
- if let Some(message) = message {
- flush(&mut reporter, message.convert_segment_object()).await.unwrap();
- } else {
- break;
- }
- },
- _ = shutdown_rx.recv() => {
- break;
- }
- }
- }
- rx.close();
- while let Some(message) = rx.recv().await {
- flush(&mut reporter, message.convert_segment_object())
- .await
- .unwrap();
- }
- });
- Ok(Self { tx, shutdown_tx })
+impl GrpcReporter {
+ pub fn new(channel: Channel) -> Self {
+ let client = ReporterClient::new(channel);
+ Self { client }
}
- pub async fn shutdown(self) -> crate::Result<()> {
- self.shutdown_tx
- .send(())
- .await
- .map_err(|e| crate::Error::ReporterShutdown(e.to_string()))?;
- self.shutdown_tx.closed().await;
- Ok(())
+ pub async fn connect(
+ address: impl TryInto<Endpoint, Error = transport::Error>,
+ ) -> crate::Result<Self> {
+ let client = ReporterClient::connect(address.try_into()?).await?;
+ Ok(Self { client })
}
+}
- pub fn sender(&self) -> mpsc::Sender<TracingContext> {
- self.tx.clone()
+#[async_trait]
+impl Reporter for GrpcReporter {
+ async fn collect(
+ &mut self,
+ stream: impl Stream<Item = SegmentObject> + Send + 'static,
+ ) -> crate::Result<()> {
+ self.client.collect(stream).await?;
+ Ok(())
}
}
diff --git a/src/reporter/mod.rs b/src/reporter/mod.rs
index 3a196cf..1261020 100644
--- a/src/reporter/mod.rs
+++ b/src/reporter/mod.rs
@@ -15,3 +15,15 @@
//
pub mod grpc;
+
+use crate::skywalking_proto::v3::SegmentObject;
+use futures_core::Stream;
+use tonic::async_trait;
+
+#[async_trait]
+pub trait Reporter { // Consumer of trace segments; `grpc::GrpcReporter` implements it.
+ async fn collect( // Consume a stream of segment objects, returning `crate::Result<()>`.
+ &mut self,
+ stream: impl Stream<Item = SegmentObject> + Send + 'static,
+ ) -> crate::Result<()>;
+}