You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cl...@apache.org on 2023/03/24 06:57:33 UTC

[qpid-proton] branch main updated (34b4d930d -> f13bb8132)

This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git


    from 34b4d930d PROTON-2691: Fix -Wstrict-prototypes compile warning from Clang (#389)
     new d08e4a22e PROTON-2695: epoll raw connections - reschedule task to finish unprocessed events from an event batch.
     new 47b958f27 PROTON-2673: epoll raw connections - allow delivery of wake events prior to successful connect.
     new f13bb8132 PROTON-2673: clarify doc for pn_raw_connection_wake()

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 c/include/proton/raw_connection.h        | 11 +++-
 c/src/proactor/epoll_raw_connection.c    | 60 +++++++++++++++-------
 c/src/proactor/raw_connection-internal.h |  2 +
 c/src/proactor/raw_connection.c          | 12 ++++-
 c/tests/CMakeLists.txt                   |  2 +-
 c/tests/raw_connection_test.cpp          | 87 ++++++++++++++++++++++++++++++++
 6 files changed, 154 insertions(+), 20 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-proton] 01/03: PROTON-2695: epoll raw connections - reschedule task to finish unprocessed events from an event batch.

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit d08e4a22e86610243dab1e5c08a8fd4c1c0d001b
Author: Clifford Jansen <cl...@apache.org>
AuthorDate: Sun Mar 19 23:00:12 2023 -0700

    PROTON-2695: epoll raw connections - reschedule task to finish unprocessed events from an event batch.
---
 c/src/proactor/epoll_raw_connection.c | 28 ++++++++++-
 c/tests/CMakeLists.txt                |  2 +-
 c/tests/raw_connection_test.cpp       | 87 +++++++++++++++++++++++++++++++++++
 3 files changed, 114 insertions(+), 3 deletions(-)

diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c
index 919a4b808..cb61d6a75 100644
--- a/c/src/proactor/epoll_raw_connection.c
+++ b/c/src/proactor/epoll_raw_connection.c
@@ -50,6 +50,7 @@ struct praw_connection_t {
   struct addrinfo *ai;               /* Current connect address */
   bool connected;
   bool disconnected;
+  bool batch_empty;
 };
 
 static void psocket_error(praw_connection_t *rc, int err, const char* msg) {
@@ -317,7 +318,10 @@ static pn_event_t *pni_raw_batch_next(pn_event_batch_t *batch) {
   unlock(&rc->task.mutex);
   if (waking) pni_raw_wake(raw);
 
-  return pni_raw_event_next(raw);
+  pn_event_t *e = pni_raw_event_next(raw);
+  if (!e || pn_event_type(e) == PN_RAW_CONNECTION_DISCONNECTED)
+    rc->batch_empty = true;
+  return e;
 }
 
 task_t *pni_psocket_raw_task(psocket_t* ps) {
@@ -373,6 +377,7 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool
     if (rc->disconnected) {
       pni_raw_connect_failed(&rc->raw_connection);
       unlock(&rc->task.mutex);
+      rc->batch_empty = false;
       return &rc->batch;
     }
     if (events & (EPOLLHUP | EPOLLERR)) {
@@ -398,19 +403,38 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool
   if (wake) pni_raw_wake(&rc->raw_connection);
   if (events & EPOLLIN) pni_raw_read(&rc->raw_connection, fd, rcv, set_error);
   if (events & EPOLLOUT) pni_raw_write(&rc->raw_connection, fd, snd, set_error);
+  rc->batch_empty = false;
   return &rc->batch;
 }
 
 void pni_raw_connection_done(praw_connection_t *rc) {
   bool notify = false;
   bool ready = false;
+  bool have_event = false;
+
+  // If !batch_empty, we cannot be sure the state machine is up to date, so reschedule the task if necessary.
+  if (!rc->batch_empty) {
+    if (pn_collector_peek(rc->raw_connection.collector))
+      have_event = true;
+    else {
+      pn_event_t *e = pni_raw_event_next(&rc->raw_connection);
+      // State machine up to date.
+      if (e) {
+        have_event = true;
+        // Sole event.  Can put back without order issues.
+        // Edge case, performance not important.
+        pn_collector_put(rc->raw_connection.collector, pn_event_class(e), pn_event_context(e), pn_event_type(e));
+      }
+    }
+  }
+
   lock(&rc->task.mutex);
   pn_proactor_t *p = rc->task.proactor;
   tslot_t *ts = rc->task.runner;
   rc->task.working = false;
-  notify = pni_task_wake_pending(&rc->task) && schedule(&rc->task);
   // The task may be in the ready state even if we've got no raw connection
   // wakes outstanding because we dealt with it already in pni_raw_batch_next()
+  notify = (pni_task_wake_pending(&rc->task) || have_event) && schedule(&rc->task);
   ready = rc->task.ready;
   unlock(&rc->task.mutex);
 
diff --git a/c/tests/CMakeLists.txt b/c/tests/CMakeLists.txt
index 641ba3c75..8ff0d8da9 100644
--- a/c/tests/CMakeLists.txt
+++ b/c/tests/CMakeLists.txt
@@ -79,7 +79,7 @@ if (CMAKE_CXX_COMPILER)
     add_c_test(c-proactor-test pn_test_proactor.cpp proactor_test.cpp)
     target_link_libraries(c-proactor-test qpid-proton-core qpid-proton-proactor ${PLATFORM_LIBS})
 
-    add_c_test(c-raw-connection-test raw_connection_test.cpp $<TARGET_OBJECTS:qpid-proton-proactor-objects>)
+    add_c_test(c-raw-connection-test raw_connection_test.cpp pn_test_proactor.cpp $<TARGET_OBJECTS:qpid-proton-proactor-objects>)
     target_link_libraries(c-raw-connection-test qpid-proton-core ${PLATFORM_LIBS} ${PROACTOR_LIBS})
 
     add_c_test(c-ssl-proactor-test pn_test_proactor.cpp ssl_proactor_test.cpp)
diff --git a/c/tests/raw_connection_test.cpp b/c/tests/raw_connection_test.cpp
index 0f31c4910..9f5a9b72e 100644
--- a/c/tests/raw_connection_test.cpp
+++ b/c/tests/raw_connection_test.cpp
@@ -832,3 +832,90 @@ TEST_CASE("raw connection") {
     }
   }
 }
+
+// WAKE tests require a running proactor.
+
+#include "../src/proactor/proactor-internal.h"
+#include "./pn_test_proactor.hpp"
+#include <proton/event.h>
+#include <proton/listener.h>
+
+namespace {
+
+class common_handler : public handler {
+  handler *accept_; // Handler for accepted connections
+  bool close_on_wake_;
+  pn_raw_connection_t *last_server_;
+
+public:
+  explicit common_handler(handler *accept = 0) : accept_(accept), close_on_wake_(false), last_server_(0) {}
+
+  void set_close_on_wake(bool b) { close_on_wake_ = b; }
+
+  pn_raw_connection_t *last_server() { return last_server_; }
+
+  bool handle(pn_event_t *e) override {
+    switch (pn_event_type(e)) {
+      /* Always stop on these noteworthy events */
+    case PN_LISTENER_OPEN:
+    case PN_LISTENER_CLOSE:
+    case PN_PROACTOR_INACTIVE:
+      return true;
+
+    case PN_LISTENER_ACCEPT: {
+      listener = pn_event_listener(e);
+      pn_raw_connection_t *rc = pn_raw_connection();
+      pn_listener_raw_accept(listener, rc);
+      last_server_ = rc;
+      return false;
+    } break;
+
+    case PN_RAW_CONNECTION_WAKE: {
+      if (close_on_wake_) {
+        pn_raw_connection_t *rc = pn_event_raw_connection(e);
+        pn_raw_connection_close(rc);
+      }
+      return true;
+    } break;
+
+
+    default:
+      return false;
+    }
+  }
+};
+
+
+} // namespace
+
+// Test waking up a connection that is idle
+TEST_CASE("proactor_raw_connection_wake") {
+  common_handler h;
+  proactor p(&h);
+  pn_listener_t *l = p.listen();
+  REQUIRE_RUN(p, PN_LISTENER_OPEN);
+
+  pn_raw_connection_t *rc = pn_raw_connection();
+  std::string addr = ":" + pn_test::listening_port(l);
+  pn_proactor_raw_connect(pn_listener_proactor(l), rc, addr.c_str());
+
+
+  REQUIRE_RUN(p, PN_LISTENER_ACCEPT);
+  REQUIRE_RUN(p, PN_RAW_CONNECTION_NEED_READ_BUFFERS);
+  REQUIRE_RUN(p, PN_RAW_CONNECTION_NEED_READ_BUFFERS);
+  CHECK(pn_proactor_get(p) == NULL); /* idle */
+    pn_raw_connection_wake(rc);
+  REQUIRE_RUN(p, PN_RAW_CONNECTION_WAKE);
+  CHECK(pn_proactor_get(p) == NULL); /* idle */
+
+  h.set_close_on_wake(true);
+  pn_raw_connection_wake(rc);
+  REQUIRE_RUN(p, PN_RAW_CONNECTION_WAKE);
+  REQUIRE_RUN(p, PN_RAW_CONNECTION_DISCONNECTED);
+  pn_raw_connection_wake(h.last_server());
+  REQUIRE_RUN(p, PN_RAW_CONNECTION_WAKE);
+  REQUIRE_RUN(p, PN_RAW_CONNECTION_DISCONNECTED);
+  pn_listener_close(l);
+  REQUIRE_RUN(p, PN_LISTENER_CLOSE);
+  REQUIRE_RUN(p, PN_PROACTOR_INACTIVE);
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-proton] 02/03: PROTON-2673: epoll raw connections - allow delivery of wake events prior to successful connect.

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 47b958f271ea0637046c9b85de4690bd4dbebb1d
Author: Clifford Jansen <cl...@apache.org>
AuthorDate: Thu Mar 23 17:05:49 2023 -0700

    PROTON-2673: epoll raw connections - allow delivery of wake events prior to successful connect.
---
 c/src/proactor/epoll_raw_connection.c    | 34 +++++++++++++++++---------------
 c/src/proactor/raw_connection-internal.h |  2 ++
 c/src/proactor/raw_connection.c          | 12 ++++++++++-
 3 files changed, 31 insertions(+), 17 deletions(-)

diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c
index cb61d6a75..56bebee85 100644
--- a/c/src/proactor/epoll_raw_connection.c
+++ b/c/src/proactor/epoll_raw_connection.c
@@ -367,7 +367,18 @@ static void  set_error(pn_raw_connection_t *conn, const char *msg, int err) {
 
 pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool sched_ready) {
   praw_connection_t *rc = containerof(t, praw_connection_t, task);
+  bool task_wake = false;
+  bool can_wake = pni_raw_can_wake(&rc->raw_connection);
   lock(&rc->task.mutex);
+  t->working = true;
+  if (sched_ready)
+    schedule_done(t);
+  if (pni_task_wake_pending(&rc->task)) {
+    if (can_wake)
+      task_wake = true; // batch_next() will complete the task wake.
+    else
+      pni_task_wake_done(&rc->task);  // Complete task wake without event.
+  }
   int events = io_events;
   int fd = rc->psocket.epoll_io.fd;
   if (!rc->connected) {
@@ -381,26 +392,17 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool
       return &rc->batch;
     }
     if (events & (EPOLLHUP | EPOLLERR)) {
+      // A wake can be the first event.  Otherwise, wait for connection to complete.
+      bool event_pending = task_wake || pni_raw_wake_is_pending(&rc->raw_connection) || pn_collector_peek(rc->raw_connection.collector);
+      t->working = event_pending;
       unlock(&rc->task.mutex);
-      return NULL;
+      return event_pending ? &rc->batch : NULL;
     }
-    praw_connection_connected_lh(rc);
+    if (events & EPOLLOUT)
+      praw_connection_connected_lh(rc);
   }
   unlock(&rc->task.mutex);
 
-  bool wake = false;
-  lock(&t->mutex);
-  t->working = true;
-  if (sched_ready) {
-    schedule_done(t);
-    if (pni_task_wake_pending(&rc->task)) {
-      wake = true;
-      pni_task_wake_done(&rc->task);
-    }
-  }
-  unlock(&t->mutex);
-
-  if (wake) pni_raw_wake(&rc->raw_connection);
   if (events & EPOLLIN) pni_raw_read(&rc->raw_connection, fd, rcv, set_error);
   if (events & EPOLLOUT) pni_raw_write(&rc->raw_connection, fd, snd, set_error);
   rc->batch_empty = false;
@@ -417,7 +419,7 @@ void pni_raw_connection_done(praw_connection_t *rc) {
     if (pn_collector_peek(rc->raw_connection.collector))
       have_event = true;
     else {
-      pn_event_t *e = pni_raw_event_next(&rc->raw_connection);
+      pn_event_t *e = pni_raw_batch_next(&rc->batch);
       // State machine up to date.
       if (e) {
         have_event = true;
diff --git a/c/src/proactor/raw_connection-internal.h b/c/src/proactor/raw_connection-internal.h
index 218bf2b2d..47b0ea925 100644
--- a/c/src/proactor/raw_connection-internal.h
+++ b/c/src/proactor/raw_connection-internal.h
@@ -126,6 +126,8 @@ bool pni_raw_validate(pn_raw_connection_t *conn);
 void pni_raw_connected(pn_raw_connection_t *conn);
 void pni_raw_connect_failed(pn_raw_connection_t *conn);
 void pni_raw_wake(pn_raw_connection_t *conn);
+bool pni_raw_wake_is_pending(pn_raw_connection_t *conn);
+bool pni_raw_can_wake(pn_raw_connection_t *conn);
 void pni_raw_close(pn_raw_connection_t *conn);
 void pni_raw_read_close(pn_raw_connection_t *conn);
 void pni_raw_write_close(pn_raw_connection_t *conn);
diff --git a/c/src/proactor/raw_connection.c b/c/src/proactor/raw_connection.c
index a7aa21d11..fd633a284 100644
--- a/c/src/proactor/raw_connection.c
+++ b/c/src/proactor/raw_connection.c
@@ -475,7 +475,17 @@ void pni_raw_connect_failed(pn_raw_connection_t *conn) {
 }
 
 void pni_raw_wake(pn_raw_connection_t *conn) {
-  conn->wakepending = true;
+  if (conn->disconnect_state != disc_fini)
+    conn->wakepending = true;
+}
+
+bool pni_raw_wake_is_pending(pn_raw_connection_t *conn) {
+  return conn->wakepending;
+}
+
+bool pni_raw_can_wake(pn_raw_connection_t *conn) {
+  // True if DISCONNECTED event has not yet been extracted from the batch.
+  return (conn->disconnect_state != disc_fini);
 }
 
 void pni_raw_read(pn_raw_connection_t *conn, int sock, long (*recv)(int, void*, size_t), void(*set_error)(pn_raw_connection_t *, const char *, int)) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-proton] 03/03: PROTON-2673: clarify doc for pn_raw_connection_wake()

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit f13bb8132d9e744c4bc4ff66d283f9d4e34965f8
Author: Clifford Jansen <cl...@apache.org>
AuthorDate: Thu Mar 23 23:53:12 2023 -0700

    PROTON-2673: clarify doc for pn_raw_connection_wake()
---
 c/include/proton/raw_connection.h | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/c/include/proton/raw_connection.h b/c/include/proton/raw_connection.h
index d19ff6dc5..ef5fd33a7 100644
--- a/c/include/proton/raw_connection.h
+++ b/c/include/proton/raw_connection.h
@@ -244,10 +244,19 @@ PNP_EXTERN bool pn_raw_connection_is_write_closed(pn_raw_connection_t *connectio
  * Return a @ref PN_RAW_CONNECTION_WAKE event for @p connection as soon as possible.
  *
  * At least one wake event will be returned, serialized with other @ref proactor_events
- * for the same raw connection.  Wakes can be "coalesced" - if several
+ * for the same raw connection, except as noted.  Wakes can be "coalesced" - if several
  * @ref pn_raw_connection_wake() calls happen close together, there may be only one
  * @ref PN_RAW_CONNECTION_WAKE event that occurs after all of them.
  *
+ * A @ref PN_RAW_CONNECTION_WAKE event will never follow a
+ * @ref PN_RAW_CONNECTION_DISCONNECTED event, i.e. a wake still pending at that point is dropped.
+ *
+ * The result of this call is undefined if called after a @ref PN_RAW_CONNECTION_DISCONNECTED
+ * event has been delivered and its event batch has been released by a call to
+ * @ref pn_proactor_done().  It is also undefined if called before the return of either
+ * @ref pn_proactor_raw_connect() or @ref pn_listener_raw_accept() for client or server
+ * raw connections respectively.
+ *
  * @note Thread-safe
  */
 PNP_EXTERN void pn_raw_connection_wake(pn_raw_connection_t *connection);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org