Posted to notifications@skywalking.apache.org by GitBox <gi...@apache.org> on 2022/10/23 06:54:07 UTC

[GitHub] [skywalking-php] phanalpha commented on a diff in pull request #26: Utilize UnixListener for the worker process to accept reports.

phanalpha commented on code in PR #26:
URL: https://github.com/apache/skywalking-php/pull/26#discussion_r1002650323


##########
src/channel.rs:
##########
@@ -75,67 +47,34 @@ async fn channel_receive(receiver: &mut UnixStream) -> anyhow::Result<CollectIte
     Ok(item)
 }
 
-fn channel_try_receive(receiver: &UnixStream) -> anyhow::Result<Option<CollectItem>> {
-    let mut size_buf = [0u8; size_of::<usize>()];
-    if let Err(e) = receiver.try_read(&mut size_buf) {
-        if e.kind() == io::ErrorKind::WouldBlock {
-            return Ok(None);
-        }
-        return Err(e.into());
-    }
-    let size = usize::from_le_bytes(size_buf);
-
-    let mut buf = vec![0u8; size];
-    if let Err(e) = receiver.try_read(&mut buf) {
-        if e.kind() == io::ErrorKind::WouldBlock {
-            return Ok(None);
-        }
-        return Err(e.into());
-    }
-
-    let item = bincode::deserialize(&buf)?;
-    Ok(item)
+pub struct Reporter<T: AsRef<Path>> {
+    worker_addr: T,
+    stream: OnceCell<Mutex<UnixStream>>,
 }
 
-pub struct Reporter;
-
-impl Report for Reporter {
-    fn report(&self, item: CollectItem) {
-        if let Err(err) = channel_send(item) {
-            error!(?err, "channel send failed");
+impl<T: AsRef<Path>> Reporter<T> {
+    pub fn new(worker_addr: T) -> Self {
+        Self {
+            worker_addr,
+            stream: OnceCell::new(),
         }
     }
-}
 
-pub struct Consumer(UnixStream);
-
-impl Consumer {
-    pub fn new() -> anyhow::Result<Self> {
-        let receiver = RECEIVER.get().context("Channel haven't initialized")?;
-        let receiver = receiver
+    fn try_report(&self, item: CollectItem) -> anyhow::Result<()> {
+        let stream = self
+            .stream
+            .get_or_try_init(|| UnixStream::connect(&self.worker_addr).map(Mutex::new))?
             .lock()
-            .map_err(|_| anyhow!("Get Lock failed"))?
-            .take()
-            .context("The RECEIVER has been taked")?;
-        let receiver =
-            UnixStream::from_std(receiver).context("try into tokio unix stream failed")?;
-        Ok(Self(receiver))
-    }
-}
+            .map_err(|_| anyhow!("Get Lock failed"))?;
 
-#[async_trait]
-impl ColletcItemConsume for Consumer {
-    async fn consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>> {
-        match channel_receive(&mut self.0).await {
-            Ok(item) => Ok(Some(item)),
-            Err(e) => Err(e.into()),
-        }
+        channel_send(item, stream)

Review Comment:
   Non-blocking mode has its own problems, as I already stated. See https://github.com/apache/skywalking/issues/9831#issuecomment-1287874012.
   I tried setting the stream to non-blocking, and it simply failed.
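
For readers following the thread, here is a minimal sketch of one way a non-blocking Unix socket write can fail. It is not taken from the PR and is not necessarily the failure described in the linked issue comment. It uses only the standard library, and the function name `fill_until_would_block` is hypothetical. The assumption being illustrated: if the peer does not read fast enough, the kernel send buffer fills and a non-blocking `write` returns `WouldBlock` instead of waiting.

```rust
use std::io::{self, Write};
use std::os::unix::net::UnixStream;

/// Hypothetical helper, not part of skywalking-php.
/// Writes fixed-size chunks to a non-blocking socket that nobody reads from.
/// Returns the number of bytes accepted before the loop stopped, together
/// with the error kind that stopped it.
fn fill_until_would_block() -> io::Result<(usize, io::ErrorKind)> {
    // `_receiver` stays open but is never read, so the send buffer fills up.
    let (mut sender, _receiver) = UnixStream::pair()?;
    sender.set_nonblocking(true)?;

    let chunk = [0u8; 4096];
    let mut written = 0usize;
    loop {
        match sender.write(&chunk) {
            // Guard against a zero-length write so the loop cannot spin forever.
            Ok(0) => return Ok((written, io::ErrorKind::WriteZero)),
            // A non-blocking write may accept only part of the chunk.
            Ok(n) => written += n,
            // With a full buffer, a blocking socket would wait here;
            // a non-blocking one reports an error immediately.
            Err(e) => return Ok((written, e.kind())),
        }
    }
}
```

Calling `fill_until_would_block()` from a `main` function and printing the result shows how many bytes were accepted before the error. A reporter that writes a length prefix followed by a payload would have to handle both the error and a partially written frame at that point, which is one reason a blocking stream behind a mutex can be the simpler choice.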


