Posted to notifications@skywalking.apache.org by GitBox <gi...@apache.org> on 2022/10/23 08:37:08 UTC

[GitHub] [skywalking-php] wu-sheng commented on a diff in pull request #26: Utilize UnixListener for the worker process to accept reports.

wu-sheng commented on code in PR #26:
URL: https://github.com/apache/skywalking-php/pull/26#discussion_r1002666870


##########
src/channel.rs:
##########
@@ -75,67 +47,34 @@ async fn channel_receive(receiver: &mut UnixStream) -> anyhow::Result<CollectIte
     Ok(item)
 }
 
-fn channel_try_receive(receiver: &UnixStream) -> anyhow::Result<Option<CollectItem>> {
-    let mut size_buf = [0u8; size_of::<usize>()];
-    if let Err(e) = receiver.try_read(&mut size_buf) {
-        if e.kind() == io::ErrorKind::WouldBlock {
-            return Ok(None);
-        }
-        return Err(e.into());
-    }
-    let size = usize::from_le_bytes(size_buf);
-
-    let mut buf = vec![0u8; size];
-    if let Err(e) = receiver.try_read(&mut buf) {
-        if e.kind() == io::ErrorKind::WouldBlock {
-            return Ok(None);
-        }
-        return Err(e.into());
-    }
-
-    let item = bincode::deserialize(&buf)?;
-    Ok(item)
+pub struct Reporter<T: AsRef<Path>> {
+    worker_addr: T,
+    stream: OnceCell<Mutex<UnixStream>>,
 }
 
-pub struct Reporter;
-
-impl Report for Reporter {
-    fn report(&self, item: CollectItem) {
-        if let Err(err) = channel_send(item) {
-            error!(?err, "channel send failed");
+impl<T: AsRef<Path>> Reporter<T> {
+    pub fn new(worker_addr: T) -> Self {
+        Self {
+            worker_addr,
+            stream: OnceCell::new(),
         }
     }
-}
 
-pub struct Consumer(UnixStream);
-
-impl Consumer {
-    pub fn new() -> anyhow::Result<Self> {
-        let receiver = RECEIVER.get().context("Channel haven't initialized")?;
-        let receiver = receiver
+    fn try_report(&self, item: CollectItem) -> anyhow::Result<()> {
+        let stream = self
+            .stream
+            .get_or_try_init(|| UnixStream::connect(&self.worker_addr).map(Mutex::new))?
             .lock()
-            .map_err(|_| anyhow!("Get Lock failed"))?
-            .take()
-            .context("The RECEIVER has been taked")?;
-        let receiver =
-            UnixStream::from_std(receiver).context("try into tokio unix stream failed")?;
-        Ok(Self(receiver))
-    }
-}
+            .map_err(|_| anyhow!("Get Lock failed"))?;
 
-#[async_trait]
-impl ColletcItemConsume for Consumer {
-    async fn consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>> {
-        match channel_receive(&mut self.0).await {
-            Ok(item) => Ok(Some(item)),
-            Err(e) => Err(e.into()),
-        }
+        channel_send(item, stream)

Review Comment:
   From the connection perspective, I think keeping the connection open makes perfect sense, and sending in a blocking way is not a blocker.
   The key to resolving the concern is to not block the user process. This is why there is usually a non-blocking in-memory queue in front of the code that sends segments/spans over the network.
   For reference, see the Java agent:
   https://github.com/apache/skywalking-java/blob/1148ec3801179c005304ae96dd76d7eac1cbfafa/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java#L68
   The DataCarrier with IF_POSSIBLE is a queue implementation that provides both a buffering mechanism and an auto-drop mechanism. When the queue is full, which can be caused by slow sending or a disconnected connection, auto-drop discards new messages instead of adding them to the queue.
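
A rough Rust sketch of the buffering plus auto-drop idea described above, using only the standard library. This is not the DataCarrier implementation and not code from this PR: `Item`, `QueuedReporter`, and `demo` are hypothetical names, and `Item` merely stands in for `CollectItem`. It assumes a separate consumer (for example a background thread) would drain the receiver and perform the blocking send over the `UnixStream`.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Hypothetical stand-in for the PR's `CollectItem`.
#[derive(Debug, PartialEq)]
pub struct Item(pub u32);

/// Hypothetical reporter that only enqueues and never blocks the caller.
pub struct QueuedReporter {
    tx: SyncSender<Item>,
}

impl QueuedReporter {
    /// Creates a bounded queue holding at most `capacity` items.
    /// The returned receiver is meant for the consumer that does the
    /// actual (possibly blocking) send to the worker.
    pub fn new(capacity: usize) -> (Self, Receiver<Item>) {
        let (tx, rx) = sync_channel(capacity);
        (Self { tx }, rx)
    }

    /// Enqueues `item` without blocking. Returns `false` when the item
    /// was dropped because the queue is full or the consumer is gone.
    pub fn report(&self, item: Item) -> bool {
        match self.tx.try_send(item) {
            Ok(()) => true,
            // Auto-drop: the new item is discarded, the caller moves on.
            Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => false,
        }
    }
}

/// Small demonstration: a queue of capacity 2 with no active consumer
/// accepts the first two items and drops the rest.
pub fn demo() -> (Vec<bool>, Vec<Item>) {
    let (reporter, rx) = QueuedReporter::new(2);
    let accepted = (0..4).map(|i| reporter.report(Item(i))).collect();
    // Dropping the sender lets the receiver iterator terminate.
    drop(reporter);
    let drained = rx.into_iter().collect();
    (accepted, drained)
}
```

With this shape, a slow or disconnected worker only causes the queue to fill up and new items to be dropped, while the user process returns from `report` immediately.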



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
