You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cl...@apache.org on 2023/03/24 06:57:35 UTC
[qpid-proton] 02/03: PROTON-2673: epoll raw connections - allow delivery of wake events prior to successful connect.
This is an automated email from the ASF dual-hosted git repository.
cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit 47b958f271ea0637046c9b85de4690bd4dbebb1d
Author: Clifford Jansen <cl...@apache.org>
AuthorDate: Thu Mar 23 17:05:49 2023 -0700
PROTON-2673: epoll raw connections - allow delivery of wake events prior to successful connect.
---
c/src/proactor/epoll_raw_connection.c | 34 +++++++++++++++++---------------
c/src/proactor/raw_connection-internal.h | 2 ++
c/src/proactor/raw_connection.c | 12 ++++++++++-
3 files changed, 31 insertions(+), 17 deletions(-)
diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c
index cb61d6a75..56bebee85 100644
--- a/c/src/proactor/epoll_raw_connection.c
+++ b/c/src/proactor/epoll_raw_connection.c
@@ -367,7 +367,18 @@ static void set_error(pn_raw_connection_t *conn, const char *msg, int err) {
pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool sched_ready) {
praw_connection_t *rc = containerof(t, praw_connection_t, task);
+ bool task_wake = false;
+ bool can_wake = pni_raw_can_wake(&rc->raw_connection);
lock(&rc->task.mutex);
+ t->working = true;
+ if (sched_ready)
+ schedule_done(t);
+ if (pni_task_wake_pending(&rc->task)) {
+ if (can_wake)
+ task_wake = true; // batch_next() will complete the task wake.
+ else
+ pni_task_wake_done(&rc->task); // Complete task wake without event.
+ }
int events = io_events;
int fd = rc->psocket.epoll_io.fd;
if (!rc->connected) {
@@ -381,26 +392,17 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool
return &rc->batch;
}
if (events & (EPOLLHUP | EPOLLERR)) {
+ // A wake can be the first event. Otherwise, wait for connection to complete.
+ bool event_pending = task_wake || pni_raw_wake_is_pending(&rc->raw_connection) || pn_collector_peek(rc->raw_connection.collector);
+ t->working = event_pending;
unlock(&rc->task.mutex);
- return NULL;
+ return event_pending ? &rc->batch : NULL;
}
- praw_connection_connected_lh(rc);
+ if (events & EPOLLOUT)
+ praw_connection_connected_lh(rc);
}
unlock(&rc->task.mutex);
- bool wake = false;
- lock(&t->mutex);
- t->working = true;
- if (sched_ready) {
- schedule_done(t);
- if (pni_task_wake_pending(&rc->task)) {
- wake = true;
- pni_task_wake_done(&rc->task);
- }
- }
- unlock(&t->mutex);
-
- if (wake) pni_raw_wake(&rc->raw_connection);
if (events & EPOLLIN) pni_raw_read(&rc->raw_connection, fd, rcv, set_error);
if (events & EPOLLOUT) pni_raw_write(&rc->raw_connection, fd, snd, set_error);
rc->batch_empty = false;
@@ -417,7 +419,7 @@ void pni_raw_connection_done(praw_connection_t *rc) {
if (pn_collector_peek(rc->raw_connection.collector))
have_event = true;
else {
- pn_event_t *e = pni_raw_event_next(&rc->raw_connection);
+ pn_event_t *e = pni_raw_batch_next(&rc->batch);
// State machine up to date.
if (e) {
have_event = true;
diff --git a/c/src/proactor/raw_connection-internal.h b/c/src/proactor/raw_connection-internal.h
index 218bf2b2d..47b0ea925 100644
--- a/c/src/proactor/raw_connection-internal.h
+++ b/c/src/proactor/raw_connection-internal.h
@@ -126,6 +126,8 @@ bool pni_raw_validate(pn_raw_connection_t *conn);
void pni_raw_connected(pn_raw_connection_t *conn);
void pni_raw_connect_failed(pn_raw_connection_t *conn);
void pni_raw_wake(pn_raw_connection_t *conn);
+bool pni_raw_wake_is_pending(pn_raw_connection_t *conn);
+bool pni_raw_can_wake(pn_raw_connection_t *conn);
void pni_raw_close(pn_raw_connection_t *conn);
void pni_raw_read_close(pn_raw_connection_t *conn);
void pni_raw_write_close(pn_raw_connection_t *conn);
diff --git a/c/src/proactor/raw_connection.c b/c/src/proactor/raw_connection.c
index a7aa21d11..fd633a284 100644
--- a/c/src/proactor/raw_connection.c
+++ b/c/src/proactor/raw_connection.c
@@ -475,7 +475,17 @@ void pni_raw_connect_failed(pn_raw_connection_t *conn) {
}
void pni_raw_wake(pn_raw_connection_t *conn) {
- conn->wakepending = true;
+ if (conn->disconnect_state != disc_fini)
+ conn->wakepending = true;
+}
+
+bool pni_raw_wake_is_pending(pn_raw_connection_t *conn) {
+ return conn->wakepending;
+}
+
+bool pni_raw_can_wake(pn_raw_connection_t *conn) {
+ // True if DISCONNECTED event has not yet been extracted from the batch.
+ return (conn->disconnect_state != disc_fini);
}
void pni_raw_read(pn_raw_connection_t *conn, int sock, long (*recv)(int, void*, size_t), void(*set_error)(pn_raw_connection_t *, const char *, int)) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org