You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2021/03/02 20:49:22 UTC

[qpid-proton] branch master updated: PROTON-2339: Proactor raw connection: Introduce DRAIN_BUFFERS event

This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git


The following commit(s) were added to refs/heads/master by this push:
     new 9207223  PROTON-2339: Proactor raw connection: Introduce DRAIN_BUFFERS event
9207223 is described below

commit 92072237908668d0117f1b6bf5fabe7328b6589b
Author: Andrew Stitcher <as...@apache.org>
AuthorDate: Wed Feb 24 13:24:39 2021 -0500

    PROTON-2339: Proactor raw connection: Introduce DRAIN_BUFFERS event
    
    This will supersede the use of READ/WRITTEN events to tell the
    application to retrieve its buffers before DISCONNECT.
    
    This more specific event should be clearer in purpose.
    
    For backwards compatibility with existing code, if the application
    doesn't drain the buffers in response to the drain event, we will
    send the read/written events (as applicable) as well. This behaviour
    should be removed when the existing major application (dispatch) is
    fixed to use this new event.
---
 c/examples/raw_connect.c                 | 21 +++++++++++++++++-
 c/examples/raw_echo.c                    | 31 ++++++++++++++++++--------
 c/include/proton/event.h                 | 13 ++++++++++-
 c/src/core/event.c                       |  1 +
 c/src/proactor/raw_connection-internal.h | 12 ++++++++--
 c/src/proactor/raw_connection.c          | 38 +++++++++++++++++++++++---------
 c/tests/raw_connection_test.cpp          | 31 ++++++++++++++++++++++++--
 7 files changed, 122 insertions(+), 25 deletions(-)

diff --git a/c/examples/raw_connect.c b/c/examples/raw_connect.c
index d380801..758903c 100644
--- a/c/examples/raw_connect.c
+++ b/c/examples/raw_connect.c
@@ -128,6 +128,13 @@ static void handle_receive(app_data_t *app, pn_event_t* event) {
 
 #define WRITE_BUFFERS 4
 
+/* Free the bytes pointer of each of the first n entries of buffs. */
+static void free_buffers(pn_raw_buffer_t buffs[], size_t n) {
+  for (size_t i=0; i<n; ++i) {  /* index has the same type as the bound n */
+    free(buffs[i].bytes);
+  }
+}
+
 /* This function handles events when we are acting as the sender */
 static void handle_send(app_data_t* app, pn_event_t* event) {
   switch (pn_event_type(event)) {
@@ -148,6 +155,18 @@ static void handle_send(app_data_t* app, pn_event_t* event) {
       check_condition(event, pn_raw_connection_condition(c), app);
     } break;
 
+    case PN_RAW_CONNECTION_DRAIN_BUFFERS: { /* take back and free all remaining buffers */
+      pn_raw_connection_t *c = pn_event_raw_connection(event);
+      pn_raw_buffer_t buffs[READ_BUFFERS];
+      size_t n;
+      while ( (n = pn_raw_connection_take_read_buffers(c, buffs, READ_BUFFERS)) ) {
+        free_buffers(buffs, n);
+      }
+      while ( (n = pn_raw_connection_take_written_buffers(c, buffs, READ_BUFFERS)) ) {
+        free_buffers(buffs, n);
+      }
+    } break; /* draining is complete; must not fall through into the next case */
+
     case PN_RAW_CONNECTION_NEED_WRITE_BUFFERS: {
       pn_raw_connection_t *c = pn_event_raw_connection(event);
       char line[120];
@@ -161,7 +180,7 @@ static void handle_send(app_data_t* app, pn_event_t* event) {
       }
     } break;
 
-    /* This path handles both received bytes and freeing buffers at close */
+    /* This path handles received bytes */
     case PN_RAW_CONNECTION_READ: {
       pn_raw_connection_t *c = pn_event_raw_connection(event);
       pn_raw_buffer_t buffs[READ_BUFFERS];
diff --git a/c/examples/raw_echo.c b/c/examples/raw_echo.c
index d47f95b..7c296e9 100644
--- a/c/examples/raw_echo.c
+++ b/c/examples/raw_echo.c
@@ -125,6 +125,13 @@ void free_conn_data(conn_data_t *c) {
 
 #define READ_BUFFERS 4
 
+/* Free the bytes pointer of each of the first n entries of buffs. */
+static void free_buffers(pn_raw_buffer_t buffs[], size_t n) {
+  for (size_t i=0; i<n; ++i) {  /* index has the same type as the bound n */
+    free(buffs[i].bytes);
+  }
+}
+
 /* This function handles events when we are acting as the receiver */
 static void handle_receive(app_data_t *app, pn_event_t* event) {
   switch (pn_event_type(event)) {
@@ -170,10 +177,21 @@ static void handle_receive(app_data_t *app, pn_event_t* event) {
       free_conn_data(cd);
     } break;
 
+    case PN_RAW_CONNECTION_DRAIN_BUFFERS: { /* take back and free all remaining buffers */
+      pn_raw_connection_t *c = pn_event_raw_connection(event);
+      pn_raw_buffer_t buffs[READ_BUFFERS];
+      size_t n;
+      while ( (n = pn_raw_connection_take_read_buffers(c, buffs, READ_BUFFERS)) ) {
+        free_buffers(buffs, n);
+      }
+      while ( (n = pn_raw_connection_take_written_buffers(c, buffs, READ_BUFFERS)) ) {
+        free_buffers(buffs, n);
+      }
+    } break; /* draining is complete; must not fall through into the next case */
+
     case PN_RAW_CONNECTION_NEED_READ_BUFFERS: {
     } break;
 
-    /* This path handles both received bytes and freeing buffers at close */
     case PN_RAW_CONNECTION_READ: {
       pn_raw_connection_t *c = pn_event_raw_connection(event);
       conn_data_t *cd = (conn_data_t *) pn_raw_connection_get_context(c);
@@ -188,15 +206,13 @@ static void handle_receive(app_data_t *app, pn_event_t* event) {
         }
         cd->buffers += n;
 
+        // Echo back if we can
         if (!pn_raw_connection_is_write_closed(c)) {
           pn_raw_connection_write_buffers(c, buffs, n);
         } else if (!pn_raw_connection_is_read_closed(c)) {
           pn_raw_connection_give_read_buffers(c, buffs, n);
         } else {
-          unsigned i;
-          for (i=0; i<n && buffs[i].bytes; ++i) {
-            free(buffs[i].bytes);
-          }
+          free_buffers(buffs, n);
         }
       }
     } break;
@@ -217,10 +233,7 @@ static void handle_receive(app_data_t *app, pn_event_t* event) {
         if (!pn_raw_connection_is_read_closed(c)) {
           pn_raw_connection_give_read_buffers(c, buffs, n);
         } else {
-          unsigned i;
-          for (i=0; i<n && buffs[i].bytes; ++i) {
-            free(buffs[i].bytes);
-          }
+          free_buffers(buffs, n);
         }
       };
     } break;
diff --git a/c/include/proton/event.h b/c/include/proton/event.h
index 9562e78..dba1b44 100644
--- a/c/include/proton/event.h
+++ b/c/include/proton/event.h
@@ -459,7 +459,18 @@ typedef enum {
    *
    * Events of this type point to a @ref pn_raw_connection_t
    */
-  PN_RAW_CONNECTION_WAKE
+  PN_RAW_CONNECTION_WAKE,
+
+  /**
+   * The raw connection is returning all the remaining buffers to the application.
+   *
+   * The raw connection is about to disconnect and shut down. To avoid leaking the buffers
+   * the application must take the buffers back using @ref pn_raw_connection_take_read_buffers
+   * and @ref pn_raw_connection_take_written_buffers.
+   *
+   * Events of this type point to a @ref pn_raw_connection_t
+   */
+  PN_RAW_CONNECTION_DRAIN_BUFFERS
 
 } pn_event_type_t;
 
diff --git a/c/src/core/event.c b/c/src/core/event.c
index cecc615..58bf6c0 100644
--- a/c/src/core/event.c
+++ b/c/src/core/event.c
@@ -361,6 +361,7 @@ const char *pn_event_type_name(pn_event_type_t type)
   CASE(PN_RAW_CONNECTION_READ);
   CASE(PN_RAW_CONNECTION_WRITTEN);
   CASE(PN_RAW_CONNECTION_WAKE);
+  CASE(PN_RAW_CONNECTION_DRAIN_BUFFERS);
   default:
     return "PN_UNKNOWN";
   }
diff --git a/c/src/proactor/raw_connection-internal.h b/c/src/proactor/raw_connection-internal.h
index beda92d..ea517c6 100644
--- a/c/src/proactor/raw_connection-internal.h
+++ b/c/src/proactor/raw_connection-internal.h
@@ -62,6 +62,14 @@ typedef enum {
   conn_fini  = 10,
 } raw_conn_state;
 
+typedef enum { // steps of the event sequence emitted while disconnectpending is set
+  disc_init        = 0, // nothing emitted yet; next step may emit DRAIN_BUFFERS
+  disc_drain_msg   = 1, // drain step done; next step may emit READ
+  disc_read_msg    = 2, // read step done; next step may emit WRITTEN
+  disc_written_msg = 3, // written step done; next step emits DISCONNECTED
+  disc_fini        = 4  // DISCONNECTED has been emitted
+} raw_disconnect_state;
+
 typedef uint16_t buff_ptr; // This is always the index+1 so that 0 can be special
 
 typedef struct pbuffer_t {
@@ -97,6 +105,8 @@ struct pn_raw_connection_t {
   buff_ptr wbuffer_last_written;
 
   uint8_t state; // really raw_conn_state
+  uint8_t disconnect_state; // really raw_disconnect_state
+
   bool rrequestedbuffers;
   bool wrequestedbuffers;
 
@@ -104,8 +114,6 @@ struct pn_raw_connection_t {
   bool wpending;
   bool rclosedpending;
   bool wclosedpending;
-  bool rdrainpending;
-  bool wdrainpending;
   bool disconnectpending;
   bool wakepending;
 };
diff --git a/c/src/proactor/raw_connection.c b/c/src/proactor/raw_connection.c
index a819b5b..f0e9adf 100644
--- a/c/src/proactor/raw_connection.c
+++ b/c/src/proactor/raw_connection.c
@@ -445,13 +445,12 @@ static inline void pni_raw_release_buffers(pn_raw_connection_t *conn) {
     conn->wbuffers[p-1].type = buff_written;
   }
   conn->wbuffer_last_towrite = 0;
-  conn->rdrainpending = (bool)(conn->rbuffer_first_read);
-  conn->wdrainpending = (bool)(conn->wbuffer_first_written);
 }
 
 static inline void pni_raw_disconnect(pn_raw_connection_t *conn) {
   pni_raw_release_buffers(conn);
   conn->disconnectpending = true;
+  conn->disconnect_state  = disc_init; // restart the disconnect event sequence (see pni_raw_event_next)
   conn->state = pni_raw_new_state(conn, int_disconnect);
 }
 
@@ -672,15 +671,34 @@ pn_event_t *pni_raw_event_next(pn_raw_connection_t *conn) {
     } else if (conn->wclosedpending) {
       pni_raw_put_event(conn, PN_RAW_CONNECTION_CLOSED_WRITE);
       conn->wclosedpending = false;
-    } else if (conn->rdrainpending) {
-      pni_raw_put_event(conn, PN_RAW_CONNECTION_READ);
-      conn->rdrainpending = false;
-    } else if (conn->wdrainpending) {
-      pni_raw_put_event(conn, PN_RAW_CONNECTION_WRITTEN);
-      conn->wdrainpending = false;
     } else if (conn->disconnectpending) {
-      pni_raw_put_event(conn, PN_RAW_CONNECTION_DISCONNECTED);
-      conn->disconnectpending = false;
+      switch (conn->disconnect_state) {
+      case disc_init:
+        if (conn->rbuffer_first_read || conn->wbuffer_first_written) {
+          pni_raw_put_event(conn, PN_RAW_CONNECTION_DRAIN_BUFFERS);
+        }
+        conn->disconnect_state = disc_drain_msg;
+        break;
+      // TODO: We'll leave the read/written events in here for the moment for backward compatibility;
+      // remove them soon (after dispatch uses DRAIN_BUFFERS)
+      case disc_drain_msg:
+        if (conn->rbuffer_first_read) {
+          pni_raw_put_event(conn, PN_RAW_CONNECTION_READ);
+        }
+        conn->disconnect_state = disc_read_msg;
+        break;
+      case disc_read_msg:
+        if (conn->wbuffer_first_written) {
+          pni_raw_put_event(conn, PN_RAW_CONNECTION_WRITTEN);
+        }
+        conn->disconnect_state = disc_written_msg;
+        break;
+      case disc_written_msg:
+        pni_raw_put_event(conn, PN_RAW_CONNECTION_DISCONNECTED);
+        conn->disconnectpending = false;
+        conn->disconnect_state = disc_fini;
+        break;
+      }
     } else if (!pni_raw_wdrained(conn) && !conn->wbuffer_first_towrite && !conn->wrequestedbuffers) {
       // Ran out of write buffers
       pni_raw_put_event(conn, PN_RAW_CONNECTION_NEED_WRITE_BUFFERS);
diff --git a/c/tests/raw_connection_test.cpp b/c/tests/raw_connection_test.cpp
index b7ce803..27de254 100644
--- a/c/tests/raw_connection_test.cpp
+++ b/c/tests/raw_connection_test.cpp
@@ -569,6 +569,7 @@ TEST_CASE("raw connection") {
 
       REQUIRE_FALSE(pni_raw_can_read(p));
       REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_CLOSED_WRITE);
+      REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_DRAIN_BUFFERS);
       REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_READ);
       rgiven = pn_raw_connection_take_read_buffers(p, &read[0], rtaken);
       REQUIRE(pni_raw_validate(p));
@@ -633,9 +634,35 @@ TEST_CASE("raw connection") {
         REQUIRE(pni_raw_validate(p));
         CHECK(pn_raw_connection_is_write_closed(p));
         REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_CLOSED_WRITE);
-        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_READ);
-        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_WRITTEN);
+        // TODO: Remove the inapplicable tests when the drain buffers event completely replaces read/written
+        SECTION("Ensure get read/written events before disconnect if not drained") {
+          REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_DRAIN_BUFFERS);
+          REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_READ);
+          REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_WRITTEN);
+        }
+        SECTION("Ensure no read/written events before disconnect if drained") {
+          REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_DRAIN_BUFFERS);
+          while(pn_raw_connection_take_read_buffers(p, &read[0], read.size())>0);
+          while(pn_raw_connection_take_written_buffers(p, &written[0], written.size())>0);
+        }
+        SECTION("Ensure no written events before disconnect if write drained") {
+          REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_DRAIN_BUFFERS);
+          REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_READ);
+          while(pn_raw_connection_take_read_buffers(p, &read[0], read.size())>0);
+          while(pn_raw_connection_take_written_buffers(p, &written[0], written.size())>0);
+        }
+        SECTION("Ensure no read events before disconnect if read drained") {
+          REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_DRAIN_BUFFERS);
+          while(pn_raw_connection_take_read_buffers(p, &read[0], read.size())>0);
+          REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_WRITTEN);
+          while(pn_raw_connection_take_written_buffers(p, &written[0], written.size())>0);
+        }
+        SECTION("Ensure no events before disconnect if already drained") {
+          while(pn_raw_connection_take_read_buffers(p, &read[0], read.size())>0);
+          while(pn_raw_connection_take_written_buffers(p, &written[0], written.size())>0);
+        }
         REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_DISCONNECTED);
+
       }
 
       SECTION("Read/Write interleaved") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org