You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cl...@apache.org on 2023/03/24 06:57:35 UTC

[qpid-proton] 02/03: PROTON-2673: epoll raw connections - allow delivery of wake events prior to successful connect.

This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 47b958f271ea0637046c9b85de4690bd4dbebb1d
Author: Clifford Jansen <cl...@apache.org>
AuthorDate: Thu Mar 23 17:05:49 2023 -0700

    PROTON-2673: epoll raw connections - allow delivery of wake events prior to successful connect.
---
 c/src/proactor/epoll_raw_connection.c    | 34 +++++++++++++++++---------------
 c/src/proactor/raw_connection-internal.h |  2 ++
 c/src/proactor/raw_connection.c          | 12 ++++++++++-
 3 files changed, 31 insertions(+), 17 deletions(-)

diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c
index cb61d6a75..56bebee85 100644
--- a/c/src/proactor/epoll_raw_connection.c
+++ b/c/src/proactor/epoll_raw_connection.c
@@ -367,7 +367,18 @@ static void  set_error(pn_raw_connection_t *conn, const char *msg, int err) {
 
 pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool sched_ready) {
   praw_connection_t *rc = containerof(t, praw_connection_t, task);
+  bool task_wake = false;
+  bool can_wake = pni_raw_can_wake(&rc->raw_connection);
   lock(&rc->task.mutex);
+  t->working = true;
+  if (sched_ready)
+    schedule_done(t);
+  if (pni_task_wake_pending(&rc->task)) {
+    if (can_wake)
+      task_wake = true; // batch_next() will complete the task wake.
+    else
+      pni_task_wake_done(&rc->task);  // Complete task wake without event.
+  }
   int events = io_events;
   int fd = rc->psocket.epoll_io.fd;
   if (!rc->connected) {
@@ -381,26 +392,17 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool
       return &rc->batch;
     }
     if (events & (EPOLLHUP | EPOLLERR)) {
+      // A wake can be the first event.  Otherwise, wait for connection to complete.
+      bool event_pending = task_wake || pni_raw_wake_is_pending(&rc->raw_connection) || pn_collector_peek(rc->raw_connection.collector);
+      t->working = event_pending;
       unlock(&rc->task.mutex);
-      return NULL;
+      return event_pending ? &rc->batch : NULL;
     }
-    praw_connection_connected_lh(rc);
+    if (events & EPOLLOUT)
+      praw_connection_connected_lh(rc);
   }
   unlock(&rc->task.mutex);
 
-  bool wake = false;
-  lock(&t->mutex);
-  t->working = true;
-  if (sched_ready) {
-    schedule_done(t);
-    if (pni_task_wake_pending(&rc->task)) {
-      wake = true;
-      pni_task_wake_done(&rc->task);
-    }
-  }
-  unlock(&t->mutex);
-
-  if (wake) pni_raw_wake(&rc->raw_connection);
   if (events & EPOLLIN) pni_raw_read(&rc->raw_connection, fd, rcv, set_error);
   if (events & EPOLLOUT) pni_raw_write(&rc->raw_connection, fd, snd, set_error);
   rc->batch_empty = false;
@@ -417,7 +419,7 @@ void pni_raw_connection_done(praw_connection_t *rc) {
     if (pn_collector_peek(rc->raw_connection.collector))
       have_event = true;
     else {
-      pn_event_t *e = pni_raw_event_next(&rc->raw_connection);
+      pn_event_t *e = pni_raw_batch_next(&rc->batch);
       // State machine up to date.
       if (e) {
         have_event = true;
diff --git a/c/src/proactor/raw_connection-internal.h b/c/src/proactor/raw_connection-internal.h
index 218bf2b2d..47b0ea925 100644
--- a/c/src/proactor/raw_connection-internal.h
+++ b/c/src/proactor/raw_connection-internal.h
@@ -126,6 +126,8 @@ bool pni_raw_validate(pn_raw_connection_t *conn);
 void pni_raw_connected(pn_raw_connection_t *conn);
 void pni_raw_connect_failed(pn_raw_connection_t *conn);
 void pni_raw_wake(pn_raw_connection_t *conn);
+bool pni_raw_wake_is_pending(pn_raw_connection_t *conn);
+bool pni_raw_can_wake(pn_raw_connection_t *conn);
 void pni_raw_close(pn_raw_connection_t *conn);
 void pni_raw_read_close(pn_raw_connection_t *conn);
 void pni_raw_write_close(pn_raw_connection_t *conn);
diff --git a/c/src/proactor/raw_connection.c b/c/src/proactor/raw_connection.c
index a7aa21d11..fd633a284 100644
--- a/c/src/proactor/raw_connection.c
+++ b/c/src/proactor/raw_connection.c
@@ -475,7 +475,17 @@ void pni_raw_connect_failed(pn_raw_connection_t *conn) {
 }
 
 void pni_raw_wake(pn_raw_connection_t *conn) {
-  conn->wakepending = true;
+  if (conn->disconnect_state != disc_fini)
+    conn->wakepending = true;
+}
+
+bool pni_raw_wake_is_pending(pn_raw_connection_t *conn) {
+  return conn->wakepending;
+}
+
+bool pni_raw_can_wake(pn_raw_connection_t *conn) {
+  // True if DISCONNECTED event has not yet been extracted from the batch.
+  return (conn->disconnect_state != disc_fini);
 }
 
 void pni_raw_read(pn_raw_connection_t *conn, int sock, long (*recv)(int, void*, size_t), void(*set_error)(pn_raw_connection_t *, const char *, int)) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org