You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2020/12/09 09:33:42 UTC

[qpid-dispatch] branch dev-protocol-adaptors-2 updated: DISPATCH-1857: locking around activation

This is an automated email from the ASF dual-hosted git repository.

gsim pushed a commit to branch dev-protocol-adaptors-2
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push:
     new d113e80  DISPATCH-1857: locking around activation
d113e80 is described below

commit d113e80369a7b8418cd22eaef539404144d8c09c
Author: Gordon Sim <gs...@redhat.com>
AuthorDate: Wed Dec 9 09:32:56 2020 +0000

    DISPATCH-1857: locking around activation
---
 src/adaptors/tcp_adaptor.c | 16 +++++++++++++---
 1 file changed, 13 insertions(+), 3 deletions(-)

diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index dd837d6..2d7974d 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -47,6 +47,7 @@ struct qdr_tcp_connection_t {
     qdr_link_t           *outgoing;
     uint64_t              outgoing_id;
     pn_raw_connection_t  *pn_raw_conn;
+    sys_mutex_t          *activation_lock;
     qdr_delivery_t       *instream;
     qdr_delivery_t       *outstream;
     bool                  ingress;
@@ -229,7 +230,7 @@ static void free_qdr_tcp_connection(qdr_tcp_connection_t* tc)
     if (tc->outgoing_stream_data) {
         free_qd_message_stream_data_t(tc->outgoing_stream_data);
     }
-
+    sys_mutex_free(tc->activation_lock);
     //proactor will free the socket
     free(tc);
 }
@@ -530,6 +531,9 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
     }
     case PN_RAW_CONNECTION_DISCONNECTED: {
         qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] PN_RAW_CONNECTION_DISCONNECTED", conn->conn_id);
+        sys_mutex_lock(conn->activation_lock);
+        conn->pn_raw_conn = 0;
+        sys_mutex_unlock(conn->activation_lock);
         handle_disconnected(conn);
         break;
     }
@@ -586,6 +590,7 @@ static qdr_tcp_connection_t *qdr_tcp_connection_ingress(qd_tcp_listener_t* liste
 {
     qdr_tcp_connection_t* tc = NEW(qdr_tcp_connection_t);
     ZERO(tc);
+    tc->activation_lock = sys_mutex();
     tc->ingress = true;
     tc->context.context = tc;
     tc->context.handler = &handle_connection_event;
@@ -663,6 +668,7 @@ static qdr_tcp_connection_t *qdr_tcp_connection_egress(qd_bridge_config_t *confi
 {
     qdr_tcp_connection_t* tc = NEW(qdr_tcp_connection_t);
     ZERO(tc);
+    tc->activation_lock = sys_mutex();
     if (initial_delivery) {
         tc->egress_dispatcher = false;
         tc->initial_delivery  = initial_delivery;
@@ -1043,7 +1049,7 @@ static uint64_t qdr_tcp_deliver(void *context, qdr_link_t *link, qdr_delivery_t
     void* link_context = qdr_link_get_context(link);
     if (link_context) {
         qdr_tcp_connection_t* tc = (qdr_tcp_connection_t*) link_context;
-        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_deliver Delivery event", tc->conn_id, tc->outgoing_id);
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_deliver Delivery event dlv:%lx", tc->conn_id, tc->outgoing_id, delivery);
         if (tc->egress_dispatcher) {
             qdr_tcp_connection_egress(&(tc->config), tc->server, delivery);
             return QD_DELIVERY_MOVED_TO_NEW_LINK;
@@ -1156,10 +1162,13 @@ static void qdr_tcp_activate(void *notused, qdr_connection_t *c)
     void *context = qdr_connection_get_context(c);
     if (context) {
         qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) context;
+        sys_mutex_lock(conn->activation_lock);
         if (conn->pn_raw_conn) {
             qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] qdr_tcp_activate: waking raw connection", conn->conn_id);
             pn_raw_connection_wake(conn->pn_raw_conn);
+            sys_mutex_unlock(conn->activation_lock);
         } else if (conn->activate_timer) {
+            sys_mutex_unlock(conn->activation_lock);
             // On egress, the raw connection is only created once the
             // first part of the message encapsulating the
             // client->server half of the stream has been
@@ -1169,7 +1178,8 @@ static void qdr_tcp_activate(void *notused, qdr_connection_t *c)
             qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] qdr_tcp_activate: schedule activate_timer", conn->conn_id);
             qd_timer_schedule(conn->activate_timer, 0);
         } else {
-            qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "[C%"PRIu64"] qdr_tcp_activate: Cannot activate", conn->conn_id);
+            sys_mutex_unlock(conn->activation_lock);
+            qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] qdr_tcp_activate: Cannot activate", conn->conn_id);
         }
     } else {
         qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_activate: no connection context");


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org