You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2023/04/11 15:07:28 UTC

[qpid-proton] branch main updated: PROTON-2708: Introduce new proactor APIs that query batch

This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git


The following commit(s) were added to refs/heads/main by this push:
     new 398dadd66 PROTON-2708: Introduce new proactor APIs that query batch
398dadd66 is described below

commit 398dadd6606d25685cd6c6f35d66c9fa2251a651
Author: Andrew Stitcher <as...@apache.org>
AuthorDate: Mon Apr 10 20:43:05 2023 -0400

    PROTON-2708: Introduce new proactor APIs that query batch
    
    These new APIs are useful for querying the proactor event batch for the
    object that is the subject of the batch. The proactor guarantees that
    the subject of the batch is never returned in any other batch until this
    batch is completed.
---
 c/examples/broker.c                   |  7 +++----
 c/examples/raw_echo.c                 | 36 +++++++++++++++++------------------
 c/include/proton/proactor.h           | 30 +++++++++++++++++++++++++++++
 c/include/proton/raw_connection.h     |  9 +++++++++
 c/src/proactor/epoll.c                | 13 +++++++++++++
 c/src/proactor/epoll_raw_connection.c |  5 +++++
 c/src/proactor/libuv.c                | 14 ++++++++++++++
 c/src/proactor/win_iocp.cpp           | 14 ++++++++++++++
 8 files changed, 105 insertions(+), 23 deletions(-)

diff --git a/c/examples/broker.c b/c/examples/broker.c
index b2843d3c6..2989ce393 100644
--- a/c/examples/broker.c
+++ b/c/examples/broker.c
@@ -283,9 +283,7 @@ static void check_condition(pn_event_t *e, pn_condition_t *cond) {
 
 const int WINDOW=5; /* Very small incoming credit window, to show flow control in action */
 
-static bool handle(broker_t* b, pn_event_t* e) {
-  pn_connection_t *c = pn_event_connection(e);
-
+static bool handle(broker_t* b, pn_event_t* e, pn_connection_t *c) {
   switch (pn_event_type(e)) {
 
    case PN_LISTENER_OPEN: {
@@ -433,9 +431,10 @@ static void* broker_thread(void *void_broker) {
   bool finished = false;
   do {
     pn_event_batch_t *events = pn_proactor_wait(b->proactor);
+    pn_connection_t *connection = pn_event_batch_connection(events);
     pn_event_t *e;
     while ((e = pn_event_batch_next(events))) {
-        if (!handle(b, e)) finished = true;
+        if (!handle(b, e, connection)) finished = true;
     }
     pn_proactor_done(b->proactor, events);
   } while(!finished);
diff --git a/c/examples/raw_echo.c b/c/examples/raw_echo.c
index 187dba7f4..a1f49a0da 100644
--- a/c/examples/raw_echo.c
+++ b/c/examples/raw_echo.c
@@ -135,11 +135,10 @@ static void free_buffers(pn_raw_buffer_t buffs[], size_t n) {
 }
 
 /* This function handles events when we are acting as the receiver */
-static void handle_receive(app_data_t *app, pn_event_t* event) {
+static bool handle_receive(app_data_t *app, pn_event_t* event, pn_raw_connection_t *c) {
   switch (pn_event_type(event)) {
 
     case PN_RAW_CONNECTION_CONNECTED: {
-      pn_raw_connection_t *c = pn_event_raw_connection(event);
       conn_data_t *cd = (conn_data_t *) pn_raw_connection_get_context(c);
       pn_raw_buffer_t buffers[READ_BUFFERS] = {{0}};
       if (cd) {
@@ -163,13 +162,11 @@ static void handle_receive(app_data_t *app, pn_event_t* event) {
     } break;
 
     case PN_RAW_CONNECTION_WAKE: {
-      pn_raw_connection_t *c = pn_event_raw_connection(event);
       conn_data_t *cd = (conn_data_t *) pn_raw_connection_get_context(c);
       printf("**raw connection %tu woken\n", cd-conn_data);
     } break;
 
     case PN_RAW_CONNECTION_DISCONNECTED: {
-      pn_raw_connection_t *c = pn_event_raw_connection(event);
       conn_data_t *cd = (conn_data_t *) pn_raw_connection_get_context(c);
       if (cd) {
         pthread_mutex_lock(&app->lock);
@@ -185,7 +182,6 @@ static void handle_receive(app_data_t *app, pn_event_t* event) {
     } break;
 
     case PN_RAW_CONNECTION_DRAIN_BUFFERS: {
-      pn_raw_connection_t *c = pn_event_raw_connection(event);
       pn_raw_buffer_t buffs[READ_BUFFERS];
       size_t n;
       while ( (n = pn_raw_connection_take_read_buffers(c, buffs, READ_BUFFERS)) ) {
@@ -200,7 +196,6 @@ static void handle_receive(app_data_t *app, pn_event_t* event) {
     } break;
 
     case PN_RAW_CONNECTION_READ: {
-      pn_raw_connection_t *c = pn_event_raw_connection(event);
       conn_data_t *cd = (conn_data_t *) pn_raw_connection_get_context(c);
       pn_raw_buffer_t buffs[READ_BUFFERS];
       size_t n;
@@ -225,17 +220,14 @@ static void handle_receive(app_data_t *app, pn_event_t* event) {
     } break;
 
     case PN_RAW_CONNECTION_CLOSED_READ: {
-      pn_raw_connection_t *c = pn_event_raw_connection(event);
       if (!pn_raw_connection_is_write_closed(c)) {
         send_message(c, "** Goodbye **");
       }
     }
     case PN_RAW_CONNECTION_CLOSED_WRITE:{
-      pn_raw_connection_t *c = pn_event_raw_connection(event);
       pn_raw_connection_close(c);
     } break;
     case PN_RAW_CONNECTION_WRITTEN: {
-      pn_raw_connection_t *c = pn_event_raw_connection(event);
       pn_raw_buffer_t buffs[READ_BUFFERS];
       size_t n;
       while ( (n = pn_raw_connection_take_written_buffers(c, buffs, READ_BUFFERS)) ) {
@@ -249,6 +241,7 @@ static void handle_receive(app_data_t *app, pn_event_t* event) {
     default:
       break;
   }
+  return exit_code == 0;
 }
 
 #define WRITE_BUFFERS 4
@@ -333,12 +326,8 @@ static bool handle(app_data_t* app, pn_event_t* event) {
       return false;
     } break;
 
-    default: {
-      pn_raw_connection_t *c = pn_event_raw_connection(event);
-      if (c) {
-          handle_receive(app, event);
-      }
-    }
+    default:
+      break;
   }
   return exit_code == 0;
 }
@@ -350,11 +339,20 @@ void* run(void *arg) {
   bool again = true;
   do {
     pn_event_batch_t *events = pn_proactor_wait(app->proactor);
-    pn_event_t *e;
-    for (e = pn_event_batch_next(events); e && again; e = pn_event_batch_next(events)) {
-      again = handle(app, e);
+    pn_raw_connection_t *c = pn_event_batch_raw_connection(events);
+    if (c) {
+      pn_event_t *e;
+      for (e = pn_event_batch_next(events); e && again; e = pn_event_batch_next(events)) {
+        again = handle_receive(app, e, c);
+      }
+      pn_proactor_done(app->proactor, events);
+    } else {
+      pn_event_t *e;
+      for (e = pn_event_batch_next(events); e && again; e = pn_event_batch_next(events)) {
+        again = handle(app, e);
+      }
+      pn_proactor_done(app->proactor, events);
     }
-    pn_proactor_done(app->proactor, events);
   } while(again);
   return NULL;
 }
diff --git a/c/include/proton/proactor.h b/c/include/proton/proactor.h
index 1d7ff6c8c..b9401ca2e 100644
--- a/c/include/proton/proactor.h
+++ b/c/include/proton/proactor.h
@@ -217,6 +217,36 @@ PNP_EXTERN pn_event_batch_t *pn_proactor_get(pn_proactor_t *proactor);
  */
 PNP_EXTERN pn_event_t *pn_event_batch_next(pn_event_batch_t *batch);
 
+/**
+ * Query the batch for its subject. If the subject is a proactor then it is
+ * returned; NULL means the subject of the batch is not a proactor. The returned
+ * proactor is valid until pn_proactor_done() is called on the same
+ * batch.
+ *
+ * @return the proactor that is the subject of the batch, or NULL if none.
+ */
+PNP_EXTERN pn_proactor_t *pn_event_batch_proactor(pn_event_batch_t *batch);
+
+/**
+ * Query the batch for its subject. If the subject is a listener then it is
+ * returned; NULL means the subject of the batch is not a listener. The returned
+ * listener is valid until pn_proactor_done() is called on the same
+ * batch.
+ *
+ * @return the listener that is the subject of the batch, or NULL if none.
+ */
+PNP_EXTERN pn_listener_t *pn_event_batch_listener(pn_event_batch_t *batch);
+
+/**
+ * Query the batch for its subject. If the subject is a connection then it is
+ * returned; NULL means the subject of the batch is not a connection. The returned
+ * connection is valid until pn_proactor_done() is called on the same
+ * batch.
+ *
+ * @return the connection that is the subject of the batch, or NULL if none.
+ */
+PNP_EXTERN pn_connection_t *pn_event_batch_connection(pn_event_batch_t *batch);
+
 /**
  * Call when finished handling a batch of events.
  *
diff --git a/c/include/proton/raw_connection.h b/c/include/proton/raw_connection.h
index ef5fd33a7..b00b36a56 100644
--- a/c/include/proton/raw_connection.h
+++ b/c/include/proton/raw_connection.h
@@ -305,6 +305,15 @@ PNP_EXTERN pn_record_t *pn_raw_connection_attachments(pn_raw_connection_t *conne
  */
 PNP_EXTERN pn_raw_connection_t *pn_event_raw_connection(pn_event_t *event);
 
+/**
+ * Query the batch for its subject. If the subject is a raw connection then it is
+ * returned; NULL means the subject of the batch is not a raw connection. The returned
+ * raw connection is valid until pn_proactor_done() is called on the same
+ * batch.
+ *
+ * @return the raw connection that is the subject of the batch, or NULL if none.
+ */
+PNP_EXTERN pn_raw_connection_t *pn_event_batch_raw_connection(pn_event_batch_t *batch);
 
 /**
  * @}
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 511f36213..7abd884ef 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -650,6 +650,19 @@ static inline pconnection_t *batch_pconnection(pn_event_batch_t *batch) {
     containerof(batch, pconnection_t, batch) : NULL;
 }
 
+pn_proactor_t *pn_event_batch_proactor(pn_event_batch_t *batch) {
+  return batch_proactor(batch);
+}
+
+pn_listener_t *pn_event_batch_listener(pn_event_batch_t *batch) {
+  return batch_listener(batch);
+}
+
+pn_connection_t *pn_event_batch_connection(pn_event_batch_t *batch) {
+  pconnection_t *r = batch_pconnection(batch);
+  return r ? r->driver.connection : NULL;
+}
+
 static void psocket_error_str(psocket_t *ps, const char *msg, const char* what) {
   pconnection_t *pc = psocket_pconnection(ps);
   if (pc) {
diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c
index 56bebee85..9b85b15f1 100644
--- a/c/src/proactor/epoll_raw_connection.c
+++ b/c/src/proactor/epoll_raw_connection.c
@@ -341,6 +341,11 @@ praw_connection_t *pni_batch_raw_connection(pn_event_batch_t *batch) {
     containerof(batch, praw_connection_t, batch) : NULL;
 }
 
+pn_raw_connection_t *pn_event_batch_raw_connection(pn_event_batch_t *batch) {
+    praw_connection_t *rc = pni_batch_raw_connection(batch);
+    return rc ? &rc->raw_connection : NULL;
+}
+
 task_t *pni_raw_connection_task(praw_connection_t *rc) {
   return &rc->task;
 }
diff --git a/c/src/proactor/libuv.c b/c/src/proactor/libuv.c
index 3bc9e77fb..b2663fa33 100644
--- a/c/src/proactor/libuv.c
+++ b/c/src/proactor/libuv.c
@@ -377,6 +377,19 @@ static inline pconnection_t *batch_pconnection(pn_event_batch_t *batch) {
     containerof(batch, pconnection_t, batch) : NULL;
 }
 
+pn_proactor_t *pn_event_batch_proactor(pn_event_batch_t *batch) {
+  return batch_proactor(batch);
+}
+
+pn_listener_t *pn_event_batch_listener(pn_event_batch_t *batch) {
+  return batch_listener(batch);
+}
+
+pn_connection_t *pn_event_batch_connection(pn_event_batch_t *batch) {
+  pconnection_t *r = batch_pconnection(batch);
+  return r ? r->driver.connection : NULL;
+}
+
 static inline work_t *batch_work(pn_event_batch_t *batch) {
   pconnection_t *pc = batch_pconnection(batch);
   if (pc) return &pc->work;
@@ -1381,3 +1394,4 @@ void pn_raw_connection_read_close(pn_raw_connection_t *conn) {}
 void pn_raw_connection_write_close(pn_raw_connection_t *conn) {}
 const struct pn_netaddr_t *pn_raw_connection_local_addr(pn_raw_connection_t *connection) { return NULL; }
 const struct pn_netaddr_t *pn_raw_connection_remote_addr(pn_raw_connection_t *connection) { return NULL; }
+pn_raw_connection_t *pn_event_batch_raw_connection(pn_event_batch_t* batch) { return NULL; }
diff --git a/c/src/proactor/win_iocp.cpp b/c/src/proactor/win_iocp.cpp
index 4293d4464..1c245e1e4 100644
--- a/c/src/proactor/win_iocp.cpp
+++ b/c/src/proactor/win_iocp.cpp
@@ -2007,6 +2007,19 @@ static inline pconnection_t *batch_pconnection(pn_event_batch_t *batch) {
     containerof(batch, pconnection_t, batch) : NULL;
 }
 
+pn_proactor_t *pn_event_batch_proactor(pn_event_batch_t *batch) {
+  return batch_proactor(batch);
+}
+
+pn_listener_t *pn_event_batch_listener(pn_event_batch_t *batch) {
+  return batch_listener(batch);
+}
+
+pn_connection_t *pn_event_batch_connection(pn_event_batch_t *batch) {
+  pconnection_t *r = batch_pconnection(batch);
+  return r ? r->driver.connection : NULL;
+}
+
 static inline bool pconnection_has_event(pconnection_t *pc) {
   return pn_connection_driver_has_event(&pc->driver);
 }
@@ -3434,3 +3447,4 @@ void pn_raw_connection_read_close(pn_raw_connection_t *conn) {}
 void pn_raw_connection_write_close(pn_raw_connection_t *conn) {}
 const struct pn_netaddr_t *pn_raw_connection_local_addr(pn_raw_connection_t *connection) { return NULL; }
 const struct pn_netaddr_t *pn_raw_connection_remote_addr(pn_raw_connection_t *connection) { return NULL; }
+pn_raw_connection_t *pn_event_batch_raw_connection(pn_event_batch_t* batch) { return NULL; }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org