You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2021/03/23 16:14:40 UTC
[qpid-dispatch] branch master updated: DISPATCH-2012: Move the
setting of the initial delivery's conn_id and link_id to
qdr_link_first_attach. This closes #1078.
This is an automated email from the ASF dual-hosted git repository.
gmurthy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push:
new 9934dda DISPATCH-2012: Move the setting of the initial delivery's conn_id and link_id to qdr_link_first_attach. This closes #1078.
9934dda is described below
commit 9934ddac85158cfd1398c42e2a55ee264f3ffa5d
Author: Ganesh Murthy <gm...@apache.org>
AuthorDate: Fri Mar 19 10:49:19 2021 -0400
DISPATCH-2012: Move the setting of the initial delivery's conn_id and link_id to qdr_link_first_attach. This closes #1078.
---
src/adaptors/tcp_adaptor.c | 11 ++++++++++-
src/message.c | 5 +++++
src/router_core/connections.c | 14 ++++++++------
3 files changed, 23 insertions(+), 7 deletions(-)
diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index 0b83123..884d0a7 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -169,7 +169,7 @@ void qdr_tcp_q2_unblocked_handler(const qd_alloc_safe_ptr_t context)
// prevent the tc from being deleted while running:
sys_mutex_lock(tc->activation_lock);
- if (tc && tc->pn_raw_conn) {
+ if (tc->pn_raw_conn) {
sys_atomic_set(&tc->q2_restart, 1);
pn_raw_connection_wake(tc->pn_raw_conn);
}
@@ -629,13 +629,17 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_CLOSED_READ", conn->conn_id);
conn->q2_blocked = false;
handle_incoming_impl(conn, true);
+ sys_mutex_lock(conn->activation_lock);
conn->raw_closed_read = true;
+ sys_mutex_unlock(conn->activation_lock);
pn_raw_connection_close(conn->pn_raw_conn);
break;
}
case PN_RAW_CONNECTION_CLOSED_WRITE: {
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_CLOSED_WRITE", conn->conn_id);
+ sys_mutex_lock(conn->activation_lock);
conn->raw_closed_write = true;
+ sys_mutex_unlock(conn->activation_lock);
pn_raw_connection_close(conn->pn_raw_conn);
break;
}
@@ -661,12 +665,17 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
}
case PN_RAW_CONNECTION_WAKE: {
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_WAKE", conn->conn_id);
+ sys_mutex_lock(conn->activation_lock);
if (sys_atomic_set(&conn->q2_restart, 0)) {
+ sys_mutex_unlock(conn->activation_lock);
// note: unit tests grep for this log!
qd_log(log, QD_LOG_TRACE, "[C%"PRIu64"] client link unblocked from Q2 limit", conn->conn_id);
conn->q2_blocked = false;
handle_incoming(conn);
}
+ else {
+ sys_mutex_unlock(conn->activation_lock);
+ }
while (qdr_connection_process(conn->qdr_conn)) {}
break;
}
diff --git a/src/message.c b/src/message.c
index 6161275..e9052bb 100644
--- a/src/message.c
+++ b/src/message.c
@@ -2530,6 +2530,10 @@ int qd_message_stream_data_buffers(qd_message_stream_data_t *stream_data, pn_raw
size_t data_offset = stream_data->payload.offset;
size_t payload_len = stream_data->payload.length;
+ qd_message_pvt_t *owning_message = stream_data->owning_message;
+
+
+ LOCK(owning_message->content->lock);
//
// Skip the buffer offset
//
@@ -2560,6 +2564,7 @@ int qd_message_stream_data_buffers(qd_message_stream_data_t *stream_data, pn_raw
buffer = DEQ_NEXT(buffer);
idx++;
}
+ UNLOCK(owning_message->content->lock);
return idx;
}
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 210704b..eb7701c 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -610,6 +610,14 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
link->strip_annotations_in = conn->strip_annotations_in;
link->strip_annotations_out = conn->strip_annotations_out;
+ //
+ // Adjust the delivery's identity
+ //
+ if (initial_delivery) {
+ initial_delivery->conn_id = link->conn->identity;
+ initial_delivery->link_id = link->identity;
+ }
+
if (qdr_terminus_has_capability(local_terminus, QD_CAPABILITY_ROUTER_CONTROL))
link->link_type = QD_LINK_CONTROL;
else if (qdr_terminus_has_capability(local_terminus, QD_CAPABILITY_ROUTER_DATA))
@@ -1647,12 +1655,6 @@ static void qdr_link_process_initial_delivery_CT(qdr_core_t *core, qdr_link_t *l
}
//
- // Adjust the delivery's identity
- //
- dlv->conn_id = link->conn->identity;
- dlv->link_id = link->identity;
-
- //
// Enqueue the delivery onto the new link's undelivered list
//
set_safe_ptr_qdr_link_t(link, &dlv->link_sp);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org