You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2020/08/12 15:34:15 UTC
[qpid-proton] 05/05: PROTON-2247: Work on raw echo to improve
output and add some wakes
This is an automated email from the ASF dual-hosted git repository.
astitcher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit 728a2ef3522b6214e4a4a4dcd0bab2fdc60d0135
Author: Andrew Stitcher <as...@apache.org>
AuthorDate: Tue Jun 9 23:22:26 2020 -0400
PROTON-2247: Work on raw echo to improve output and add some wakes
---
c/examples/raw_echo.c | 117 ++++++++++++++++++++++++++++++++++++++++++--------
1 file changed, 99 insertions(+), 18 deletions(-)
diff --git a/c/examples/raw_echo.c b/c/examples/raw_echo.c
index 53fd47b..53a4ada 100644
--- a/c/examples/raw_echo.c
+++ b/c/examples/raw_echo.c
@@ -37,6 +37,9 @@ typedef struct app_data_t {
pn_proactor_t *proactor;
pn_listener_t *listener;
+ int64_t first_idle_time;
+ int64_t try_accept_time;
+ int64_t wake_conn_time;
int connects;
int disconnects;
@@ -45,6 +48,17 @@ typedef struct app_data_t {
/* Receiver values */
} app_data_t;
+#define MAX_CONNECTIONS 5
+
+typedef struct conn_data_t {
+ pn_raw_connection_t *connection;
+ int64_t last_recv_time;
+ int bytes;
+ int buffers;
+} conn_data_t;
+
+static conn_data_t conn_data[MAX_CONNECTIONS] = {{0}};
+
static int exit_code = 0;
/* Close the connection and the listener so so we will get a
@@ -77,10 +91,22 @@ static void recv_message(pn_raw_buffer_t buf) {
fwrite(buf.bytes, buf.size, 1, stdout);
}
-void *make_receiver_data(void) {
+conn_data_t *make_conn_data(pn_raw_connection_t *c) {
+ int i;
+ for (i = 0; i < MAX_CONNECTIONS; ++i) {
+ if (!conn_data[i].connection) {
+ conn_data[i].connection = c;
+ return &conn_data[i];
+ }
+ }
return NULL;
}
+void free_conn_data(conn_data_t *c) {
+ if (!c) return;
+ c->connection = NULL;
+}
+
#define READ_BUFFERS 4
/* This function handles events when we are acting as the receiver */
@@ -89,27 +115,42 @@ static void handle_receive(app_data_t *app, pn_event_t* event) {
case PN_RAW_CONNECTION_CONNECTED: {
pn_raw_connection_t *c = pn_event_raw_connection(event);
+ conn_data_t *cd = (conn_data_t *) pn_raw_connection_get_context(c);
pn_raw_buffer_t buffers[READ_BUFFERS] = {{0}};
- int i = READ_BUFFERS;
- printf("**raw connection connected\n");
- app->connects++;
- for (; i; --i) {
- pn_raw_buffer_t *buff = &buffers[READ_BUFFERS-i];
- buff->bytes = (char*) malloc(1024);
- buff->capacity = 1024;
- buff->size = 0;
- buff->offset = 0;
+ if (cd) {
+ int i = READ_BUFFERS;
+ printf("**raw connection %ld connected\n", cd-conn_data);
+ app->connects++;
+ for (; i; --i) {
+ pn_raw_buffer_t *buff = &buffers[READ_BUFFERS-i];
+ buff->bytes = (char*) malloc(1024);
+ buff->capacity = 1024;
+ buff->size = 0;
+ buff->offset = 0;
+ }
+ pn_raw_connection_give_read_buffers(c, buffers, READ_BUFFERS);
+ } else {
+ printf("**raw connection connected: not connected\n");
}
- pn_raw_connection_give_read_buffers(c, buffers, READ_BUFFERS);
+ } break;
+
+ case PN_RAW_CONNECTION_WAKE: {
+ pn_raw_connection_t *c = pn_event_raw_connection(event);
+ conn_data_t *cd = (conn_data_t *) pn_raw_connection_get_context(c);
+ printf("**raw connection %ld woken\n", cd-conn_data);
} break;
case PN_RAW_CONNECTION_DISCONNECTED: {
pn_raw_connection_t *c = pn_event_raw_connection(event);
- void *cd = pn_raw_connection_get_context(c);
- free(cd);
- printf("**raw connection disconnected\n");
+ conn_data_t *cd = (conn_data_t *) pn_raw_connection_get_context(c);
+ if (cd) {
+ printf("**raw connection %ld disconnected: bytes: %d, buffers: %d\n", cd-conn_data, cd->bytes, cd->buffers);
+ } else {
+ printf("**raw connection disconnected: not connected\n");
+ }
app->disconnects++;
check_condition(event, pn_raw_connection_condition(c), app);
+ free_conn_data(cd);
} break;
case PN_RAW_CONNECTION_NEED_READ_BUFFERS: {
@@ -118,13 +159,17 @@ static void handle_receive(app_data_t *app, pn_event_t* event) {
/* This path handles both received bytes and freeing buffers at close */
case PN_RAW_CONNECTION_READ: {
pn_raw_connection_t *c = pn_event_raw_connection(event);
+ conn_data_t *cd = (conn_data_t *) pn_raw_connection_get_context(c);
pn_raw_buffer_t buffs[READ_BUFFERS];
size_t n;
+ cd->last_recv_time = pn_proactor_now_64();
while ( (n = pn_raw_connection_take_read_buffers(c, buffs, READ_BUFFERS)) ) {
unsigned i;
for (i=0; i<n && buffs[i].bytes; ++i) {
+ cd->bytes += buffs[i].size;
recv_message(buffs[i]);
}
+ cd->buffers += n;
if (!pn_raw_connection_is_write_closed(c)) {
pn_raw_connection_write_buffers(c, buffs, n);
@@ -179,12 +224,29 @@ static bool handle(app_data_t* app, pn_event_t* event) {
break;
}
case PN_LISTENER_ACCEPT: {
+ pn_listener_t *listener = pn_event_listener(event);
pn_raw_connection_t *c = pn_raw_connection();
- void *cd = make_receiver_data();
- pn_raw_connection_set_context(c, cd);
- pn_listener_raw_accept(pn_event_listener(event), c);
+ void *cd = make_conn_data(c);
+ int64_t now = pn_proactor_now_64();
+
+ if (cd) {
+ app->first_idle_time = 0;
+ app->try_accept_time = 0;
+ if (app->wake_conn_time < now) {
+ app->wake_conn_time = now + 5000;
+ pn_proactor_set_timeout(pn_listener_proactor(listener), 5000);
+ }
+ pn_raw_connection_set_context(c, cd);
+
+ pn_listener_raw_accept(listener, c);
+ } else {
+ printf("**too many connections, trying again later...\n");
+
+ /* No other way to reject connection */
+ pn_listener_raw_accept(listener, c);
+ pn_raw_connection_close(c);
+ }
- if (app->connects>2) pn_listener_close(app->listener);
} break;
case PN_LISTENER_CLOSE: {
@@ -193,6 +255,25 @@ static bool handle(app_data_t* app, pn_event_t* event) {
} break;
case PN_PROACTOR_TIMEOUT: {
+ int64_t now = pn_proactor_now_64();
+ pn_millis_t timeout = 5000;
+ if (app->connects - app->disconnects == 0) {
+ timeout = 20000;
+ if (app->first_idle_time == 0) {
+ printf("**idle detected, shutting down in %dms\n", timeout);
+ app->first_idle_time = now;
+ } else if (app->first_idle_time + 20000 <= now) {
+ pn_listener_close(app->listener);
+ break;
+ }
+ } else if (now >= app->wake_conn_time) {
+ int i;
+ for (i = 0; i < MAX_CONNECTIONS; ++i) {
+ if (conn_data[i].connection) pn_raw_connection_wake(conn_data[i].connection);
+ }
+ app->wake_conn_time = now + 5000;
+ }
+ pn_proactor_set_timeout(pn_event_proactor(event), timeout);
} break;
case PN_PROACTOR_INACTIVE: {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org