You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by he...@apache.org on 2022/10/25 01:43:10 UTC
[skywalking-php] branch master updated: Utilize UnixListener for the worker process to accept reports. (#26)
This is an automated email from the ASF dual-hosted git repository.
heyanlong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-php.git
The following commit(s) were added to refs/heads/master by this push:
new e6b733b Utilize UnixListener for the worker process to accept reports. (#26)
e6b733b is described below
commit e6b733b29ee1bd41814359f643b4a935919d5bc9
Author: phanalpha <ph...@hotmail.com>
AuthorDate: Tue Oct 25 09:43:06 2022 +0800
Utilize UnixListener for the worker process to accept reports. (#26)
* (Unix) listener worker
---
Cargo.lock | 1 +
Cargo.toml | 1 +
src/channel.rs | 117 ++++++++++++++-------------------------------------------
src/module.rs | 29 +++++++++-----
src/worker.rs | 107 ++++++++++++++++++++++++++++++++++++++++++++--------
5 files changed, 141 insertions(+), 114 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 436b4fd..b18f2b7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1897,6 +1897,7 @@ dependencies = [
"reqwest",
"skywalking",
"systemstat",
+ "tempfile",
"tokio",
"tokio-stream",
"tonic",
diff --git a/Cargo.toml b/Cargo.toml
index 8c49f83..2322115 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -49,6 +49,7 @@ phper = "0.5.1"
prost = "0.11.0"
skywalking = "0.4.0"
systemstat = "0.2.0"
+tempfile = "3.3.0"
tokio = { version = "1.21.0", features = ["full"] }
tokio-stream = "0.1.9"
tonic = "0.8.0"
diff --git a/src/channel.rs b/src/channel.rs
index eaf7097..fe802a0 100644
--- a/src/channel.rs
+++ b/src/channel.rs
@@ -13,49 +13,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use anyhow::{anyhow, bail, Context};
+use anyhow::anyhow;
use once_cell::sync::OnceCell;
-use skywalking::reporter::{grpc::ColletcItemConsume, CollectItem, Report};
+use skywalking::reporter::{CollectItem, Report};
use std::{
- error::Error,
- io::{self, Write},
- mem::size_of,
- os::unix::net::UnixStream as StdUnixStream,
- sync::Mutex,
+ io::Write, mem::size_of, ops::DerefMut, os::unix::net::UnixStream, path::Path, sync::Mutex,
};
-use tokio::{io::AsyncReadExt, net::UnixStream};
-use tonic::async_trait;
+use tokio::io::AsyncReadExt;
use tracing::error;
-static SENDER: OnceCell<Mutex<StdUnixStream>> = OnceCell::new();
-static RECEIVER: OnceCell<Mutex<Option<StdUnixStream>>> = OnceCell::new();
-
-pub fn init_channel() -> anyhow::Result<()> {
- let (sender, receiver) = StdUnixStream::pair()?;
-
- sender.set_nonblocking(false)?;
- receiver.set_nonblocking(true)?;
-
- if SENDER.set(Mutex::new(sender)).is_err() {
- bail!("Channel has initialized");
- }
-
- if RECEIVER.set(Mutex::new(Some(receiver))).is_err() {
- bail!("Channel has initialized");
- }
-
- Ok(())
-}
-
-fn channel_send(data: CollectItem) -> anyhow::Result<()> {
+fn channel_send<T>(data: CollectItem, mut sender: T) -> anyhow::Result<()>
+where
+ T: DerefMut<Target = UnixStream>,
+{
let content = bincode::serialize(&data)?;
- let mut sender = SENDER
- .get()
- .context("Channel haven't initialized")?
- .lock()
- .map_err(|_| anyhow!("Get lock failed"))?;
-
sender.write_all(&content.len().to_le_bytes())?;
sender.write_all(&content)?;
sender.flush()?;
@@ -63,7 +35,7 @@ fn channel_send(data: CollectItem) -> anyhow::Result<()> {
Ok(())
}
-async fn channel_receive(receiver: &mut UnixStream) -> anyhow::Result<CollectItem> {
+pub async fn channel_receive(receiver: &mut tokio::net::UnixStream) -> anyhow::Result<CollectItem> {
let mut size_buf = [0u8; size_of::<usize>()];
receiver.read_exact(&mut size_buf).await?;
let size = usize::from_le_bytes(size_buf);
@@ -75,67 +47,34 @@ async fn channel_receive(receiver: &mut UnixStream) -> anyhow::Result<CollectIte
Ok(item)
}
-fn channel_try_receive(receiver: &UnixStream) -> anyhow::Result<Option<CollectItem>> {
- let mut size_buf = [0u8; size_of::<usize>()];
- if let Err(e) = receiver.try_read(&mut size_buf) {
- if e.kind() == io::ErrorKind::WouldBlock {
- return Ok(None);
- }
- return Err(e.into());
- }
- let size = usize::from_le_bytes(size_buf);
-
- let mut buf = vec![0u8; size];
- if let Err(e) = receiver.try_read(&mut buf) {
- if e.kind() == io::ErrorKind::WouldBlock {
- return Ok(None);
- }
- return Err(e.into());
- }
-
- let item = bincode::deserialize(&buf)?;
- Ok(item)
+pub struct Reporter<T: AsRef<Path>> {
+ worker_addr: T,
+ stream: OnceCell<Mutex<UnixStream>>,
}
-pub struct Reporter;
-
-impl Report for Reporter {
- fn report(&self, item: CollectItem) {
- if let Err(err) = channel_send(item) {
- error!(?err, "channel send failed");
+impl<T: AsRef<Path>> Reporter<T> {
+ pub fn new(worker_addr: T) -> Self {
+ Self {
+ worker_addr,
+ stream: OnceCell::new(),
}
}
-}
-pub struct Consumer(UnixStream);
-
-impl Consumer {
- pub fn new() -> anyhow::Result<Self> {
- let receiver = RECEIVER.get().context("Channel haven't initialized")?;
- let receiver = receiver
+ fn try_report(&self, item: CollectItem) -> anyhow::Result<()> {
+ let stream = self
+ .stream
+ .get_or_try_init(|| UnixStream::connect(&self.worker_addr).map(Mutex::new))?
.lock()
- .map_err(|_| anyhow!("Get Lock failed"))?
- .take()
- .context("The RECEIVER has been taked")?;
- let receiver =
- UnixStream::from_std(receiver).context("try into tokio unix stream failed")?;
- Ok(Self(receiver))
- }
-}
+ .map_err(|_| anyhow!("Get Lock failed"))?;
-#[async_trait]
-impl ColletcItemConsume for Consumer {
- async fn consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>> {
- match channel_receive(&mut self.0).await {
- Ok(item) => Ok(Some(item)),
- Err(e) => Err(e.into()),
- }
+ channel_send(item, stream)
}
+}
- async fn try_consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>> {
- match channel_try_receive(&self.0) {
- Ok(item) => Ok(item),
- Err(e) => Err(e.into()),
+impl<T: AsRef<Path>> Report for Reporter<T> {
+ fn report(&self, item: CollectItem) {
+ if let Err(err) = self.try_report(item) {
+ error!(?err, "channel send failed");
}
}
}
diff --git a/src/module.rs b/src/module.rs
index 332e5a8..3a1343c 100644
--- a/src/module.rs
+++ b/src/module.rs
@@ -14,7 +14,7 @@
// limitations under the License.
use crate::{
- channel::{self, init_channel},
+ channel::Reporter,
execute::register_execute_functions,
util::{get_sapi_module_name, IPS},
worker::init_worker,
@@ -48,11 +48,6 @@ pub fn init(_module: ModuleContext) -> bool {
init_logger();
- if let Err(e) = init_channel() {
- error!("Init channel failed: {}", e);
- return true;
- }
-
let service_name = Lazy::force(&SERVICE_NAME);
let service_instance = Lazy::force(&SERVICE_INSTANCE);
let skywalking_version = Lazy::force(&SKYWALKING_VERSION);
@@ -61,14 +56,30 @@ pub fn init(_module: ModuleContext) -> bool {
service_instance, skywalking_version, "Starting skywalking agent"
);
+ let worker_addr = {
+ match tempfile::NamedTempFile::new() {
+ Err(e) => {
+ error!("Create named temporary file failed: {}", e);
+ return true;
+ }
+ Ok(f) => match f.into_temp_path().to_str() {
+ None => {
+ error!("Yields a &str slice from the Path failed.");
+ return true;
+ }
+ Some(s) => s.to_string(),
+ },
+ }
+ };
+
+ init_worker(&worker_addr);
+
tracer::set_global_tracer(Tracer::new(
service_name,
service_instance,
- channel::Reporter,
+ Reporter::new(worker_addr),
));
- init_worker();
-
register_execute_functions();
true
diff --git a/src/worker.rs b/src/worker.rs
index fdccb33..a4256b2 100644
--- a/src/worker.rs
+++ b/src/worker.rs
@@ -13,22 +13,36 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use crate::{channel, SKYWALKING_AGENT_SERVER_ADDR, SKYWALKING_AGENT_WORKER_THREADS};
-use phper::ini::Ini;
-use skywalking::reporter::grpc::GrpcReporter;
use std::{
- cmp::Ordering, num::NonZeroUsize, process::exit, thread::available_parallelism, time::Duration,
+ cmp::Ordering, error::Error, io, num::NonZeroUsize, path::Path, process::exit,
+ thread::available_parallelism, time::Duration,
+};
+
+use phper::ini::Ini;
+use skywalking::reporter::{
+ grpc::{ColletcItemConsume, GrpcReporter},
+ CollectItem,
};
use tokio::{
+ net::UnixListener,
runtime::{self, Runtime},
select,
signal::unix::{signal, SignalKind},
+ sync::mpsc,
time::sleep,
};
-use tonic::transport::{Channel, Endpoint};
+use tonic::{
+ async_trait,
+ transport::{Channel, Endpoint},
+};
use tracing::{debug, error, info, warn};
-pub fn init_worker() {
+use crate::{channel, SKYWALKING_AGENT_SERVER_ADDR, SKYWALKING_AGENT_WORKER_THREADS};
+
+pub fn init_worker<P>(worker_addr: P)
+where
+ P: AsRef<Path> + tracing::Value,
+{
let server_addr = Ini::get::<String>(SKYWALKING_AGENT_SERVER_ADDR).unwrap_or_default();
let worker_threads = worker_threads();
@@ -44,7 +58,7 @@ pub fn init_worker() {
}
Ordering::Equal => {
let rt = new_tokio_runtime(worker_threads);
- rt.block_on(start_worker(server_addr));
+ rt.block_on(start_worker(worker_addr, server_addr));
exit(0);
}
Ordering::Greater => {}
@@ -70,7 +84,10 @@ fn new_tokio_runtime(worker_threads: usize) -> Runtime {
.unwrap()
}
-async fn start_worker(server_addr: String) {
+async fn start_worker<P>(worker_addr: P, server_addr: String)
+where
+ P: AsRef<Path> + tracing::Value,
+{
debug!("Starting worker...");
// Graceful shutdown signal, put it on the top of program.
@@ -83,23 +100,61 @@ async fn start_worker(server_addr: String) {
};
let fut = async move {
- let endpoint = match Endpoint::from_shared(server_addr) {
- Ok(endpoint) => endpoint,
+ debug!(worker_addr, "Bind");
+ let listener = match UnixListener::bind(worker_addr) {
+ Ok(listener) => listener,
Err(err) => {
- error!(?err, "Create endpoint failed");
+ error!(?err, "Bind failed");
return;
}
};
- let channel = connect(endpoint).await;
- let consumer = match channel::Consumer::new() {
- Ok(consumer) => consumer,
+ let (tx, rx) = mpsc::channel::<Result<CollectItem, Box<dyn Error + Send>>>(255);
+ tokio::spawn(async move {
+ loop {
+ match listener.accept().await {
+ Ok((mut stream, _addr)) => {
+ let tx = tx.clone();
+
+ tokio::spawn(async move {
+ debug!("Entering channel_receive loop");
+
+ loop {
+ let r = match channel::channel_receive(&mut stream).await {
+ Err(err) => match err.downcast_ref::<io::Error>() {
+ Some(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
+ debug!("Leaving channel_receive loop");
+ return;
+ }
+ _ => Err(err.into()),
+ },
+ Ok(i) => Ok(i),
+ };
+
+ if let Err(err) = tx.send(r).await {
+ error!(?err, "Send failed");
+ return;
+ }
+ }
+ });
+ }
+ Err(err) => {
+ error!(?err, "Accept failed");
+ }
+ }
+ }
+ });
+
+ let endpoint = match Endpoint::from_shared(server_addr) {
+ Ok(endpoint) => endpoint,
Err(err) => {
- error!(?err, "Create consumer failed");
+ error!(?err, "Create endpoint failed");
return;
}
};
- let reporter = GrpcReporter::new_with_pc(channel, (), consumer);
+ let channel = connect(endpoint).await;
+
+ let reporter = GrpcReporter::new_with_pc(channel, (), Consumer(rx));
// report_instance_properties(channel.clone()).await;
// mark_ready_for_request();
@@ -146,3 +201,23 @@ async fn connect(endpoint: Endpoint) -> Channel {
channel
}
+
+struct Consumer(mpsc::Receiver<Result<CollectItem, Box<dyn Error + Send>>>);
+
+#[async_trait]
+impl ColletcItemConsume for Consumer {
+ async fn consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>> {
+ self.0
+ .recv()
+ .await
+ .map(|result| result.map(Some))
+ .unwrap_or(Ok(None))
+ }
+
+ async fn try_consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>> {
+ self.0
+ .try_recv()
+ .map(|result| result.map(Some))
+ .unwrap_or(Ok(None))
+ }
+}