You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2020/08/12 15:34:15 UTC

[qpid-proton] 05/05: PROTON-2247: Work on raw echo to improve output and add some wakes

This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 728a2ef3522b6214e4a4a4dcd0bab2fdc60d0135
Author: Andrew Stitcher <as...@apache.org>
AuthorDate: Tue Jun 9 23:22:26 2020 -0400

    PROTON-2247: Work on raw echo to improve output and add some wakes
---
 c/examples/raw_echo.c | 117 ++++++++++++++++++++++++++++++++++++++++++--------
 1 file changed, 99 insertions(+), 18 deletions(-)

diff --git a/c/examples/raw_echo.c b/c/examples/raw_echo.c
index 53fd47b..53a4ada 100644
--- a/c/examples/raw_echo.c
+++ b/c/examples/raw_echo.c
@@ -37,6 +37,9 @@ typedef struct app_data_t {
   pn_proactor_t *proactor;
   pn_listener_t *listener;
 
+  int64_t first_idle_time;
+  int64_t try_accept_time;
+  int64_t wake_conn_time;
   int connects;
   int disconnects;
 
@@ -45,6 +48,17 @@ typedef struct app_data_t {
   /* Receiver values */
 } app_data_t;
 
+#define MAX_CONNECTIONS 5
+
+typedef struct conn_data_t {
+  pn_raw_connection_t *connection;
+  int64_t last_recv_time;
+  int bytes;
+  int buffers;
+} conn_data_t;
+
+static conn_data_t conn_data[MAX_CONNECTIONS] = {{0}};
+
 static int exit_code = 0;
 
 /* Close the connection and the listener so so we will get a
@@ -77,10 +91,22 @@ static void recv_message(pn_raw_buffer_t buf) {
   fwrite(buf.bytes, buf.size, 1, stdout);
 }
 
-void *make_receiver_data(void) {
+conn_data_t *make_conn_data(pn_raw_connection_t *c) {
+  int i;
+  for (i = 0; i < MAX_CONNECTIONS; ++i) {
+    if (!conn_data[i].connection) {
+      conn_data[i].connection = c;
+      return &conn_data[i];
+    }
+  }
   return NULL;
 }
 
+void free_conn_data(conn_data_t *c) {
+  if (!c) return;
+  c->connection = NULL;
+}
+
 #define READ_BUFFERS 4
 
 /* This function handles events when we are acting as the receiver */
@@ -89,27 +115,42 @@ static void handle_receive(app_data_t *app, pn_event_t* event) {
 
     case PN_RAW_CONNECTION_CONNECTED: {
       pn_raw_connection_t *c = pn_event_raw_connection(event);
+      conn_data_t *cd = (conn_data_t *) pn_raw_connection_get_context(c);
       pn_raw_buffer_t buffers[READ_BUFFERS] = {{0}};
-      int i = READ_BUFFERS;
-      printf("**raw connection connected\n");
-      app->connects++;
-      for (; i; --i) {
-        pn_raw_buffer_t *buff = &buffers[READ_BUFFERS-i];
-        buff->bytes = (char*) malloc(1024);
-        buff->capacity = 1024;
-        buff->size = 0;
-        buff->offset = 0;
+      if (cd) {
+        int i = READ_BUFFERS;
+        printf("**raw connection %ld connected\n", cd-conn_data);
+        app->connects++;
+        for (; i; --i) {
+          pn_raw_buffer_t *buff = &buffers[READ_BUFFERS-i];
+          buff->bytes = (char*) malloc(1024);
+          buff->capacity = 1024;
+          buff->size = 0;
+          buff->offset = 0;
+        }
+        pn_raw_connection_give_read_buffers(c, buffers, READ_BUFFERS);
+      } else {
+        printf("**raw connection connected: not connected\n");
       }
-      pn_raw_connection_give_read_buffers(c, buffers, READ_BUFFERS);
+    } break;
+
+    case PN_RAW_CONNECTION_WAKE: {
+      pn_raw_connection_t *c = pn_event_raw_connection(event);
+      conn_data_t *cd = (conn_data_t *) pn_raw_connection_get_context(c);
+      printf("**raw connection %ld woken\n", cd-conn_data);
     } break;
 
     case PN_RAW_CONNECTION_DISCONNECTED: {
       pn_raw_connection_t *c = pn_event_raw_connection(event);
-      void *cd = pn_raw_connection_get_context(c);
-      free(cd);
-      printf("**raw connection disconnected\n");
+      conn_data_t *cd = (conn_data_t *) pn_raw_connection_get_context(c);
+      if (cd) {
+        printf("**raw connection %ld disconnected: bytes: %d, buffers: %d\n", cd-conn_data, cd->bytes, cd->buffers);
+      } else {
+        printf("**raw connection disconnected: not connected\n");
+      }
       app->disconnects++;
       check_condition(event, pn_raw_connection_condition(c), app);
+      free_conn_data(cd);
     } break;
 
     case PN_RAW_CONNECTION_NEED_READ_BUFFERS: {
@@ -118,13 +159,17 @@ static void handle_receive(app_data_t *app, pn_event_t* event) {
     /* This path handles both received bytes and freeing buffers at close */
     case PN_RAW_CONNECTION_READ: {
       pn_raw_connection_t *c = pn_event_raw_connection(event);
+      conn_data_t *cd = (conn_data_t *) pn_raw_connection_get_context(c);
       pn_raw_buffer_t buffs[READ_BUFFERS];
       size_t n;
+      cd->last_recv_time = pn_proactor_now_64();
       while ( (n = pn_raw_connection_take_read_buffers(c, buffs, READ_BUFFERS)) ) {
         unsigned i;
         for (i=0; i<n && buffs[i].bytes; ++i) {
+          cd->bytes += buffs[i].size;
           recv_message(buffs[i]);
         }
+        cd->buffers += n;
 
         if (!pn_raw_connection_is_write_closed(c)) {
           pn_raw_connection_write_buffers(c, buffs, n);
@@ -179,12 +224,29 @@ static bool handle(app_data_t* app, pn_event_t* event) {
       break;
     }
     case PN_LISTENER_ACCEPT: {
+      pn_listener_t *listener = pn_event_listener(event);
       pn_raw_connection_t *c = pn_raw_connection();
-      void *cd = make_receiver_data();
-      pn_raw_connection_set_context(c, cd);
-      pn_listener_raw_accept(pn_event_listener(event), c);
+      void *cd = make_conn_data(c);
+      int64_t now = pn_proactor_now_64();
+
+      if (cd) {
+        app->first_idle_time = 0;
+        app->try_accept_time = 0;
+        if (app->wake_conn_time < now) {
+          app->wake_conn_time = now + 5000;
+          pn_proactor_set_timeout(pn_listener_proactor(listener), 5000);
+        }
+        pn_raw_connection_set_context(c, cd);
+
+        pn_listener_raw_accept(listener, c);
+      } else {
+        printf("**too many connections, trying again later...\n");
+
+        /* No other way to reject connection */
+        pn_listener_raw_accept(listener, c);
+        pn_raw_connection_close(c);
+      }
 
-      if (app->connects>2) pn_listener_close(app->listener);
     } break;
 
     case PN_LISTENER_CLOSE: {
@@ -193,6 +255,25 @@ static bool handle(app_data_t* app, pn_event_t* event) {
     } break;
 
     case PN_PROACTOR_TIMEOUT: {
+      int64_t now = pn_proactor_now_64();
+      pn_millis_t timeout = 5000;
+      if (app->connects - app->disconnects == 0) {
+        timeout = 20000;
+        if (app->first_idle_time == 0) {
+          printf("**idle detected, shutting down in %dms\n", timeout);
+          app->first_idle_time = now;
+        } else if (app->first_idle_time + 20000 <= now) {
+          pn_listener_close(app->listener);
+          break;
+        }
+      } else if (now >= app->wake_conn_time) {
+        int i;
+        for (i = 0; i < MAX_CONNECTIONS; ++i) {
+          if (conn_data[i].connection) pn_raw_connection_wake(conn_data[i].connection);
+        }
+        app->wake_conn_time = now + 5000;
+      }
+      pn_proactor_set_timeout(pn_event_proactor(event), timeout);
     }  break;
 
     case PN_PROACTOR_INACTIVE: {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org