You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2020/12/09 15:06:14 UTC
[qpid-dispatch] branch dev-protocol-adaptors-2 updated:
DISPATCH-1876 - Added proper ref-count protection for the initial-delivery.
Fixed leak of buffers when connections close very early. Enabled the
connect-disconnect test.
This is an automated email from the ASF dual-hosted git repository.
tross pushed a commit to branch dev-protocol-adaptors-2
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push:
new 2ca0f75 DISPATCH-1876 - Added proper ref-count protection for the initial-delivery. Fixed leak of buffers when connections close very early. Enabled the connect-disconnect test.
2ca0f75 is described below
commit 2ca0f75b19263ee7cb335cfce01a930c01dfb9ba
Author: Ted Ross <tr...@apache.org>
AuthorDate: Wed Dec 9 10:01:09 2020 -0500
DISPATCH-1876 - Added proper ref-count protection for the initial-delivery. Fixed leak of buffers when connections close very early. Enabled the connect-disconnect test.
---
src/adaptors/tcp_adaptor.c | 26 +++++++++++++++++++++-----
src/router_core/connections.c | 4 +++-
tests/system_tests_tcp_adaptor.py | 1 -
3 files changed, 24 insertions(+), 7 deletions(-)
diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index 2d7974d..7940705 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -123,6 +123,7 @@ static void grant_read_buffers(qdr_tcp_connection_t *conn)
// Give proactor more read buffers for the socket
if (!pn_raw_connection_is_read_closed(conn->pn_raw_conn)) {
size_t desired = pn_raw_connection_read_buffers_capacity(conn->pn_raw_conn);
+ qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Granted %i read buffers", conn->conn_id, desired);
while (desired) {
size_t i;
for (i = 0; i < desired && i < READ_BUFFERS; ++i) {
@@ -144,7 +145,7 @@ static int handle_incoming(qdr_tcp_connection_t *conn)
//
// Don't initiate an ingress stream message if we don't yet have a reply-to address and credit.
//
- if (!conn->instream && ((conn->ingress && !conn->reply_to) || !conn->flow_enabled)) {
+ if (!pn_raw_connection_is_read_closed(conn->pn_raw_conn) && !conn->instream && ((conn->ingress && !conn->reply_to) || !conn->flow_enabled)) {
if (conn->ingress && !conn->reply_to) {
qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Waiting for reply-to address to initiate message", conn->conn_id, conn->outgoing_id);
}
@@ -159,15 +160,23 @@ static int handle_incoming(qdr_tcp_connection_t *conn)
pn_raw_buffer_t raw_buffers[READ_BUFFERS];
size_t n;
int count = 0;
+ int free_count = 0;
while ( (n = pn_raw_connection_take_read_buffers(conn->pn_raw_conn, raw_buffers, READ_BUFFERS)) ) {
for (size_t i = 0; i < n && raw_buffers[i].bytes; ++i) {
qd_buffer_t *buf = (qd_buffer_t*) raw_buffers[i].context;
qd_buffer_insert(buf, raw_buffers[i].size);
count += raw_buffers[i].size;
- DEQ_INSERT_TAIL(buffers, buf);
+ if (raw_buffers[i].size > 0) {
+ DEQ_INSERT_TAIL(buffers, buf);
+ } else {
+ qd_buffer_free(buf);
+ free_count++;
+ }
}
}
+ qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Took %i read buffers", conn->conn_id, DEQ_SIZE(buffers));
+ qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Freed %i read buffers", conn->conn_id, free_count);
grant_read_buffers(conn);
if (conn->instream) {
@@ -242,11 +251,11 @@ static void handle_disconnected(qdr_tcp_connection_t* conn)
qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - close instream", conn->conn_id, conn->incoming_id);
qd_message_set_receive_complete(qdr_delivery_message(conn->instream));
qdr_delivery_continue(tcp_adaptor->core, conn->instream, true);
- qdr_delivery_decref(tcp_adaptor->core, conn->instream, "tcp-adaptor.handle_disconnected");
+ qdr_delivery_decref(tcp_adaptor->core, conn->instream, "tcp-adaptor.handle_disconnected - instream");
}
if (conn->outstream) {
qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] handle_disconnected close outstream", conn->conn_id, conn->outgoing_id);
- qdr_delivery_decref(tcp_adaptor->core, conn->outstream, "tcp-adaptor.handle_disconnected");
+ qdr_delivery_decref(tcp_adaptor->core, conn->outstream, "tcp-adaptor.handle_disconnected - outstream");
}
if (conn->incoming) {
qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - detach incoming", conn->conn_id, conn->incoming_id);
@@ -262,6 +271,8 @@ static void handle_disconnected(qdr_tcp_connection_t* conn)
}
if (conn->initial_delivery) {
qdr_delivery_remote_state_updated(tcp_adaptor->core, conn->initial_delivery, PN_RELEASED, true, 0, 0, false);
+ qdr_delivery_decref(tcp_adaptor->core, conn->initial_delivery, "tcp-adaptor.handle_disconnected - initial_delivery");
+ conn->initial_delivery = 0;
}
//need to free on core thread to avoid deleting while in use by management agent
@@ -512,7 +523,6 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] PN_RAW_CONNECTION_CONNECTED Egress connected to %s", conn->conn_id, conn->remote_address);
if (!!conn->initial_delivery) {
qdr_tcp_open_server_side_connection(conn);
- conn->initial_delivery = 0;
}
while (qdr_connection_process(conn->qdr_conn)) {}
handle_outgoing(conn);
@@ -660,6 +670,10 @@ static void qdr_tcp_open_server_side_connection(qdr_tcp_connection_t* tc)
!(tc->egress_dispatcher),
tc->initial_delivery,
&(tc->outgoing_id));
+ if (!!tc->initial_delivery) {
+ qdr_delivery_decref(tcp_adaptor->core, tc->initial_delivery, "tcp-adaptor - passing initial_delivery into new link");
+ tc->initial_delivery = 0;
+ }
qdr_link_set_context(tc->outgoing, tc);
}
@@ -672,6 +686,7 @@ static qdr_tcp_connection_t *qdr_tcp_connection_egress(qd_bridge_config_t *confi
if (initial_delivery) {
tc->egress_dispatcher = false;
tc->initial_delivery = initial_delivery;
+ qdr_delivery_incref(initial_delivery, "qdr_tcp_connection_egress - held initial delivery");
} else {
tc->activate_timer = qd_timer(tcp_adaptor->core->qd, on_activate, tc);
tc->egress_dispatcher = true;
@@ -1051,6 +1066,7 @@ static uint64_t qdr_tcp_deliver(void *context, qdr_link_t *link, qdr_delivery_t
qdr_tcp_connection_t* tc = (qdr_tcp_connection_t*) link_context;
qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_deliver Delivery event dlv:%lx", tc->conn_id, tc->outgoing_id, delivery);
if (tc->egress_dispatcher) {
+ qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] tcp_adaptor initiating egress connection %p", tc->conn_id, tc->outgoing_id, delivery);
qdr_tcp_connection_egress(&(tc->config), tc->server, delivery);
return QD_DELIVERY_MOVED_TO_NEW_LINK;
} else if (!tc->outstream) {
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 5021b92..8bbc88d 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -630,6 +630,8 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
action->args.connection.source = source;
action->args.connection.target = target;
action->args.connection.initial_delivery = initial_delivery;
+ if (!!initial_delivery)
+ qdr_delivery_incref(initial_delivery, "qdr_link_first_attach - protect delivery in action list");
qdr_action_enqueue(conn->core, action);
return link;
@@ -1607,7 +1609,7 @@ static void qdr_attach_link_downlink_CT(qdr_core_t *core, qdr_connection_t *conn
static void qdr_link_process_initial_delivery_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv)
{
qdr_link_t *old_link = safe_deref_qdr_link_t(dlv->link_sp);
- int ref_delta = 0;
+ int ref_delta = -1; // Account for the action-list protection
//
// Remove the delivery from its current link if needed
diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py
index 4c4131f..e73ec9a 100644
--- a/tests/system_tests_tcp_adaptor.py
+++ b/tests/system_tests_tcp_adaptor.py
@@ -674,7 +674,6 @@ class TcpAdaptor(TestCase):
@SkipIfNeeded(DISABLE_SELECTOR_TESTS, DISABLE_SELECTOR_REASON)
def test_20_tcp_connect_disconnect(self):
- self.skipTest("DISPATCH-1876 reproducer: disabled until DISPATCH-1876 is fixed")
name = "test_20_tcp_connect_disconnect"
self.logger.log("TCP_TEST Start %s" % name)
pairs = [self.EchoPair(self.INTA, self.INTA, sizes=[0])]
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org