You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to notifications@skywalking.apache.org by he...@apache.org on 2022/10/25 01:43:10 UTC

[skywalking-php] branch master updated: Utilize UnixListener for the worker process to accept reports. (#26)

This is an automated email from the ASF dual-hosted git repository.

heyanlong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-php.git


The following commit(s) were added to refs/heads/master by this push:
     new e6b733b  Utilize UnixListener for the worker process to accept reports. (#26)
e6b733b is described below

commit e6b733b29ee1bd41814359f643b4a935919d5bc9
Author: phanalpha <ph...@hotmail.com>
AuthorDate: Tue Oct 25 09:43:06 2022 +0800

    Utilize UnixListener for the worker process to accept reports. (#26)
    
    * (Unix) listener worker
---
 Cargo.lock     |   1 +
 Cargo.toml     |   1 +
 src/channel.rs | 117 ++++++++++++++-------------------------------------------
 src/module.rs  |  29 +++++++++-----
 src/worker.rs  | 107 ++++++++++++++++++++++++++++++++++++++++++++--------
 5 files changed, 141 insertions(+), 114 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 436b4fd..b18f2b7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1897,6 +1897,7 @@ dependencies = [
  "reqwest",
  "skywalking",
  "systemstat",
+ "tempfile",
  "tokio",
  "tokio-stream",
  "tonic",
diff --git a/Cargo.toml b/Cargo.toml
index 8c49f83..2322115 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -49,6 +49,7 @@ phper = "0.5.1"
 prost = "0.11.0"
 skywalking = "0.4.0"
 systemstat = "0.2.0"
+tempfile = "3.3.0"
 tokio = { version = "1.21.0", features = ["full"] }
 tokio-stream = "0.1.9"
 tonic = "0.8.0"
diff --git a/src/channel.rs b/src/channel.rs
index eaf7097..fe802a0 100644
--- a/src/channel.rs
+++ b/src/channel.rs
@@ -13,49 +13,21 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use anyhow::{anyhow, bail, Context};
+use anyhow::anyhow;
 use once_cell::sync::OnceCell;
-use skywalking::reporter::{grpc::ColletcItemConsume, CollectItem, Report};
+use skywalking::reporter::{CollectItem, Report};
 use std::{
-    error::Error,
-    io::{self, Write},
-    mem::size_of,
-    os::unix::net::UnixStream as StdUnixStream,
-    sync::Mutex,
+    io::Write, mem::size_of, ops::DerefMut, os::unix::net::UnixStream, path::Path, sync::Mutex,
 };
-use tokio::{io::AsyncReadExt, net::UnixStream};
-use tonic::async_trait;
+use tokio::io::AsyncReadExt;
 use tracing::error;
 
-static SENDER: OnceCell<Mutex<StdUnixStream>> = OnceCell::new();
-static RECEIVER: OnceCell<Mutex<Option<StdUnixStream>>> = OnceCell::new();
-
-pub fn init_channel() -> anyhow::Result<()> {
-    let (sender, receiver) = StdUnixStream::pair()?;
-
-    sender.set_nonblocking(false)?;
-    receiver.set_nonblocking(true)?;
-
-    if SENDER.set(Mutex::new(sender)).is_err() {
-        bail!("Channel has initialized");
-    }
-
-    if RECEIVER.set(Mutex::new(Some(receiver))).is_err() {
-        bail!("Channel has initialized");
-    }
-
-    Ok(())
-}
-
-fn channel_send(data: CollectItem) -> anyhow::Result<()> {
+fn channel_send<T>(data: CollectItem, mut sender: T) -> anyhow::Result<()>
+where
+    T: DerefMut<Target = UnixStream>,
+{
     let content = bincode::serialize(&data)?;
 
-    let mut sender = SENDER
-        .get()
-        .context("Channel haven't initialized")?
-        .lock()
-        .map_err(|_| anyhow!("Get lock failed"))?;
-
     sender.write_all(&content.len().to_le_bytes())?;
     sender.write_all(&content)?;
     sender.flush()?;
@@ -63,7 +35,7 @@ fn channel_send(data: CollectItem) -> anyhow::Result<()> {
     Ok(())
 }
 
-async fn channel_receive(receiver: &mut UnixStream) -> anyhow::Result<CollectItem> {
+pub async fn channel_receive(receiver: &mut tokio::net::UnixStream) -> anyhow::Result<CollectItem> {
     let mut size_buf = [0u8; size_of::<usize>()];
     receiver.read_exact(&mut size_buf).await?;
     let size = usize::from_le_bytes(size_buf);
@@ -75,67 +47,34 @@ async fn channel_receive(receiver: &mut UnixStream) -> anyhow::Result<CollectIte
     Ok(item)
 }
 
-fn channel_try_receive(receiver: &UnixStream) -> anyhow::Result<Option<CollectItem>> {
-    let mut size_buf = [0u8; size_of::<usize>()];
-    if let Err(e) = receiver.try_read(&mut size_buf) {
-        if e.kind() == io::ErrorKind::WouldBlock {
-            return Ok(None);
-        }
-        return Err(e.into());
-    }
-    let size = usize::from_le_bytes(size_buf);
-
-    let mut buf = vec![0u8; size];
-    if let Err(e) = receiver.try_read(&mut buf) {
-        if e.kind() == io::ErrorKind::WouldBlock {
-            return Ok(None);
-        }
-        return Err(e.into());
-    }
-
-    let item = bincode::deserialize(&buf)?;
-    Ok(item)
+pub struct Reporter<T: AsRef<Path>> {
+    worker_addr: T,
+    stream: OnceCell<Mutex<UnixStream>>,
 }
 
-pub struct Reporter;
-
-impl Report for Reporter {
-    fn report(&self, item: CollectItem) {
-        if let Err(err) = channel_send(item) {
-            error!(?err, "channel send failed");
+impl<T: AsRef<Path>> Reporter<T> {
+    pub fn new(worker_addr: T) -> Self {
+        Self {
+            worker_addr,
+            stream: OnceCell::new(),
         }
     }
-}
 
-pub struct Consumer(UnixStream);
-
-impl Consumer {
-    pub fn new() -> anyhow::Result<Self> {
-        let receiver = RECEIVER.get().context("Channel haven't initialized")?;
-        let receiver = receiver
+    fn try_report(&self, item: CollectItem) -> anyhow::Result<()> {
+        let stream = self
+            .stream
+            .get_or_try_init(|| UnixStream::connect(&self.worker_addr).map(Mutex::new))?
             .lock()
-            .map_err(|_| anyhow!("Get Lock failed"))?
-            .take()
-            .context("The RECEIVER has been taked")?;
-        let receiver =
-            UnixStream::from_std(receiver).context("try into tokio unix stream failed")?;
-        Ok(Self(receiver))
-    }
-}
+            .map_err(|_| anyhow!("Get Lock failed"))?;
 
-#[async_trait]
-impl ColletcItemConsume for Consumer {
-    async fn consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>> {
-        match channel_receive(&mut self.0).await {
-            Ok(item) => Ok(Some(item)),
-            Err(e) => Err(e.into()),
-        }
+        channel_send(item, stream)
     }
+}
 
-    async fn try_consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>> {
-        match channel_try_receive(&self.0) {
-            Ok(item) => Ok(item),
-            Err(e) => Err(e.into()),
+impl<T: AsRef<Path>> Report for Reporter<T> {
+    fn report(&self, item: CollectItem) {
+        if let Err(err) = self.try_report(item) {
+            error!(?err, "channel send failed");
         }
     }
 }
diff --git a/src/module.rs b/src/module.rs
index 332e5a8..3a1343c 100644
--- a/src/module.rs
+++ b/src/module.rs
@@ -14,7 +14,7 @@
 // limitations under the License.
 
 use crate::{
-    channel::{self, init_channel},
+    channel::Reporter,
     execute::register_execute_functions,
     util::{get_sapi_module_name, IPS},
     worker::init_worker,
@@ -48,11 +48,6 @@ pub fn init(_module: ModuleContext) -> bool {
 
     init_logger();
 
-    if let Err(e) = init_channel() {
-        error!("Init channel failed: {}", e);
-        return true;
-    }
-
     let service_name = Lazy::force(&SERVICE_NAME);
     let service_instance = Lazy::force(&SERVICE_INSTANCE);
     let skywalking_version = Lazy::force(&SKYWALKING_VERSION);
@@ -61,14 +56,30 @@ pub fn init(_module: ModuleContext) -> bool {
         service_instance, skywalking_version, "Starting skywalking agent"
     );
 
+    let worker_addr = {
+        match tempfile::NamedTempFile::new() {
+            Err(e) => {
+                error!("Create named temporary file failed: {}", e);
+                return true;
+            }
+            Ok(f) => match f.into_temp_path().to_str() {
+                None => {
+                    error!("Yields a &str slice from the Path failed.");
+                    return true;
+                }
+                Some(s) => s.to_string(),
+            },
+        }
+    };
+
+    init_worker(&worker_addr);
+
     tracer::set_global_tracer(Tracer::new(
         service_name,
         service_instance,
-        channel::Reporter,
+        Reporter::new(worker_addr),
     ));
 
-    init_worker();
-
     register_execute_functions();
 
     true
diff --git a/src/worker.rs b/src/worker.rs
index fdccb33..a4256b2 100644
--- a/src/worker.rs
+++ b/src/worker.rs
@@ -13,22 +13,36 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use crate::{channel, SKYWALKING_AGENT_SERVER_ADDR, SKYWALKING_AGENT_WORKER_THREADS};
-use phper::ini::Ini;
-use skywalking::reporter::grpc::GrpcReporter;
 use std::{
-    cmp::Ordering, num::NonZeroUsize, process::exit, thread::available_parallelism, time::Duration,
+    cmp::Ordering, error::Error, io, num::NonZeroUsize, path::Path, process::exit,
+    thread::available_parallelism, time::Duration,
+};
+
+use phper::ini::Ini;
+use skywalking::reporter::{
+    grpc::{ColletcItemConsume, GrpcReporter},
+    CollectItem,
 };
 use tokio::{
+    net::UnixListener,
     runtime::{self, Runtime},
     select,
     signal::unix::{signal, SignalKind},
+    sync::mpsc,
     time::sleep,
 };
-use tonic::transport::{Channel, Endpoint};
+use tonic::{
+    async_trait,
+    transport::{Channel, Endpoint},
+};
 use tracing::{debug, error, info, warn};
 
-pub fn init_worker() {
+use crate::{channel, SKYWALKING_AGENT_SERVER_ADDR, SKYWALKING_AGENT_WORKER_THREADS};
+
+pub fn init_worker<P>(worker_addr: P)
+where
+    P: AsRef<Path> + tracing::Value,
+{
     let server_addr = Ini::get::<String>(SKYWALKING_AGENT_SERVER_ADDR).unwrap_or_default();
     let worker_threads = worker_threads();
 
@@ -44,7 +58,7 @@ pub fn init_worker() {
             }
             Ordering::Equal => {
                 let rt = new_tokio_runtime(worker_threads);
-                rt.block_on(start_worker(server_addr));
+                rt.block_on(start_worker(worker_addr, server_addr));
                 exit(0);
             }
             Ordering::Greater => {}
@@ -70,7 +84,10 @@ fn new_tokio_runtime(worker_threads: usize) -> Runtime {
         .unwrap()
 }
 
-async fn start_worker(server_addr: String) {
+async fn start_worker<P>(worker_addr: P, server_addr: String)
+where
+    P: AsRef<Path> + tracing::Value,
+{
     debug!("Starting worker...");
 
     // Graceful shutdown signal, put it on the top of program.
@@ -83,23 +100,61 @@ async fn start_worker(server_addr: String) {
     };
 
     let fut = async move {
-        let endpoint = match Endpoint::from_shared(server_addr) {
-            Ok(endpoint) => endpoint,
+        debug!(worker_addr, "Bind");
+        let listener = match UnixListener::bind(worker_addr) {
+            Ok(listener) => listener,
             Err(err) => {
-                error!(?err, "Create endpoint failed");
+                error!(?err, "Bind failed");
                 return;
             }
         };
-        let channel = connect(endpoint).await;
 
-        let consumer = match channel::Consumer::new() {
-            Ok(consumer) => consumer,
+        let (tx, rx) = mpsc::channel::<Result<CollectItem, Box<dyn Error + Send>>>(255);
+        tokio::spawn(async move {
+            loop {
+                match listener.accept().await {
+                    Ok((mut stream, _addr)) => {
+                        let tx = tx.clone();
+
+                        tokio::spawn(async move {
+                            debug!("Entering channel_receive loop");
+
+                            loop {
+                                let r = match channel::channel_receive(&mut stream).await {
+                                    Err(err) => match err.downcast_ref::<io::Error>() {
+                                        Some(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
+                                            debug!("Leaving channel_receive loop");
+                                            return;
+                                        }
+                                        _ => Err(err.into()),
+                                    },
+                                    Ok(i) => Ok(i),
+                                };
+
+                                if let Err(err) = tx.send(r).await {
+                                    error!(?err, "Send failed");
+                                    return;
+                                }
+                            }
+                        });
+                    }
+                    Err(err) => {
+                        error!(?err, "Accept failed");
+                    }
+                }
+            }
+        });
+
+        let endpoint = match Endpoint::from_shared(server_addr) {
+            Ok(endpoint) => endpoint,
             Err(err) => {
-                error!(?err, "Create consumer failed");
+                error!(?err, "Create endpoint failed");
                 return;
             }
         };
-        let reporter = GrpcReporter::new_with_pc(channel, (), consumer);
+        let channel = connect(endpoint).await;
+
+        let reporter = GrpcReporter::new_with_pc(channel, (), Consumer(rx));
 
         // report_instance_properties(channel.clone()).await;
         // mark_ready_for_request();
@@ -146,3 +201,23 @@ async fn connect(endpoint: Endpoint) -> Channel {
 
     channel
 }
+
+struct Consumer(mpsc::Receiver<Result<CollectItem, Box<dyn Error + Send>>>);
+
+#[async_trait]
+impl ColletcItemConsume for Consumer {
+    async fn consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>> {
+        self.0
+            .recv()
+            .await
+            .map(|result| result.map(Some))
+            .unwrap_or(Ok(None))
+    }
+
+    async fn try_consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>> {
+        self.0
+            .try_recv()
+            .map(|result| result.map(Some))
+            .unwrap_or(Ok(None))
+    }
+}