You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2020/12/09 15:06:14 UTC

[qpid-dispatch] branch dev-protocol-adaptors-2 updated: DISPATCH-1876 - Added proper ref-count protection for the initial-delivery. Fixed leak of buffers when connections close very early. Enabled the connect-disconnect test.

This is an automated email from the ASF dual-hosted git repository.

tross pushed a commit to branch dev-protocol-adaptors-2
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push:
     new 2ca0f75  DISPATCH-1876 - Added proper ref-count protection for the initial-delivery. Fixed leak of buffers when connections close very early. Enabled the connect-disconnect test.
2ca0f75 is described below

commit 2ca0f75b19263ee7cb335cfce01a930c01dfb9ba
Author: Ted Ross <tr...@apache.org>
AuthorDate: Wed Dec 9 10:01:09 2020 -0500

    DISPATCH-1876 - Added proper ref-count protection for the initial-delivery. Fixed leak of buffers when connections close very early. Enabled the connect-disconnect test.
---
 src/adaptors/tcp_adaptor.c        | 26 +++++++++++++++++++++-----
 src/router_core/connections.c     |  4 +++-
 tests/system_tests_tcp_adaptor.py |  1 -
 3 files changed, 24 insertions(+), 7 deletions(-)

diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index 2d7974d..7940705 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -123,6 +123,7 @@ static void grant_read_buffers(qdr_tcp_connection_t *conn)
     // Give proactor more read buffers for the socket
     if (!pn_raw_connection_is_read_closed(conn->pn_raw_conn)) {
         size_t desired = pn_raw_connection_read_buffers_capacity(conn->pn_raw_conn);
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Granted %i read buffers", conn->conn_id, desired);
         while (desired) {
             size_t i;
             for (i = 0; i < desired && i < READ_BUFFERS; ++i) {
@@ -144,7 +145,7 @@ static int handle_incoming(qdr_tcp_connection_t *conn)
     //
     // Don't initiate an ingress stream message if we don't yet have a reply-to address and credit.
     //
-    if (!conn->instream && ((conn->ingress && !conn->reply_to) || !conn->flow_enabled)) {
+    if (!pn_raw_connection_is_read_closed(conn->pn_raw_conn) && !conn->instream && ((conn->ingress && !conn->reply_to) || !conn->flow_enabled)) {
         if (conn->ingress && !conn->reply_to) {
             qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Waiting for reply-to address to initiate message", conn->conn_id, conn->outgoing_id);
         }
@@ -159,15 +160,23 @@ static int handle_incoming(qdr_tcp_connection_t *conn)
     pn_raw_buffer_t raw_buffers[READ_BUFFERS];
     size_t n;
     int count = 0;
+    int free_count = 0;
     while ( (n = pn_raw_connection_take_read_buffers(conn->pn_raw_conn, raw_buffers, READ_BUFFERS)) ) {
         for (size_t i = 0; i < n && raw_buffers[i].bytes; ++i) {
             qd_buffer_t *buf = (qd_buffer_t*) raw_buffers[i].context;
             qd_buffer_insert(buf, raw_buffers[i].size);
             count += raw_buffers[i].size;
-            DEQ_INSERT_TAIL(buffers, buf);
+            if (raw_buffers[i].size > 0) {
+                DEQ_INSERT_TAIL(buffers, buf);
+            } else {
+                qd_buffer_free(buf);
+                free_count++;
+            }
         }
     }
 
+    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Took %i read buffers", conn->conn_id, DEQ_SIZE(buffers));
+    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Freed %i read buffers", conn->conn_id, free_count);
     grant_read_buffers(conn);
 
     if (conn->instream) {
@@ -242,11 +251,11 @@ static void handle_disconnected(qdr_tcp_connection_t* conn)
         qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - close instream", conn->conn_id, conn->incoming_id);
         qd_message_set_receive_complete(qdr_delivery_message(conn->instream));
         qdr_delivery_continue(tcp_adaptor->core, conn->instream, true);
-        qdr_delivery_decref(tcp_adaptor->core, conn->instream, "tcp-adaptor.handle_disconnected");
+        qdr_delivery_decref(tcp_adaptor->core, conn->instream, "tcp-adaptor.handle_disconnected - instream");
     }
     if (conn->outstream) {
         qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] handle_disconnected close outstream", conn->conn_id, conn->outgoing_id);
-        qdr_delivery_decref(tcp_adaptor->core, conn->outstream, "tcp-adaptor.handle_disconnected");
+        qdr_delivery_decref(tcp_adaptor->core, conn->outstream, "tcp-adaptor.handle_disconnected - outstream");
     }
     if (conn->incoming) {
         qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - detach incoming", conn->conn_id, conn->incoming_id);
@@ -262,6 +271,8 @@ static void handle_disconnected(qdr_tcp_connection_t* conn)
     }
     if (conn->initial_delivery) {
         qdr_delivery_remote_state_updated(tcp_adaptor->core, conn->initial_delivery, PN_RELEASED, true, 0, 0, false);
+        qdr_delivery_decref(tcp_adaptor->core, conn->initial_delivery, "tcp-adaptor.handle_disconnected - initial_delivery");
+        conn->initial_delivery = 0;
     }
 
     //need to free on core thread to avoid deleting while in use by management agent
@@ -512,7 +523,6 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
             qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] PN_RAW_CONNECTION_CONNECTED Egress connected to %s", conn->conn_id, conn->remote_address);
             if (!!conn->initial_delivery) {
                 qdr_tcp_open_server_side_connection(conn);
-                conn->initial_delivery = 0;
             }
             while (qdr_connection_process(conn->qdr_conn)) {}
             handle_outgoing(conn);
@@ -660,6 +670,10 @@ static void qdr_tcp_open_server_side_connection(qdr_tcp_connection_t* tc)
                                          !(tc->egress_dispatcher),
                                          tc->initial_delivery,
                                          &(tc->outgoing_id));
+    if (!!tc->initial_delivery) {
+        qdr_delivery_decref(tcp_adaptor->core, tc->initial_delivery, "tcp-adaptor - passing initial_delivery into new link");
+        tc->initial_delivery = 0;
+    }
     qdr_link_set_context(tc->outgoing, tc);
 }
 
@@ -672,6 +686,7 @@ static qdr_tcp_connection_t *qdr_tcp_connection_egress(qd_bridge_config_t *confi
     if (initial_delivery) {
         tc->egress_dispatcher = false;
         tc->initial_delivery  = initial_delivery;
+        qdr_delivery_incref(initial_delivery, "qdr_tcp_connection_egress - held initial delivery");
     } else {
         tc->activate_timer = qd_timer(tcp_adaptor->core->qd, on_activate, tc);
         tc->egress_dispatcher = true;
@@ -1051,6 +1066,7 @@ static uint64_t qdr_tcp_deliver(void *context, qdr_link_t *link, qdr_delivery_t
         qdr_tcp_connection_t* tc = (qdr_tcp_connection_t*) link_context;
         qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_deliver Delivery event dlv:%lx", tc->conn_id, tc->outgoing_id, delivery);
         if (tc->egress_dispatcher) {
+            qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] tcp_adaptor initiating egress connection %p", tc->conn_id, tc->outgoing_id, delivery);
             qdr_tcp_connection_egress(&(tc->config), tc->server, delivery);
             return QD_DELIVERY_MOVED_TO_NEW_LINK;
         } else if (!tc->outstream) {
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 5021b92..8bbc88d 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -630,6 +630,8 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
     action->args.connection.source = source;
     action->args.connection.target = target;
     action->args.connection.initial_delivery = initial_delivery;
+    if (!!initial_delivery)
+        qdr_delivery_incref(initial_delivery, "qdr_link_first_attach - protect delivery in action list");
     qdr_action_enqueue(conn->core, action);
 
     return link;
@@ -1607,7 +1609,7 @@ static void qdr_attach_link_downlink_CT(qdr_core_t *core, qdr_connection_t *conn
 static void qdr_link_process_initial_delivery_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv)
 {
     qdr_link_t *old_link  = safe_deref_qdr_link_t(dlv->link_sp);
-    int         ref_delta = 0;
+    int         ref_delta = -1; // Account for the action-list protection
 
     //
     // Remove the delivery from its current link if needed
diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py
index 4c4131f..e73ec9a 100644
--- a/tests/system_tests_tcp_adaptor.py
+++ b/tests/system_tests_tcp_adaptor.py
@@ -674,7 +674,6 @@ class TcpAdaptor(TestCase):
 
     @SkipIfNeeded(DISABLE_SELECTOR_TESTS, DISABLE_SELECTOR_REASON)
     def test_20_tcp_connect_disconnect(self):
-        self.skipTest("DISPATCH-1876 reproducer: disabled until DISPATCH-1876 is fixed")
         name = "test_20_tcp_connect_disconnect"
         self.logger.log("TCP_TEST Start %s" % name)
         pairs = [self.EchoPair(self.INTA, self.INTA, sizes=[0])]


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org