You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cl...@apache.org on 2023/03/24 06:57:34 UTC
[qpid-proton] 01/03: PROTON-2695: epoll raw connections - reschedule task to finish unprocessed events from an event batch.
This is an automated email from the ASF dual-hosted git repository.
cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit d08e4a22e86610243dab1e5c08a8fd4c1c0d001b
Author: Clifford Jansen <cl...@apache.org>
AuthorDate: Sun Mar 19 23:00:12 2023 -0700
PROTON-2695: epoll raw connections - reschedule task to finish unprocessed events from an event batch.
---
c/src/proactor/epoll_raw_connection.c | 28 ++++++++++-
c/tests/CMakeLists.txt | 2 +-
c/tests/raw_connection_test.cpp | 87 +++++++++++++++++++++++++++++++++++
3 files changed, 114 insertions(+), 3 deletions(-)
diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c
index 919a4b808..cb61d6a75 100644
--- a/c/src/proactor/epoll_raw_connection.c
+++ b/c/src/proactor/epoll_raw_connection.c
@@ -50,6 +50,7 @@ struct praw_connection_t {
struct addrinfo *ai; /* Current connect address */
bool connected;
bool disconnected;
+ bool batch_empty;
};
static void psocket_error(praw_connection_t *rc, int err, const char* msg) {
@@ -317,7 +318,10 @@ static pn_event_t *pni_raw_batch_next(pn_event_batch_t *batch) {
unlock(&rc->task.mutex);
if (waking) pni_raw_wake(raw);
- return pni_raw_event_next(raw);
+ pn_event_t *e = pni_raw_event_next(raw);
+ if (!e || pn_event_type(e) == PN_RAW_CONNECTION_DISCONNECTED)
+ rc->batch_empty = true;
+ return e;
}
task_t *pni_psocket_raw_task(psocket_t* ps) {
@@ -373,6 +377,7 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool
if (rc->disconnected) {
pni_raw_connect_failed(&rc->raw_connection);
unlock(&rc->task.mutex);
+ rc->batch_empty = false;
return &rc->batch;
}
if (events & (EPOLLHUP | EPOLLERR)) {
@@ -398,19 +403,38 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool
if (wake) pni_raw_wake(&rc->raw_connection);
if (events & EPOLLIN) pni_raw_read(&rc->raw_connection, fd, rcv, set_error);
if (events & EPOLLOUT) pni_raw_write(&rc->raw_connection, fd, snd, set_error);
+ rc->batch_empty = false;
return &rc->batch;
}
void pni_raw_connection_done(praw_connection_t *rc) {
bool notify = false;
bool ready = false;
+ bool have_event = false;
+
+ // If !batch_empty, can't be sure state machine up to date, so reschedule task if necessary.
+ if (!rc->batch_empty) {
+ if (pn_collector_peek(rc->raw_connection.collector))
+ have_event = true;
+ else {
+ pn_event_t *e = pni_raw_event_next(&rc->raw_connection);
+ // State machine up to date.
+ if (e) {
+ have_event = true;
+ // Sole event. Can put back without order issues.
+ // Edge case, performance not important.
+ pn_collector_put(rc->raw_connection.collector, pn_event_class(e), pn_event_context(e), pn_event_type(e));
+ }
+ }
+ }
+
lock(&rc->task.mutex);
pn_proactor_t *p = rc->task.proactor;
tslot_t *ts = rc->task.runner;
rc->task.working = false;
- notify = pni_task_wake_pending(&rc->task) && schedule(&rc->task);
// The task may be in the ready state even if we've got no raw connection
// wakes outstanding because we dealt with it already in pni_raw_batch_next()
+ notify = (pni_task_wake_pending(&rc->task) || have_event) && schedule(&rc->task);
ready = rc->task.ready;
unlock(&rc->task.mutex);
diff --git a/c/tests/CMakeLists.txt b/c/tests/CMakeLists.txt
index 641ba3c75..8ff0d8da9 100644
--- a/c/tests/CMakeLists.txt
+++ b/c/tests/CMakeLists.txt
@@ -79,7 +79,7 @@ if (CMAKE_CXX_COMPILER)
add_c_test(c-proactor-test pn_test_proactor.cpp proactor_test.cpp)
target_link_libraries(c-proactor-test qpid-proton-core qpid-proton-proactor ${PLATFORM_LIBS})
- add_c_test(c-raw-connection-test raw_connection_test.cpp $<TARGET_OBJECTS:qpid-proton-proactor-objects>)
+ add_c_test(c-raw-connection-test raw_connection_test.cpp pn_test_proactor.cpp $<TARGET_OBJECTS:qpid-proton-proactor-objects>)
target_link_libraries(c-raw-connection-test qpid-proton-core ${PLATFORM_LIBS} ${PROACTOR_LIBS})
add_c_test(c-ssl-proactor-test pn_test_proactor.cpp ssl_proactor_test.cpp)
diff --git a/c/tests/raw_connection_test.cpp b/c/tests/raw_connection_test.cpp
index 0f31c4910..9f5a9b72e 100644
--- a/c/tests/raw_connection_test.cpp
+++ b/c/tests/raw_connection_test.cpp
@@ -832,3 +832,90 @@ TEST_CASE("raw connection") {
}
}
}
+
+// WAKE tests require a running proactor.
+
+#include "../src/proactor/proactor-internal.h"
+#include "./pn_test_proactor.hpp"
+#include <proton/event.h>
+#include <proton/listener.h>
+
+namespace {
+
+class common_handler : public handler {
+ handler *accept_; // Handler for accepted connections
+ bool close_on_wake_;
+ pn_raw_connection_t *last_server_;
+
+public:
+ explicit common_handler(handler *accept = 0) : accept_(accept), close_on_wake_(false), last_server_(0) {}
+
+ void set_close_on_wake(bool b) { close_on_wake_ = b; }
+
+ pn_raw_connection_t *last_server() { return last_server_; }
+
+ bool handle(pn_event_t *e) override {
+ switch (pn_event_type(e)) {
+ /* Always stop on these noteworthy events */
+ case PN_LISTENER_OPEN:
+ case PN_LISTENER_CLOSE:
+ case PN_PROACTOR_INACTIVE:
+ return true;
+
+ case PN_LISTENER_ACCEPT: {
+ listener = pn_event_listener(e);
+ pn_raw_connection_t *rc = pn_raw_connection();
+ pn_listener_raw_accept(listener, rc);
+ last_server_ = rc;
+ return false;
+ } break;
+
+ case PN_RAW_CONNECTION_WAKE: {
+ if (close_on_wake_) {
+ pn_raw_connection_t *rc = pn_event_raw_connection(e);
+ pn_raw_connection_close(rc);
+ }
+ return true;
+ } break;
+
+
+ default:
+ return false;
+ }
+ }
+};
+
+
+} // namespace
+
+// Test waking up a connection that is idle
+TEST_CASE("proactor_raw_connection_wake") {
+ common_handler h;
+ proactor p(&h);
+ pn_listener_t *l = p.listen();
+ REQUIRE_RUN(p, PN_LISTENER_OPEN);
+
+ pn_raw_connection_t *rc = pn_raw_connection();
+ std::string addr = ":" + pn_test::listening_port(l);
+ pn_proactor_raw_connect(pn_listener_proactor(l), rc, addr.c_str());
+
+
+ REQUIRE_RUN(p, PN_LISTENER_ACCEPT);
+ REQUIRE_RUN(p, PN_RAW_CONNECTION_NEED_READ_BUFFERS);
+ REQUIRE_RUN(p, PN_RAW_CONNECTION_NEED_READ_BUFFERS);
+ CHECK(pn_proactor_get(p) == NULL); /* idle */
+ pn_raw_connection_wake(rc);
+ REQUIRE_RUN(p, PN_RAW_CONNECTION_WAKE);
+ CHECK(pn_proactor_get(p) == NULL); /* idle */
+
+ h.set_close_on_wake(true);
+ pn_raw_connection_wake(rc);
+ REQUIRE_RUN(p, PN_RAW_CONNECTION_WAKE);
+ REQUIRE_RUN(p, PN_RAW_CONNECTION_DISCONNECTED);
+ pn_raw_connection_wake(h.last_server());
+ REQUIRE_RUN(p, PN_RAW_CONNECTION_WAKE);
+ REQUIRE_RUN(p, PN_RAW_CONNECTION_DISCONNECTED);
+ pn_listener_close(l);
+ REQUIRE_RUN(p, PN_LISTENER_CLOSE);
+ REQUIRE_RUN(p, PN_PROACTOR_INACTIVE);
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org