You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2022/05/27 23:02:08 UTC
[qpid-proton] branch main updated: PROTON-2546: Rearrange how raw connection generates events
This is an automated email from the ASF dual-hosted git repository.
astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
The following commit(s) were added to refs/heads/main by this push:
new e2fa36f86 PROTON-2546: Rearrange how raw connection generates events
e2fa36f86 is described below
commit e2fa36f86990a95581363b07bcb0fd498956ce04
Author: Andrew Stitcher <as...@apache.org>
AuthorDate: Fri May 27 13:49:44 2022 -0400
PROTON-2546: Rearrange how raw connection generates events
Also implement a proactor forced cleanup for raw connections to try to
limit leaking of events on forced shutdowns.
---
c/src/proactor/epoll-internal.h | 2 ++
c/src/proactor/epoll.c | 3 +++
c/src/proactor/epoll_raw_connection.c | 9 +++++++++
c/src/proactor/raw_connection-internal.h | 1 +
c/src/proactor/raw_connection.c | 5 ++++-
5 files changed, 19 insertions(+), 1 deletion(-)
diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index 79dddaac6..8f765121e 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -386,9 +386,11 @@ psocket_t *pni_task_raw_psocket(task_t *t);
pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool sched_ready);
typedef struct praw_connection_t praw_connection_t;
+praw_connection_t *pni_task_raw_connection(task_t *t);
task_t *pni_raw_connection_task(praw_connection_t *rc);
praw_connection_t *pni_batch_raw_connection(pn_event_batch_t* batch);
void pni_raw_connection_done(praw_connection_t *rc);
+void pni_raw_connection_forced_shutdown(praw_connection_t *rc);
pni_timer_t *pni_timer(pni_timer_manager_t *tm, pconnection_t *c);
void pni_timer_free(pni_timer_t *timer);
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 027ef2fe8..35a728984 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -2014,6 +2014,9 @@ void pn_proactor_free(pn_proactor_t *p) {
case LISTENER:
listener_forced_shutdown(task_listener(tsk));
break;
+ case RAW_CONNECTION:
+ pni_raw_connection_forced_shutdown(pni_task_raw_connection(tsk));
+ break;
default:
break;
}
diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c
index a02569866..4d459a28c 100644
--- a/c/src/proactor/epoll_raw_connection.c
+++ b/c/src/proactor/epoll_raw_connection.c
@@ -322,6 +322,10 @@ task_t *pni_psocket_raw_task(psocket_t* ps) {
return &containerof(ps, praw_connection_t, psocket)->task;
}
+praw_connection_t *pni_task_raw_connection(task_t *t) {
+ return containerof(t, praw_connection_t, task);
+}
+
psocket_t *pni_task_raw_psocket(task_t *t) {
return &containerof(t, praw_connection_t, task)->psocket;
}
@@ -433,3 +437,8 @@ void pni_raw_connection_done(praw_connection_t *rc) {
if (notify) notify_poller(p);
if (resume_thread) pni_resume(p, resume_thread);
}
+
+void pni_raw_connection_forced_shutdown(praw_connection_t *rc) {
+ pni_raw_finalize(&rc->raw_connection);
+ praw_connection_cleanup(rc);
+}
diff --git a/c/src/proactor/raw_connection-internal.h b/c/src/proactor/raw_connection-internal.h
index ea517c636..218bf2b2d 100644
--- a/c/src/proactor/raw_connection-internal.h
+++ b/c/src/proactor/raw_connection-internal.h
@@ -110,6 +110,7 @@ struct pn_raw_connection_t {
bool rrequestedbuffers;
bool wrequestedbuffers;
+ bool connectpending;
bool rpending;
bool wpending;
bool rclosedpending;
diff --git a/c/src/proactor/raw_connection.c b/c/src/proactor/raw_connection.c
index 031e21fb0..a7aa21d11 100644
--- a/c/src/proactor/raw_connection.c
+++ b/c/src/proactor/raw_connection.c
@@ -465,7 +465,7 @@ static inline void pni_raw_disconnect(pn_raw_connection_t *conn) {
void pni_raw_connected(pn_raw_connection_t *conn) {
pn_condition_clear(conn->condition);
- pni_raw_put_event(conn, PN_RAW_CONNECTION_CONNECTED);
+ conn->connectpending = true;
conn->state = pni_raw_new_state(conn, conn_connected);
}
@@ -665,6 +665,9 @@ pn_event_t *pni_raw_event_next(pn_raw_connection_t *conn) {
pn_event_t *event = pn_collector_next(conn->collector);
if (event) {
return pni_log_event(conn, event);
+ } else if (conn->connectpending) {
+ pni_raw_put_event(conn, PN_RAW_CONNECTION_CONNECTED);
+ conn->connectpending = false;
} else if (conn->wakepending) {
pni_raw_put_event(conn, PN_RAW_CONNECTION_WAKE);
conn->wakepending = false;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org