You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2022/07/10 14:49:18 UTC

[skywalking-rust] branch master updated: Add tracer. (#26)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-rust.git


The following commit(s) were added to refs/heads/master by this push:
     new 699247d  Add tracer. (#26)
699247d is described below

commit 699247dfbb804e8541dd6174a2c2f238862709ec
Author: jmjoy <91...@qq.com>
AuthorDate: Sun Jul 10 22:49:14 2022 +0800

    Add tracer. (#26)
---
 .github/workflows/ci.yaml       |   7 ++
 Cargo.toml                      |   5 +-
 README.md                       |  42 ++++++++----
 e2e/Cargo.lock                  |  26 +++++--
 e2e/src/main.rs                 |  78 +++++++++++----------
 examples/simple_trace_report.rs |  50 +++++++++++---
 src/context/mod.rs              |   1 +
 src/context/tracer.rs           | 147 ++++++++++++++++++++++++++++++++++++++++
 src/error/mod.rs                |  13 +++-
 src/lib.rs                      |   3 +
 src/reporter/grpc.rs            | 117 +++++++++-----------------------
 src/reporter/mod.rs             |  12 ++++
 12 files changed, 351 insertions(+), 150 deletions(-)

diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index cd63125..ecdc8cd 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -25,6 +25,11 @@ on:
     tags:
       - 'v*'
 
+env:
+  CARGO_TERM_COLOR: always
+  RUST_BACKTRACE: "1"
+  RUSTFLAGS: "-D warnings"
+
 jobs:
   CI:
     runs-on: ubuntu-latest
@@ -48,3 +53,5 @@ jobs:
         run: cargo clippy --workspace --tests -- -D warnings
       - name: Run tests
         run: cargo test --workspace
+      - name: Run docs
+        run: cargo rustdoc --all-features -- -D warnings
diff --git a/Cargo.toml b/Cargo.toml
index 4cc8d14..aa7b9d7 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -30,11 +30,14 @@ repository = "https://github.com/apache/skywalking-rust"
 async-stream = "0.3.3"
 base64 = "0.13.0"
 bytes = "1.1.0"
+futures-core = "0.3.21"
+futures-util = "0.3.21"
 prost = "0.10.4"
 prost-derive = "0.10.1"
 thiserror = "1.0.31"
 tokio = { version = "1.18.2", features = ["full"] }
-tonic = "0.7.2"
+tonic = { version = "0.7.2", features = ["codegen"] }
+tracing = "0.1.35"
 uuid = { version = "1.1.0", features = ["serde", "v4"] }
 
 [build-dependencies]
diff --git a/README.md b/README.md
index 079be17..58cec12 100644
--- a/README.md
+++ b/README.md
@@ -33,24 +33,27 @@ context after the span finished.
 
 # Example
 
-```rust
-use skywalking::context::trace_context::TracingContext;
-use skywalking::reporter::grpc::Reporter;
-use tokio;
+```rust, no_run
+use skywalking::context::tracer::Tracer;
+use skywalking::reporter::grpc::GrpcReporter;
+use std::error::Error;
+use std::sync::Arc;
+use tokio::signal;
+
+async fn handle_request(tracer: Arc<Tracer<GrpcReporter>>) {
+    let mut ctx = tracer.create_trace_context();
 
-async fn handle_request(reporter: ContextReporter) {
-    let mut ctx = TracingContext::default("svc", "ins");
     {
         // Generate an Entry Span when a request
         // is received. An Entry Span is generated only once per context.
-        let span = ctx.create_entry_span("operation1").unwrap();
+        let span = ctx.create_entry_span("op1").unwrap();
 
         // Something...
 
         {
             // Generates an Exit Span when executing an RPC.
-            let span2 = ctx.create_exit_span("operation2").unwrap();
-            
+            let span2 = ctx.create_exit_span("op2", "remote_peer").unwrap();
+
             // Something...
 
             ctx.finalize_span(span2);
@@ -58,21 +61,32 @@ async fn handle_request(reporter: ContextReporter) {
 
         ctx.finalize_span(span);
     }
-    reporter.send(context).await;
+
+    tracer.finalize_context(ctx);
 }
 
 #[tokio::main]
-async fn main() {
-    let tx = Reporter::start("http://0.0.0.0:11800").await;
+async fn main() -> Result<(), Box<dyn Error>> {
+    let reporter = GrpcReporter::connect("http://0.0.0.0:11800").await?;
+    let tracer = Arc::new(Tracer::new("service", "instance", reporter));
+
+    tokio::spawn(handle_request(tracer.clone()));
 
-    // Start server...
+    // Start to report.
+    let handle = tracer.reporting(async move {
+        let _ = signal::ctrl_c().await;
+    });
+
+    handle.await?;
+
+    Ok(())
 }
 ```
 
 # How to compile?
 If you have `skywalking-(VERSION).crate`, you can unpack it with the way as follows:
 
-```
+```shell
 tar -xvzf skywalking-(VERSION).crate
 ```
 
diff --git a/e2e/Cargo.lock b/e2e/Cargo.lock
index 1023a8b..e6c6232 100644
--- a/e2e/Cargo.lock
+++ b/e2e/Cargo.lock
@@ -231,6 +231,17 @@ version = "0.3.21"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3"
 
+[[package]]
+name = "futures-macro"
+version = "0.3.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
 [[package]]
 name = "futures-sink"
 version = "0.3.21"
@@ -250,9 +261,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a"
 dependencies = [
  "futures-core",
+ "futures-macro",
  "futures-task",
  "pin-project-lite",
  "pin-utils",
+ "slab",
 ]
 
 [[package]]
@@ -783,12 +796,15 @@ dependencies = [
  "async-stream",
  "base64",
  "bytes",
+ "futures-core",
+ "futures-util",
  "prost",
  "prost-derive",
  "thiserror",
  "tokio",
  "tonic",
  "tonic-build",
+ "tracing",
  "uuid",
 ]
 
@@ -1068,9 +1084,9 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6"
 
 [[package]]
 name = "tracing"
-version = "0.1.34"
+version = "0.1.35"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5d0ecdcb44a79f0fe9844f0c4f33a342cbcbb5117de8001e6ba0dc2351327d09"
+checksum = "a400e31aa60b9d44a52a8ee0343b5b18566b03a8321e0d321f695cf56e940160"
 dependencies = [
  "cfg-if",
  "log",
@@ -1092,11 +1108,11 @@ dependencies = [
 
 [[package]]
 name = "tracing-core"
-version = "0.1.26"
+version = "0.1.28"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f54c8ca710e81886d498c2fd3331b56c93aa248d49de2222ad2742247c60072f"
+checksum = "7b7358be39f2f274f322d2aaed611acc57f382e8eb1e5b48cb9ae30933495ce7"
 dependencies = [
- "lazy_static",
+ "once_cell",
 ]
 
 [[package]]
diff --git a/e2e/src/main.rs b/e2e/src/main.rs
index a0b500b..b6f7b19 100644
--- a/e2e/src/main.rs
+++ b/e2e/src/main.rs
@@ -21,23 +21,35 @@ use hyper::{Body, Client, Method, Request, Response, Server, StatusCode};
 use skywalking::context::propagation::context::SKYWALKING_HTTP_CONTEXT_HEADER_KEY;
 use skywalking::context::propagation::decoder::decode_propagation;
 use skywalking::context::propagation::encoder::encode_propagation;
-use skywalking::context::trace_context::TracingContext;
-use skywalking::reporter::grpc::Reporter;
+use skywalking::context::tracer::Tracer;
+use skywalking::reporter::grpc::GrpcReporter;
 use std::convert::Infallible;
 use std::error::Error;
+use std::future::pending;
 use std::net::SocketAddr;
 use structopt::StructOpt;
-use tokio::sync::mpsc;
+use tokio::sync::OnceCell;
 
 static NOT_FOUND_MSG: &str = "not found";
 static SUCCESS_MSG: &str = "Success";
 
+static GLOBAL_TRACER: OnceCell<Tracer<GrpcReporter>> = OnceCell::const_new();
+
+fn set_global_tracer(tracer: Tracer<GrpcReporter>) {
+    if GLOBAL_TRACER.set(tracer).is_err() {
+        panic!("TRACER has setted")
+    }
+}
+
+fn get_global_tracer() -> &'static Tracer<GrpcReporter> {
+    GLOBAL_TRACER.get().expect("TRACER haven't setted")
+}
+
 async fn handle_ping(
     _req: Request<Body>,
     client: Client<HttpConnector>,
-    tx: mpsc::Sender<TracingContext>,
 ) -> Result<Response<Body>, Infallible> {
-    let mut context = TracingContext::default("producer", "node_0");
+    let mut context = get_global_tracer().create_trace_context();
     let span = context.create_entry_span("/ping").unwrap();
     {
         let span2 = context.create_exit_span("/pong", "consumer:8082").unwrap();
@@ -53,17 +65,16 @@ async fn handle_ping(
         context.finalize_span(span2);
     }
     context.finalize_span(span);
-    let _ = tx.send(context).await;
+    get_global_tracer().finalize_context(context);
     Ok(Response::new(Body::from("hoge")))
 }
 
 async fn producer_response(
     _req: Request<Body>,
     client: Client<HttpConnector>,
-    tx: mpsc::Sender<TracingContext>,
 ) -> Result<Response<Body>, Infallible> {
     match (_req.method(), _req.uri().path()) {
-        (&Method::GET, "/ping") => handle_ping(_req, client, tx).await,
+        (&Method::GET, "/ping") => handle_ping(_req, client).await,
         (&Method::GET, "/healthCheck") => Ok(Response::builder()
             .status(StatusCode::OK)
             .body(Body::from(SUCCESS_MSG))
@@ -75,15 +86,14 @@ async fn producer_response(
     }
 }
 
-async fn run_producer_service(host: [u8; 4], tx: mpsc::Sender<TracingContext>) {
+async fn run_producer_service(host: [u8; 4]) {
     let client = Client::new();
     let make_svc = make_service_fn(|_| {
-        let tx = tx.clone();
         let client = client.clone();
 
         async {
             Ok::<_, Infallible>(service_fn(move |req| {
-                producer_response(req, client.to_owned(), tx.to_owned())
+                producer_response(req, client.to_owned())
             }))
         }
     });
@@ -95,29 +105,23 @@ async fn run_producer_service(host: [u8; 4], tx: mpsc::Sender<TracingContext>) {
     }
 }
 
-async fn handle_pong(
-    _req: Request<Body>,
-    tx: mpsc::Sender<TracingContext>,
-) -> Result<Response<Body>, Infallible> {
+async fn handle_pong(_req: Request<Body>) -> Result<Response<Body>, Infallible> {
     let ctx = decode_propagation(
         _req.headers()[SKYWALKING_HTTP_CONTEXT_HEADER_KEY]
             .to_str()
             .unwrap(),
     )
     .unwrap();
-    let mut context = TracingContext::from_propagation_context("consumer", "node_0", ctx);
+    let mut context = get_global_tracer().create_trace_context_from_propagation(ctx);
     let span = context.create_entry_span("/pong").unwrap();
     context.finalize_span(span);
-    let _ = tx.send(context).await;
+    get_global_tracer().finalize_context(context);
     Ok(Response::new(Body::from("hoge")))
 }
 
-async fn consumer_response(
-    _req: Request<Body>,
-    tx: mpsc::Sender<TracingContext>,
-) -> Result<Response<Body>, Infallible> {
+async fn consumer_response(_req: Request<Body>) -> Result<Response<Body>, Infallible> {
     match (_req.method(), _req.uri().path()) {
-        (&Method::GET, "/pong") => handle_pong(_req, tx).await,
+        (&Method::GET, "/pong") => handle_pong(_req).await,
         (&Method::GET, "/healthCheck") => Ok(Response::builder()
             .status(StatusCode::OK)
             .body(Body::from(SUCCESS_MSG))
@@ -129,11 +133,9 @@ async fn consumer_response(
     }
 }
 
-async fn run_consumer_service(host: [u8; 4], tx: mpsc::Sender<TracingContext>) {
-    let make_svc = make_service_fn(|_| {
-        let tx = tx.clone();
-        async { Ok::<_, Infallible>(service_fn(move |req| consumer_response(req, tx.to_owned()))) }
-    });
+async fn run_consumer_service(host: [u8; 4]) {
+    let make_svc =
+        make_service_fn(|_| async { Ok::<_, Infallible>(service_fn(consumer_response)) });
     let addr = SocketAddr::from((host, 8082));
     let server = Server::bind(&addr).serve(make_svc);
 
@@ -153,15 +155,23 @@ struct Opt {
 #[tokio::main]
 async fn main() -> Result<(), Box<dyn Error>> {
     let opt = Opt::from_args();
-    let reporter = Reporter::start("http://collector:19876").await?;
-    let tx = reporter.sender();
+    let reporter = GrpcReporter::connect("http://collector:19876").await?;
 
-    if opt.mode == "consumer" {
-        run_consumer_service([0, 0, 0, 0], tx).await;
+    let handle = if opt.mode == "consumer" {
+        set_global_tracer(Tracer::new("consumer", "node_0", reporter));
+        let handle = get_global_tracer().reporting(pending());
+        run_consumer_service([0, 0, 0, 0]).await;
+        handle
     } else if opt.mode == "producer" {
-        run_producer_service([0, 0, 0, 0], tx).await;
-    }
+        set_global_tracer(Tracer::new("producer", "node_0", reporter));
+        let handle = get_global_tracer().reporting(pending());
+        run_producer_service([0, 0, 0, 0]).await;
+        handle
+    } else {
+        unreachable!()
+    };
+
+    handle.await?;
 
-    reporter.shutdown().await?;
     Ok(())
 }
diff --git a/examples/simple_trace_report.rs b/examples/simple_trace_report.rs
index 97dc9c6..6d8d7fc 100644
--- a/examples/simple_trace_report.rs
+++ b/examples/simple_trace_report.rs
@@ -15,20 +15,50 @@
 // specific language governing permissions and limitations
 // under the License.
 //
+use skywalking::context::tracer::Tracer;
+use skywalking::reporter::grpc::GrpcReporter;
 use std::error::Error;
+use std::sync::Arc;
+use tokio::signal;
 
-use skywalking::context::trace_context::TracingContext;
-use skywalking::reporter::grpc::Reporter;
+async fn handle_request(tracer: Arc<Tracer<GrpcReporter>>) {
+    let mut ctx = tracer.create_trace_context();
 
-#[tokio::main]
-async fn main() -> Result<(), Box<dyn Error>> {
-    let reporter = Reporter::start("http://0.0.0.0:11800").await?;
-    let mut context = TracingContext::default("service", "instance");
     {
-        let span = context.create_entry_span("op1").unwrap();
-        context.finalize_span(span);
+        // Generate an Entry Span when a request
+        // is received. An Entry Span is generated only once per context.
+        let span = ctx.create_entry_span("op1").unwrap();
+
+        // Something...
+
+        {
+            // Generates an Exit Span when executing an RPC.
+            let span2 = ctx.create_exit_span("op2", "remote_peer").unwrap();
+
+            // Something...
+
+            ctx.finalize_span(span2);
+        }
+
+        ctx.finalize_span(span);
     }
-    reporter.sender().send(context).await?;
-    reporter.shutdown().await?;
+
+    tracer.finalize_context(ctx);
+}
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn Error>> {
+    let reporter = GrpcReporter::connect("http://0.0.0.0:11800").await?;
+    let tracer = Arc::new(Tracer::new("service", "instance", reporter));
+
+    tokio::spawn(handle_request(tracer.clone()));
+
+    // Start to report.
+    let handle = tracer.reporting(async move {
+        let _ = signal::ctrl_c().await;
+    });
+
+    handle.await?;
+
     Ok(())
 }
diff --git a/src/context/mod.rs b/src/context/mod.rs
index 3e9e727..99ddb08 100644
--- a/src/context/mod.rs
+++ b/src/context/mod.rs
@@ -17,3 +17,4 @@
 pub mod propagation;
 pub mod system_time;
 pub mod trace_context;
+pub mod tracer;
diff --git a/src/context/tracer.rs b/src/context/tracer.rs
new file mode 100644
index 0000000..7ca4bd1
--- /dev/null
+++ b/src/context/tracer.rs
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+use super::propagation::context::PropagationContext;
+use crate::{
+    context::trace_context::TracingContext, reporter::Reporter, skywalking_proto::v3::SegmentObject,
+};
+use futures_util::stream;
+use std::future::Future;
+use std::{collections::LinkedList, sync::Arc};
+use tokio::{
+    sync::{
+        mpsc::{self, UnboundedReceiver},
+        Mutex,
+    },
+    task::JoinHandle,
+};
+
+/// Skywalking tracer.
+pub struct Tracer<R: Reporter + Send + Sync + 'static> {
+    service_name: String,
+    instance_name: String,
+    reporter: Arc<Mutex<R>>,
+    segment_sender: mpsc::UnboundedSender<SegmentObject>,
+    segment_receiver: Arc<Mutex<mpsc::UnboundedReceiver<SegmentObject>>>,
+}
+
+impl<R: Reporter + Send + Sync + 'static> Tracer<R> {
+    /// New with service info and reporter.
+    pub fn new(service_name: impl ToString, instance_name: impl ToString, reporter: R) -> Self {
+        let (segment_sender, segment_receiver) = mpsc::unbounded_channel();
+
+        Self {
+            service_name: service_name.to_string(),
+            instance_name: instance_name.to_string(),
+            reporter: Arc::new(Mutex::new(reporter)),
+            segment_sender,
+            segment_receiver: Arc::new(Mutex::new(segment_receiver)),
+        }
+    }
+
+    /// Create trace context.
+    pub fn create_trace_context(&self) -> TracingContext {
+        TracingContext::default(&self.service_name, &self.instance_name)
+    }
+
+    /// Create trace context from propagation.
+    pub fn create_trace_context_from_propagation(
+        &self,
+        context: PropagationContext,
+    ) -> TracingContext {
+        TracingContext::from_propagation_context(&self.service_name, &self.instance_name, context)
+    }
+
+    /// Finalize the trace context.
+    pub fn finalize_context(&self, context: TracingContext) {
+        let segment_object = context.convert_segment_object();
+        if self.segment_sender.send(segment_object).is_err() {
+            tracing::warn!("segment object channel has closed");
+        }
+    }
+
+    /// Start reporting; quit when `shutdown_signal` is received.
+    ///
+    /// Accepts a `shutdown_signal` argument as a graceful shutdown signal.
+    pub fn reporting(
+        &self,
+        shutdown_signal: impl Future<Output = ()> + Send + Sync + 'static,
+    ) -> JoinHandle<()> {
+        let reporter = self.reporter.clone();
+        let segment_receiver = self.segment_receiver.clone();
+        tokio::spawn(Self::do_reporting(
+            reporter,
+            segment_receiver,
+            shutdown_signal,
+        ))
+    }
+
+    async fn do_reporting(
+        reporter: Arc<Mutex<R>>,
+        segment_receiver: Arc<Mutex<UnboundedReceiver<SegmentObject>>>,
+        shutdown_signal: impl Future<Output = ()> + Send + Sync + 'static,
+    ) {
+        let (shutdown_tx, mut shutdown_rx) = mpsc::unbounded_channel();
+
+        let handle = tokio::spawn(async move {
+            loop {
+                let mut segment_receiver = segment_receiver.lock().await;
+                let mut segments = LinkedList::new();
+
+                tokio::select! {
+                    segment = segment_receiver.recv() => {
+                        drop(segment_receiver);
+
+                        if let Some(segment) = segment {
+                            // TODO Implement batch collect in future.
+                            segments.push_back(segment);
+                            let mut reporter = reporter.lock().await;
+                            Self::report_segment_object(&mut reporter, segments).await;
+                        } else {
+                            break;
+                        }
+                    }
+                    _ =  shutdown_rx.recv() => break,
+                }
+            }
+
+            // Flush.
+            let mut segment_receiver = segment_receiver.lock().await;
+            let mut segments = LinkedList::new();
+            while let Ok(segment) = segment_receiver.try_recv() {
+                segments.push_back(segment);
+            }
+            let mut reporter = reporter.lock().await;
+            Self::report_segment_object(&mut reporter, segments).await;
+        });
+
+        shutdown_signal.await;
+
+        if shutdown_tx.send(()).is_err() {
+            tracing::error!("Shutdown signal send failed");
+        }
+        if let Err(e) = handle.await {
+            tracing::error!("Tokio handle join failed: {:?}", e);
+        }
+    }
+
+    async fn report_segment_object(reporter: &mut R, segments: LinkedList<SegmentObject>) {
+        let stream = stream::iter(segments);
+        if let Err(e) = reporter.collect(stream).await {
+            tracing::error!("Collect failed: {:?}", e);
+        }
+    }
+}
diff --git a/src/error/mod.rs b/src/error/mod.rs
index 54c0c68..28a3561 100644
--- a/src/error/mod.rs
+++ b/src/error/mod.rs
@@ -14,6 +14,8 @@
 // limitations under the License.
 //
 
+use tokio::{sync::oneshot, task::JoinError};
+
 /// Skywalking Result.
 pub type Result<T> = std::result::Result<T, Error>;
 
@@ -29,6 +31,15 @@ pub enum Error {
     #[error("reporter shutdown failed: {0}")]
     ReporterShutdown(String),
 
-    #[error("tonic transport failed failed: {0}")]
+    #[error("tonic transport failed: {0}")]
     TonicTransport(#[from] tonic::transport::Error),
+
+    #[error("tonic status: {0}")]
+    TonicStatus(#[from] tonic::Status),
+
+    #[error("tokio task join failed: {0}")]
+    TokioJoin(#[from] JoinError),
+
+    #[error("tokio oneshot receive failed: {0}")]
+    TokioOneshotRecv(#[from] oneshot::error::RecvError),
 }
diff --git a/src/lib.rs b/src/lib.rs
index 5f2f89a..cc4ef8c 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -13,6 +13,9 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 //
+#![warn(rust_2018_idioms)]
+#![warn(clippy::dbg_macro, clippy::print_stdout)]
+#![doc = include_str!("../README.md")]
 
 pub mod skywalking_proto {
     pub mod v3 {
diff --git a/src/reporter/grpc.rs b/src/reporter/grpc.rs
index c778947..499ccf5 100644
--- a/src/reporter/grpc.rs
+++ b/src/reporter/grpc.rs
@@ -14,96 +14,43 @@
 // limitations under the License.
 //
 
-use crate::context::trace_context::TracingContext;
-use crate::skywalking_proto::v3::trace_segment_report_service_client::TraceSegmentReportServiceClient;
-use crate::skywalking_proto::v3::SegmentObject;
-use tokio::sync::mpsc;
-use tonic::transport::Channel;
-
-pub type ReporterClient = TraceSegmentReportServiceClient<Channel>;
-
-async fn flush(client: &mut ReporterClient, context: SegmentObject) -> Result<(), tonic::Status> {
-    let stream = async_stream::stream! {
-        yield context;
-    };
-    match client.collect(stream).await {
-        Ok(_) => Ok(()),
-        Err(e) => Err(e),
-    }
-}
-
-pub struct Reporter {
-    tx: mpsc::Sender<TracingContext>,
-    shutdown_tx: mpsc::Sender<()>,
+use super::Reporter;
+use crate::skywalking_proto::v3::{
+    trace_segment_report_service_client::TraceSegmentReportServiceClient, SegmentObject,
+};
+use futures_core::Stream;
+use tonic::{
+    async_trait,
+    transport::{self, Channel, Endpoint},
+};
+
+type ReporterClient = TraceSegmentReportServiceClient<Channel>;
+
+pub struct GrpcReporter {
+    client: ReporterClient,
 }
 
-static CHANNEL_BUF_SIZE: usize = 1024;
-
-impl Reporter {
-    /// Open gRPC client stream to send collected trace context.
-    /// This function generates a new async task which watch to arrive new trace context.
-    /// We can send collected context to push into sender.
-    ///
-    /// # Example
-    ///
-    /// ```no_run
-    /// use std::error::Error;
-    ///
-    /// use tokio;
-    ///
-    /// use skywalking::context::trace_context::TracingContext;
-    /// use skywalking::reporter::grpc::Reporter;
-    ///
-    /// #[tokio::main]
-    /// async fn main () -> Result<(), Box<dyn Error>> {
-    ///     let reporter = Reporter::start("localhost:12800").await?;
-    ///     let mut context = TracingContext::default("service", "instance");
-    ///     reporter.sender().send(context).await?;
-    ///     reporter.shutdown().await?;
-    ///     Ok(())
-    /// }
-    /// ```
-    pub async fn start(address: impl Into<String>) -> crate::Result<Self> {
-        let (tx, mut rx): (mpsc::Sender<TracingContext>, mpsc::Receiver<TracingContext>) =
-            mpsc::channel(CHANNEL_BUF_SIZE);
-        let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
-
-        let mut reporter = ReporterClient::connect(address.into()).await?;
-        tokio::spawn(async move {
-            loop {
-                tokio::select! {
-                    message = rx.recv() => {
-                        if let Some(message) = message {
-                            flush(&mut reporter, message.convert_segment_object()).await.unwrap();
-                        } else {
-                            break;
-                        }
-                    },
-                    _ = shutdown_rx.recv() => {
-                        break;
-                    }
-                }
-            }
-            rx.close();
-            while let Some(message) = rx.recv().await {
-                flush(&mut reporter, message.convert_segment_object())
-                    .await
-                    .unwrap();
-            }
-        });
-        Ok(Self { tx, shutdown_tx })
+impl GrpcReporter {
+    pub fn new(channel: Channel) -> Self {
+        let client = ReporterClient::new(channel);
+        Self { client }
     }
 
-    pub async fn shutdown(self) -> crate::Result<()> {
-        self.shutdown_tx
-            .send(())
-            .await
-            .map_err(|e| crate::Error::ReporterShutdown(e.to_string()))?;
-        self.shutdown_tx.closed().await;
-        Ok(())
+    pub async fn connect(
+        address: impl TryInto<Endpoint, Error = transport::Error>,
+    ) -> crate::Result<Self> {
+        let client = ReporterClient::connect(address.try_into()?).await?;
+        Ok(Self { client })
     }
+}
 
-    pub fn sender(&self) -> mpsc::Sender<TracingContext> {
-        self.tx.clone()
+#[async_trait]
+impl Reporter for GrpcReporter {
+    async fn collect(
+        &mut self,
+        stream: impl Stream<Item = SegmentObject> + Send + 'static,
+    ) -> crate::Result<()> {
+        self.client.collect(stream).await?;
+        Ok(())
     }
 }
diff --git a/src/reporter/mod.rs b/src/reporter/mod.rs
index 3a196cf..1261020 100644
--- a/src/reporter/mod.rs
+++ b/src/reporter/mod.rs
@@ -15,3 +15,15 @@
 //
 
 pub mod grpc;
+
+use crate::skywalking_proto::v3::SegmentObject;
+use futures_core::Stream;
+use tonic::async_trait;
+
+#[async_trait]
+pub trait Reporter {
+    async fn collect(
+        &mut self,
+        stream: impl Stream<Item = SegmentObject> + Send + 'static,
+    ) -> crate::Result<()>;
+}