You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cl...@apache.org on 2023/03/24 06:57:34 UTC

[qpid-proton] 01/03: PROTON-2695: epoll raw connections - reschedule task to finish unprocessed events from an event batch.

This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit d08e4a22e86610243dab1e5c08a8fd4c1c0d001b
Author: Clifford Jansen <cl...@apache.org>
AuthorDate: Sun Mar 19 23:00:12 2023 -0700

    PROTON-2695: epoll raw connections - reschedule task to finish unprocessed events from an event batch.
---
 c/src/proactor/epoll_raw_connection.c | 28 ++++++++++-
 c/tests/CMakeLists.txt                |  2 +-
 c/tests/raw_connection_test.cpp       | 87 +++++++++++++++++++++++++++++++++++
 3 files changed, 114 insertions(+), 3 deletions(-)

diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c
index 919a4b808..cb61d6a75 100644
--- a/c/src/proactor/epoll_raw_connection.c
+++ b/c/src/proactor/epoll_raw_connection.c
@@ -50,6 +50,7 @@ struct praw_connection_t {
   struct addrinfo *ai;               /* Current connect address */
   bool connected;
   bool disconnected;
+  bool batch_empty;
 };
 
 static void psocket_error(praw_connection_t *rc, int err, const char* msg) {
@@ -317,7 +318,10 @@ static pn_event_t *pni_raw_batch_next(pn_event_batch_t *batch) {
   unlock(&rc->task.mutex);
   if (waking) pni_raw_wake(raw);
 
-  return pni_raw_event_next(raw);
+  pn_event_t *e = pni_raw_event_next(raw);
+  if (!e || pn_event_type(e) == PN_RAW_CONNECTION_DISCONNECTED)
+    rc->batch_empty = true;
+  return e;
 }
 
 task_t *pni_psocket_raw_task(psocket_t* ps) {
@@ -373,6 +377,7 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool
     if (rc->disconnected) {
       pni_raw_connect_failed(&rc->raw_connection);
       unlock(&rc->task.mutex);
+      rc->batch_empty = false;
       return &rc->batch;
     }
     if (events & (EPOLLHUP | EPOLLERR)) {
@@ -398,19 +403,38 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool
   if (wake) pni_raw_wake(&rc->raw_connection);
   if (events & EPOLLIN) pni_raw_read(&rc->raw_connection, fd, rcv, set_error);
   if (events & EPOLLOUT) pni_raw_write(&rc->raw_connection, fd, snd, set_error);
+  rc->batch_empty = false;
   return &rc->batch;
 }
 
 void pni_raw_connection_done(praw_connection_t *rc) {
   bool notify = false;
   bool ready = false;
+  bool have_event = false;
+
+  // If !batch_empty, we can't be sure the state machine is up to date, so reschedule the task if necessary.
+  if (!rc->batch_empty) {
+    if (pn_collector_peek(rc->raw_connection.collector))
+      have_event = true;
+    else {
+      pn_event_t *e = pni_raw_event_next(&rc->raw_connection);
+      // State machine up to date.
+      if (e) {
+        have_event = true;
+        // Sole event.  Can put back without order issues.
+        // Edge case, performance not important.
+        pn_collector_put(rc->raw_connection.collector, pn_event_class(e), pn_event_context(e), pn_event_type(e));
+      }
+    }
+  }
+
   lock(&rc->task.mutex);
   pn_proactor_t *p = rc->task.proactor;
   tslot_t *ts = rc->task.runner;
   rc->task.working = false;
-  notify = pni_task_wake_pending(&rc->task) && schedule(&rc->task);
   // The task may be in the ready state even if we've got no raw connection
   // wakes outstanding because we dealt with it already in pni_raw_batch_next()
+  notify = (pni_task_wake_pending(&rc->task) || have_event) && schedule(&rc->task);
   ready = rc->task.ready;
   unlock(&rc->task.mutex);
 
diff --git a/c/tests/CMakeLists.txt b/c/tests/CMakeLists.txt
index 641ba3c75..8ff0d8da9 100644
--- a/c/tests/CMakeLists.txt
+++ b/c/tests/CMakeLists.txt
@@ -79,7 +79,7 @@ if (CMAKE_CXX_COMPILER)
     add_c_test(c-proactor-test pn_test_proactor.cpp proactor_test.cpp)
     target_link_libraries(c-proactor-test qpid-proton-core qpid-proton-proactor ${PLATFORM_LIBS})
 
-    add_c_test(c-raw-connection-test raw_connection_test.cpp $<TARGET_OBJECTS:qpid-proton-proactor-objects>)
+    add_c_test(c-raw-connection-test raw_connection_test.cpp pn_test_proactor.cpp $<TARGET_OBJECTS:qpid-proton-proactor-objects>)
     target_link_libraries(c-raw-connection-test qpid-proton-core ${PLATFORM_LIBS} ${PROACTOR_LIBS})
 
     add_c_test(c-ssl-proactor-test pn_test_proactor.cpp ssl_proactor_test.cpp)
diff --git a/c/tests/raw_connection_test.cpp b/c/tests/raw_connection_test.cpp
index 0f31c4910..9f5a9b72e 100644
--- a/c/tests/raw_connection_test.cpp
+++ b/c/tests/raw_connection_test.cpp
@@ -832,3 +832,90 @@ TEST_CASE("raw connection") {
     }
   }
 }
+
+// WAKE tests require a running proactor.
+
+#include "../src/proactor/proactor-internal.h"
+#include "./pn_test_proactor.hpp"
+#include <proton/event.h>
+#include <proton/listener.h>
+
+namespace {
+
+class common_handler : public handler {
+  handler *accept_; // Handler for accepted connections
+  bool close_on_wake_;
+  pn_raw_connection_t *last_server_;
+
+public:
+  explicit common_handler(handler *accept = 0) : accept_(accept), close_on_wake_(false), last_server_(0) {}
+
+  void set_close_on_wake(bool b) { close_on_wake_ = b; }
+
+  pn_raw_connection_t *last_server() { return last_server_; }
+
+  bool handle(pn_event_t *e) override {
+    switch (pn_event_type(e)) {
+      /* Always stop on these noteworthy events */
+    case PN_LISTENER_OPEN:
+    case PN_LISTENER_CLOSE:
+    case PN_PROACTOR_INACTIVE:
+      return true;
+
+    case PN_LISTENER_ACCEPT: {
+      listener = pn_event_listener(e);
+      pn_raw_connection_t *rc = pn_raw_connection();
+      pn_listener_raw_accept(listener, rc);
+      last_server_ = rc;
+      return false;
+    } break;
+
+    case PN_RAW_CONNECTION_WAKE: {
+      if (close_on_wake_) {
+        pn_raw_connection_t *rc = pn_event_raw_connection(e);
+        pn_raw_connection_close(rc);
+      }
+      return true;
+    } break;
+
+
+    default:
+      return false;
+    }
+  }
+};
+
+
+} // namespace
+
+// Test waking up a connection that is idle
+TEST_CASE("proactor_raw_connection_wake") {
+  common_handler h;
+  proactor p(&h);
+  pn_listener_t *l = p.listen();
+  REQUIRE_RUN(p, PN_LISTENER_OPEN);
+
+  pn_raw_connection_t *rc = pn_raw_connection();
+  std::string addr = ":" + pn_test::listening_port(l);
+  pn_proactor_raw_connect(pn_listener_proactor(l), rc, addr.c_str());
+
+
+  REQUIRE_RUN(p, PN_LISTENER_ACCEPT);
+  REQUIRE_RUN(p, PN_RAW_CONNECTION_NEED_READ_BUFFERS);
+  REQUIRE_RUN(p, PN_RAW_CONNECTION_NEED_READ_BUFFERS);
+  CHECK(pn_proactor_get(p) == NULL); /* idle */
+    pn_raw_connection_wake(rc);
+  REQUIRE_RUN(p, PN_RAW_CONNECTION_WAKE);
+  CHECK(pn_proactor_get(p) == NULL); /* idle */
+
+  h.set_close_on_wake(true);
+  pn_raw_connection_wake(rc);
+  REQUIRE_RUN(p, PN_RAW_CONNECTION_WAKE);
+  REQUIRE_RUN(p, PN_RAW_CONNECTION_DISCONNECTED);
+  pn_raw_connection_wake(h.last_server());
+  REQUIRE_RUN(p, PN_RAW_CONNECTION_WAKE);
+  REQUIRE_RUN(p, PN_RAW_CONNECTION_DISCONNECTED);
+  pn_listener_close(l);
+  REQUIRE_RUN(p, PN_LISTENER_CLOSE);
+  REQUIRE_RUN(p, PN_PROACTOR_INACTIVE);
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org