You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2021/03/23 16:14:40 UTC

[qpid-dispatch] branch master updated: DISPATCH-2012: Move the setting of the initial delivery's conn_id and link_id to qdr_link_first_attach. This closes #1078.

This is an automated email from the ASF dual-hosted git repository.

gmurthy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
     new 9934dda  DISPATCH-2012: Move the setting of the initial delivery's conn_id and link_id to qdr_link_first_attach. This closes #1078.
9934dda is described below

commit 9934ddac85158cfd1398c42e2a55ee264f3ffa5d
Author: Ganesh Murthy <gm...@apache.org>
AuthorDate: Fri Mar 19 10:49:19 2021 -0400

    DISPATCH-2012: Move the setting of the initial delivery's conn_id and link_id to qdr_link_first_attach. This closes #1078.
---
 src/adaptors/tcp_adaptor.c    | 11 ++++++++++-
 src/message.c                 |  5 +++++
 src/router_core/connections.c | 14 ++++++++------
 3 files changed, 23 insertions(+), 7 deletions(-)

diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index 0b83123..884d0a7 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -169,7 +169,7 @@ void qdr_tcp_q2_unblocked_handler(const qd_alloc_safe_ptr_t context)
     // prevent the tc from being deleted while running:
     sys_mutex_lock(tc->activation_lock);
 
-    if (tc && tc->pn_raw_conn) {
+    if (tc->pn_raw_conn) {
         sys_atomic_set(&tc->q2_restart, 1);
         pn_raw_connection_wake(tc->pn_raw_conn);
     }
@@ -629,13 +629,17 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
         qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_CLOSED_READ", conn->conn_id);
         conn->q2_blocked = false;
         handle_incoming_impl(conn, true);
+        sys_mutex_lock(conn->activation_lock);
         conn->raw_closed_read = true;
+        sys_mutex_unlock(conn->activation_lock);
         pn_raw_connection_close(conn->pn_raw_conn);
         break;
     }
     case PN_RAW_CONNECTION_CLOSED_WRITE: {
         qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_CLOSED_WRITE", conn->conn_id);
+        sys_mutex_lock(conn->activation_lock);
         conn->raw_closed_write = true;
+        sys_mutex_unlock(conn->activation_lock);
         pn_raw_connection_close(conn->pn_raw_conn);
         break;
     }
@@ -661,12 +665,17 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
     }
     case PN_RAW_CONNECTION_WAKE: {
         qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_WAKE", conn->conn_id);
+        sys_mutex_lock(conn->activation_lock);
         if (sys_atomic_set(&conn->q2_restart, 0)) {
+            sys_mutex_unlock(conn->activation_lock);
             // note: unit tests grep for this log!
             qd_log(log, QD_LOG_TRACE, "[C%"PRIu64"] client link unblocked from Q2 limit", conn->conn_id);
             conn->q2_blocked = false;
             handle_incoming(conn);
         }
+        else {
+            sys_mutex_unlock(conn->activation_lock);
+        }
         while (qdr_connection_process(conn->qdr_conn)) {}
         break;
     }
diff --git a/src/message.c b/src/message.c
index 6161275..e9052bb 100644
--- a/src/message.c
+++ b/src/message.c
@@ -2530,6 +2530,10 @@ int qd_message_stream_data_buffers(qd_message_stream_data_t *stream_data, pn_raw
     size_t       data_offset  = stream_data->payload.offset;
     size_t       payload_len  = stream_data->payload.length;
 
+    qd_message_pvt_t    *owning_message = stream_data->owning_message;
+
+
+    LOCK(owning_message->content->lock);
     //
     // Skip the buffer offset
     //
@@ -2560,6 +2564,7 @@ int qd_message_stream_data_buffers(qd_message_stream_data_t *stream_data, pn_raw
         buffer = DEQ_NEXT(buffer);
         idx++;
     }
+    UNLOCK(owning_message->content->lock);
 
     return idx;
 }
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 210704b..eb7701c 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -610,6 +610,14 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
     link->strip_annotations_in  = conn->strip_annotations_in;
     link->strip_annotations_out = conn->strip_annotations_out;
 
+    //
+    // Adjust the delivery's identity
+    //
+    if (initial_delivery) {
+        initial_delivery->conn_id = link->conn->identity;
+        initial_delivery->link_id = link->identity;
+    }
+
     if      (qdr_terminus_has_capability(local_terminus, QD_CAPABILITY_ROUTER_CONTROL))
         link->link_type = QD_LINK_CONTROL;
     else if (qdr_terminus_has_capability(local_terminus, QD_CAPABILITY_ROUTER_DATA))
@@ -1647,12 +1655,6 @@ static void qdr_link_process_initial_delivery_CT(qdr_core_t *core, qdr_link_t *l
     }
 
     //
-    // Adjust the delivery's identity
-    //
-    dlv->conn_id = link->conn->identity;
-    dlv->link_id = link->identity;
-
-    //
     // Enqueue the delivery onto the new link's undelivered list
     //
     set_safe_ptr_qdr_link_t(link, &dlv->link_sp);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org