You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2021/03/05 22:16:34 UTC

[qpid-proton] branch master updated: PROTON-2340: Make raw connection echo example multithreaded

This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git


The following commit(s) were added to refs/heads/master by this push:
     new ddb3442  PROTON-2340: Make raw connection echo example multithreaded
ddb3442 is described below

commit ddb34424096e0845edcf9d9b47e7c1d7cc1d0884
Author: Andrew Stitcher <as...@apache.org>
AuthorDate: Thu Mar 4 22:58:38 2021 -0500

    PROTON-2340: Make raw connection echo example multithreaded
    
    WIPWIPWIP: This stops the example exiting cleanly - needs investigation
    This helps to expose the TSAN problems
---
 c/examples/raw_echo.c | 57 ++++++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 47 insertions(+), 10 deletions(-)

diff --git a/c/examples/raw_echo.c b/c/examples/raw_echo.c
index 0f6515a..b89c2f8 100644
--- a/c/examples/raw_echo.c
+++ b/c/examples/raw_echo.c
@@ -19,6 +19,8 @@
  *
  */
 
+#include "thread.h"
+
 #include <proton/condition.h>
 #include <proton/raw_connection.h>
 #include <proton/listener.h>
@@ -37,6 +39,7 @@ typedef struct app_data_t {
   pn_proactor_t *proactor;
   pn_listener_t *listener;
 
+  pthread_mutex_t lock;
   int64_t first_idle_time;
   int64_t wake_conn_time;
   int connects;
@@ -142,7 +145,9 @@ static void handle_receive(app_data_t *app, pn_event_t* event) {
       if (cd) {
         int i = READ_BUFFERS;
         printf("**raw connection %tu connected\n", cd-conn_data);
+        pthread_mutex_lock(&app->lock);
         app->connects++;
+        pthread_mutex_unlock(&app->lock);
         for (; i; --i) {
           pn_raw_buffer_t *buff = &buffers[READ_BUFFERS-i];
           buff->bytes = (char*) malloc(1024);
@@ -167,7 +172,9 @@ static void handle_receive(app_data_t *app, pn_event_t* event) {
       pn_raw_connection_t *c = pn_event_raw_connection(event);
       conn_data_t *cd = (conn_data_t *) pn_raw_connection_get_context(c);
       if (cd) {
+        pthread_mutex_lock(&app->lock);
         app->disconnects++;
+        pthread_mutex_unlock(&app->lock);
         printf("**raw connection %tu disconnected: bytes: %d, buffers: %d\n", cd-conn_data, cd->bytes, cd->buffers);
       } else {
         printf("**raw connection disconnected: not connected\n");
@@ -266,6 +273,7 @@ static bool handle(app_data_t* app, pn_event_t* event) {
       int64_t now = pn_proactor_now_64();
 
       if (cd) {
+        pthread_mutex_lock(&app->lock);
         app->first_idle_time = 0;
         if (app->wake_conn_time < now) {
           app->wake_conn_time = now + 5000;
@@ -274,6 +282,7 @@ static bool handle(app_data_t* app, pn_event_t* event) {
         pn_raw_connection_set_context(c, cd);
 
         pn_listener_raw_accept(listener, c);
+        pthread_mutex_unlock(&app->lock);
       } else {
         printf("**too many connections, trying again later...\n");
 
@@ -285,11 +294,15 @@ static bool handle(app_data_t* app, pn_event_t* event) {
     } break;
 
     case PN_LISTENER_CLOSE: {
+      pn_listener_t *listener = pn_event_listener(event);
       app->listener = NULL;        /* Listener is closed */
-      check_condition_fatal(event, pn_listener_condition(pn_event_listener(event)), app);
+      printf("**listener closed\n");
+      check_condition_fatal(event, pn_listener_condition(listener), app);
     } break;
 
     case PN_PROACTOR_TIMEOUT: {
+      pn_proactor_t *proactor = pn_event_proactor(event);
+      pthread_mutex_lock(&app->lock);
       int64_t now = pn_proactor_now_64();
       pn_millis_t timeout = 5000;
       if (app->connects - app->disconnects == 0) {
@@ -298,8 +311,9 @@ static bool handle(app_data_t* app, pn_event_t* event) {
           printf("**idle detected, shutting down in %dms\n", timeout);
           app->first_idle_time = now;
         } else if (app->first_idle_time + 20000 <= now) {
+          printf("**no activity for %dms: shutting down now\n", timeout);
           pn_listener_close(app->listener);
-          break;
+          break; // No more timeouts
         }
       } else if (now >= app->wake_conn_time) {
         int i;
@@ -308,10 +322,14 @@ static bool handle(app_data_t* app, pn_event_t* event) {
         }
         app->wake_conn_time = now + 5000;
       }
-      pn_proactor_set_timeout(pn_event_proactor(event), timeout);
+      pn_proactor_set_timeout(proactor, timeout);
+      pthread_mutex_unlock(&app->lock);
     }  break;
 
-    case PN_PROACTOR_INACTIVE: {
+    case PN_PROACTOR_INACTIVE:
+    case PN_PROACTOR_INTERRUPT: {
+      pn_proactor_t *proactor = pn_event_proactor(event);
+      pn_proactor_interrupt(proactor);
       return false;
     } break;
 
@@ -325,22 +343,26 @@ static bool handle(app_data_t* app, pn_event_t* event) {
   return exit_code == 0;
 }
 
-void run(app_data_t *app) {
+void* run(void *arg) {
+  app_data_t *app = arg;
+
   /* Loop and handle events */
+  bool again = true;
   do {
     pn_event_batch_t *events = pn_proactor_wait(app->proactor);
     pn_event_t *e;
-    for (e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) {
-      if (!handle(app, e)) {
-        return;
-      }
+    for (e = pn_event_batch_next(events); e && again; e = pn_event_batch_next(events)) {
+      again = handle(app, e);
     }
     pn_proactor_done(app->proactor, events);
-  } while(true);
+  } while(again);
 }
 
+
 int main(int argc, char **argv) {
   struct app_data_t app = {0};
+  pthread_mutex_init(&app.lock, NULL);
+
   char addr[PN_MAX_ADDR];
   app.host = (argc > 1) ? argv[1] : "";
   app.port = (argc > 2) ? argv[2] : "amqp";
@@ -350,7 +372,22 @@ int main(int argc, char **argv) {
   app.listener = pn_listener();
   pn_proactor_addr(addr, sizeof(addr), app.host, app.port);
   pn_proactor_listen(app.proactor, app.listener, addr, 16);
+
+  size_t thread_count = 3; 
+  pthread_t threads[thread_count];
+  for (int n=0; n<thread_count; n++) {
+    int rc = pthread_create(&threads[n], 0, run, (void*)&app);
+    if (rc) {
+      fprintf(stderr, "Failed to create thread\n");
+      exit(-1);
+    }
+  }
   run(&app);
+
+  for (int n=0; n<thread_count; n++) {
+    pthread_join(threads[n], 0);
+  }
+
   pn_proactor_free(app.proactor);
   return exit_code;
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org