You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2021/03/05 22:16:34 UTC
[qpid-proton] branch master updated: PROTON-2340: Make raw connection echo example multithreaded
This is an automated email from the ASF dual-hosted git repository.
astitcher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
The following commit(s) were added to refs/heads/master by this push:
new ddb3442 PROTON-2340: Make raw connection echo example multithreaded
ddb3442 is described below
commit ddb34424096e0845edcf9d9b47e7c1d7cc1d0884
Author: Andrew Stitcher <as...@apache.org>
AuthorDate: Thu Mar 4 22:58:38 2021 -0500
PROTON-2340: Make raw connection echo example multithreaded
WIPWIPWIP: This change stops the example from exiting cleanly - needs investigation.
Making the example multithreaded helps to expose the TSAN problems.
---
c/examples/raw_echo.c | 57 ++++++++++++++++++++++++++++++++++++++++++---------
1 file changed, 47 insertions(+), 10 deletions(-)
diff --git a/c/examples/raw_echo.c b/c/examples/raw_echo.c
index 0f6515a..b89c2f8 100644
--- a/c/examples/raw_echo.c
+++ b/c/examples/raw_echo.c
@@ -19,6 +19,8 @@
*
*/
+#include "thread.h"
+
#include <proton/condition.h>
#include <proton/raw_connection.h>
#include <proton/listener.h>
@@ -37,6 +39,7 @@ typedef struct app_data_t {
pn_proactor_t *proactor;
pn_listener_t *listener;
+ pthread_mutex_t lock;
int64_t first_idle_time;
int64_t wake_conn_time;
int connects;
@@ -142,7 +145,9 @@ static void handle_receive(app_data_t *app, pn_event_t* event) {
if (cd) {
int i = READ_BUFFERS;
printf("**raw connection %tu connected\n", cd-conn_data);
+ pthread_mutex_lock(&app->lock);
app->connects++;
+ pthread_mutex_unlock(&app->lock);
for (; i; --i) {
pn_raw_buffer_t *buff = &buffers[READ_BUFFERS-i];
buff->bytes = (char*) malloc(1024);
@@ -167,7 +172,9 @@ static void handle_receive(app_data_t *app, pn_event_t* event) {
pn_raw_connection_t *c = pn_event_raw_connection(event);
conn_data_t *cd = (conn_data_t *) pn_raw_connection_get_context(c);
if (cd) {
+ pthread_mutex_lock(&app->lock);
app->disconnects++;
+ pthread_mutex_unlock(&app->lock);
printf("**raw connection %tu disconnected: bytes: %d, buffers: %d\n", cd-conn_data, cd->bytes, cd->buffers);
} else {
printf("**raw connection disconnected: not connected\n");
@@ -266,6 +273,7 @@ static bool handle(app_data_t* app, pn_event_t* event) {
int64_t now = pn_proactor_now_64();
if (cd) {
+ pthread_mutex_lock(&app->lock);
app->first_idle_time = 0;
if (app->wake_conn_time < now) {
app->wake_conn_time = now + 5000;
@@ -274,6 +282,7 @@ static bool handle(app_data_t* app, pn_event_t* event) {
pn_raw_connection_set_context(c, cd);
pn_listener_raw_accept(listener, c);
+ pthread_mutex_unlock(&app->lock);
} else {
printf("**too many connections, trying again later...\n");
@@ -285,11 +294,15 @@ static bool handle(app_data_t* app, pn_event_t* event) {
} break;
case PN_LISTENER_CLOSE: {
+ pn_listener_t *listener = pn_event_listener(event);
app->listener = NULL; /* Listener is closed */
- check_condition_fatal(event, pn_listener_condition(pn_event_listener(event)), app);
+ printf("**listener closed\n");
+ check_condition_fatal(event, pn_listener_condition(listener), app);
} break;
case PN_PROACTOR_TIMEOUT: {
+ pn_proactor_t *proactor = pn_event_proactor(event);
+ pthread_mutex_lock(&app->lock);
int64_t now = pn_proactor_now_64();
pn_millis_t timeout = 5000;
if (app->connects - app->disconnects == 0) {
@@ -298,8 +311,9 @@ static bool handle(app_data_t* app, pn_event_t* event) {
printf("**idle detected, shutting down in %dms\n", timeout);
app->first_idle_time = now;
} else if (app->first_idle_time + 20000 <= now) {
+ printf("**no activity for %dms: shutting down now\n", timeout);
pn_listener_close(app->listener);
- break;
+ break; // No more timeouts
}
} else if (now >= app->wake_conn_time) {
int i;
@@ -308,10 +322,14 @@ static bool handle(app_data_t* app, pn_event_t* event) {
}
app->wake_conn_time = now + 5000;
}
- pn_proactor_set_timeout(pn_event_proactor(event), timeout);
+ pn_proactor_set_timeout(proactor, timeout);
+ pthread_mutex_unlock(&app->lock);
} break;
- case PN_PROACTOR_INACTIVE: {
+ case PN_PROACTOR_INACTIVE:
+ case PN_PROACTOR_INTERRUPT: {
+ pn_proactor_t *proactor = pn_event_proactor(event);
+ pn_proactor_interrupt(proactor);
return false;
} break;
@@ -325,22 +343,26 @@ static bool handle(app_data_t* app, pn_event_t* event) {
return exit_code == 0;
}
-void run(app_data_t *app) {
+void* run(void *arg) {
+ app_data_t *app = arg;
+
/* Loop and handle events */
+ bool again = true;
do {
pn_event_batch_t *events = pn_proactor_wait(app->proactor);
pn_event_t *e;
- for (e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) {
- if (!handle(app, e)) {
- return;
- }
+ for (e = pn_event_batch_next(events); e && again; e = pn_event_batch_next(events)) {
+ again = handle(app, e);
}
pn_proactor_done(app->proactor, events);
- } while(true);
+ } while(again);
}
+
int main(int argc, char **argv) {
struct app_data_t app = {0};
+ pthread_mutex_init(&app.lock, NULL);
+
char addr[PN_MAX_ADDR];
app.host = (argc > 1) ? argv[1] : "";
app.port = (argc > 2) ? argv[2] : "amqp";
@@ -350,7 +372,22 @@ int main(int argc, char **argv) {
app.listener = pn_listener();
pn_proactor_addr(addr, sizeof(addr), app.host, app.port);
pn_proactor_listen(app.proactor, app.listener, addr, 16);
+
+ size_t thread_count = 3;
+ pthread_t threads[thread_count];
+ for (int n=0; n<thread_count; n++) {
+ int rc = pthread_create(&threads[n], 0, run, (void*)&app);
+ if (rc) {
+ fprintf(stderr, "Failed to create thread\n");
+ exit(-1);
+ }
+ }
run(&app);
+
+ for (int n=0; n<thread_count; n++) {
+ pthread_join(threads[n], 0);
+ }
+
pn_proactor_free(app.proactor);
return exit_code;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org