You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2020/12/02 17:58:37 UTC

[qpid-dispatch] branch dev-protocol-adaptors-2 updated: DISPATCH-1857 - Handle asynchronous moving of a delivery to another link. Patch contents from Gordon Sim. This improves the stability of multiple, concurrent connection handling. This closes #932.

This is an automated email from the ASF dual-hosted git repository.

tross pushed a commit to branch dev-protocol-adaptors-2
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push:
     new ffc4e86  DISPATCH-1857 - Handle asynchronous moving of a delivery to another link. Patch contents from Gordon Sim. This improves the stability of multiple, concurrent connection handling. This closes #932.
ffc4e86 is described below

commit ffc4e8649aac070a70688a99fc19dbb8f17e195e
Author: Ted Ross <tr...@apache.org>
AuthorDate: Wed Dec 2 12:55:32 2020 -0500

    DISPATCH-1857 - Handle asynchronous moving of a delivery to another link.
    Patch contents from Gordon Sim.
    This improves the stability of multiple, concurrent connection handling.
    This closes #932.
---
 include/qpid/dispatch/protocol_adaptor.h |   1 +
 src/adaptors/tcp_adaptor.c               | 110 ++++++++++++++++++-------------
 src/router_core/router_core.c            |   2 +
 src/router_core/transfer.c               |  11 +++-
 4 files changed, 78 insertions(+), 46 deletions(-)

diff --git a/include/qpid/dispatch/protocol_adaptor.h b/include/qpid/dispatch/protocol_adaptor.h
index 93b23b4..ae7036f 100644
--- a/include/qpid/dispatch/protocol_adaptor.h
+++ b/include/qpid/dispatch/protocol_adaptor.h
@@ -203,6 +203,7 @@ typedef void (*qdr_link_drain_t) (void *context, qdr_link_t *link, bool mode);
  */
 typedef int (*qdr_link_push_t) (void *context, qdr_link_t *link, int limit);
 
+extern const uint64_t QD_DELIVERY_MOVED_TO_NEW_LINK;
 /**
  * qdr_link_deliver_t callback
  *
diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index 9c94c86..98df531 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -53,6 +53,7 @@ struct qdr_tcp_connection_t {
     bool                  flow_enabled;
     bool                  egress_dispatcher;
     bool                  connector_closed;//only used if egress_dispatcher=true
+    qdr_delivery_t       *initial_delivery;
     qd_timer_t           *activate_timer;
     qd_bridge_config_t    config;
     qd_server_t          *server;
@@ -93,6 +94,7 @@ static void qdr_del_tcp_connection_CT(qdr_core_t *core, qdr_action_t *action, bo
 
 static void handle_disconnected(qdr_tcp_connection_t* conn);
 static void free_qdr_tcp_connection(qdr_tcp_connection_t* conn);
+static void qdr_tcp_open_server_side_connection(qdr_tcp_connection_t* tc);
 
 static inline uint64_t qdr_tcp_conn_linkid(const qdr_tcp_connection_t *conn)
 {
@@ -494,12 +496,16 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
     case PN_RAW_CONNECTION_CONNECTED: {
         if (conn->ingress) {
             qdr_tcp_connection_ingress_accept(conn);
-            qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] Accepted from %s", conn->conn_id, conn->remote_address);
+            qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] Accepted from %s (global_id=%s)", conn->conn_id, conn->remote_address, conn->global_id);
             break;
         } else {
             conn->remote_address = get_address_string(conn->socket);
             conn->opened_time = tcp_adaptor->core->uptime_ticks;
             qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] Connected", conn->conn_id);
+            if (!!conn->initial_delivery) {
+                qdr_tcp_open_server_side_connection(conn);
+                conn->initial_delivery = 0;
+            }
             while (qdr_connection_process(conn->conn)) {}
             handle_outgoing(conn);
             break;
@@ -529,6 +535,7 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
     case PN_RAW_CONNECTION_NEED_READ_BUFFERS: {
         qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Need read buffers", conn->conn_id);
         while (qdr_connection_process(conn->conn)) {}
+        handle_incoming(conn);
         break;
     }
     case PN_RAW_CONNECTION_WAKE: {
@@ -563,6 +570,7 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
         break;
     }
     default:
+        qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Unexpected Event: %d", conn->conn_id, pn_event_type(e));
         break;
     }
 }
@@ -586,35 +594,18 @@ static qdr_tcp_connection_t *qdr_tcp_connection_ingress(qd_tcp_listener_t* liste
     return tc;
 }
 
-static void tcp_connector_establish(qdr_tcp_connection_t *conn)
-{
-    qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "[C%"PRIu64"] Connecting to: %s", conn->conn_id, conn->config.host_port);
-    conn->socket = pn_raw_connection();
-    pn_raw_connection_set_context(conn->socket, conn);
-    pn_proactor_raw_connect(qd_server_proactor(conn->server), conn->socket, conn->config.host_port);
-}
 
-static qdr_tcp_connection_t *qdr_tcp_connection_egress(qd_bridge_config_t *config, qd_server_t *server, qdr_delivery_t *initial_delivery)
+static void qdr_tcp_open_server_side_connection(qdr_tcp_connection_t* tc)
 {
-    qdr_tcp_connection_t* tc = NEW(qdr_tcp_connection_t);
-    ZERO(tc);
-    if (initial_delivery) {
-        tc->egress_dispatcher = false;
-    } else {
-        tc->activate_timer = qd_timer(tcp_adaptor->core->qd, on_activate, tc);
-        tc->egress_dispatcher = true;
-    }
-    tc->ingress = false;
-    tc->context.context = tc;
-    tc->context.handler = &handle_connection_event;
-    tc->config = *config;
-    tc->server = server;
+    const char *host = tc->egress_dispatcher ? "egress-dispatch" : tc->config.host_port;
+    qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "[C%"PRIu64"] Opening server-side core connection %s", tc->conn_id, host);
+
     qdr_connection_info_t *info = qdr_connection_info(false, //bool             is_encrypted,
                                                       false, //bool             is_authenticated,
                                                       true,  //bool             opened,
                                                       "",   //char            *sasl_mechanisms,
                                                       QD_OUTGOING, //qd_direction_t   dir,
-                                                      tc->egress_dispatcher ? "egress-dispatch" : tc->config.host_port,    //const char      *host,
+                                                      host,  //const char      *host,
                                                       "",    //const char      *ssl_proto,
                                                       "",    //const char      *ssl_cipher,
                                                       "",    //const char      *user,
@@ -625,7 +616,6 @@ static qdr_tcp_connection_t *qdr_tcp_connection_egress(qd_bridge_config_t *confi
                                                       "",                  // peer router version,
                                                       false);              // streaming links
 
-    tc->conn_id = qd_server_allocate_connection_id(tc->server);
     qdr_connection_t *conn = qdr_connection_opened(tcp_adaptor->core,
                                                    tcp_adaptor->adaptor,
                                                    false,
@@ -650,20 +640,48 @@ static qdr_tcp_connection_t *qdr_tcp_connection_egress(qd_bridge_config_t *confi
     qdr_terminus_set_address(source, tc->config.address);
 
     tc->outgoing = qdr_link_first_attach(conn,
-                          QD_OUTGOING,
-                          source,           //qdr_terminus_t   *source,
-                          qdr_terminus(0),  //qdr_terminus_t   *target,
-                          "tcp.egress.out", //const char       *name,
-                          0,                //const char       *terminus_addr,
-                          !(tc->egress_dispatcher),
-                          initial_delivery,
-                          &(tc->outgoing_id));
+                                         QD_OUTGOING,
+                                         source,           //qdr_terminus_t   *source,
+                                         qdr_terminus(0),  //qdr_terminus_t   *target,
+                                         "tcp.egress.out", //const char       *name,
+                                         0,                //const char       *terminus_addr,
+                                         !(tc->egress_dispatcher),
+                                         tc->initial_delivery,
+                                         &(tc->outgoing_id));
     qdr_link_set_context(tc->outgoing, tc);
-    //the incoming link for egress is created once we receive the
-    //message which has the reply to address (and read buffers are
-    //granted at that point)
-    if (!tc->egress_dispatcher) {
-        tcp_connector_establish(tc);
+}
+
+
+static qdr_tcp_connection_t *qdr_tcp_connection_egress(qd_bridge_config_t *config, qd_server_t *server, qdr_delivery_t *initial_delivery)
+{
+    qdr_tcp_connection_t* tc = NEW(qdr_tcp_connection_t);
+    ZERO(tc);
+    if (initial_delivery) {
+        tc->egress_dispatcher = false;
+        tc->initial_delivery  = initial_delivery;
+    } else {
+        tc->activate_timer = qd_timer(tcp_adaptor->core->qd, on_activate, tc);
+        tc->egress_dispatcher = true;
+    }
+    tc->ingress = false;
+    tc->context.context = tc;
+    tc->context.handler = &handle_connection_event;
+    tc->config = *config;
+    tc->server = server;
+    tc->conn_id = qd_server_allocate_connection_id(tc->server);
+
+    //
+    // If this is the egress dispatcher, set up the core connection now.  Otherwise, set up a physical
+    // raw connection and wait until we are running in that connection's context to set up the core
+    // connection.
+    //
+    if (tc->egress_dispatcher)
+        qdr_tcp_open_server_side_connection(tc);
+    else {
+        qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "[C%"PRIu64"] Connecting to: %s", tc->conn_id, tc->config.host_port);
+        tc->socket = pn_raw_connection();
+        pn_raw_connection_set_context(tc->socket, tc);
+        pn_proactor_raw_connect(qd_server_proactor(tc->server), tc->socket, tc->config.host_port);
     }
 
     return tc;
@@ -1021,6 +1039,7 @@ static uint64_t qdr_tcp_deliver(void *context, qdr_link_t *link, qdr_delivery_t
         qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_deliver Delivery event", tc->conn_id, tc->outgoing_id);
         if (tc->egress_dispatcher) {
             qdr_tcp_connection_egress(&(tc->config), tc->server, delivery);
+            return QD_DELIVERY_MOVED_TO_NEW_LINK;
         } else if (!tc->outstream) {
             tc->outstream = delivery;
             qdr_delivery_incref(delivery, "tcp_adaptor - new outstream");
@@ -1038,14 +1057,15 @@ static uint64_t qdr_tcp_deliver(void *context, qdr_link_t *link, qdr_delivery_t
                 qdr_terminus_t *target = qdr_terminus(0);
                 qdr_terminus_set_address(target, tc->reply_to);
                 tc->incoming = qdr_link_first_attach(tc->conn,
-                                                        QD_INCOMING,
-                                                        qdr_terminus(0),  //qdr_terminus_t   *source,
-                                                        target, //qdr_terminus_t   *target,
-                                                        "tcp.egress.in",  //const char       *name,
-                                                        0,                //const char       *terminus_addr,
-                                                        false,
-                                                        NULL,
-                                                        &(tc->incoming_id));
+                                                     QD_INCOMING,
+                                                     qdr_terminus(0),  //qdr_terminus_t   *source,
+                                                     target, //qdr_terminus_t   *target,
+                                                     "tcp.egress.in",  //const char       *name,
+                                                     0,                //const char       *terminus_addr,
+                                                     false,
+                                                     NULL,
+                                                     &(tc->incoming_id));
+                qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Create Link to %s", tc->conn_id, tc->reply_to);
                 qdr_link_set_context(tc->incoming, tc);
                 //add this connection to those visible through management now that we have the global_id
                 qdr_action_t *action = qdr_action(qdr_add_tcp_connection_CT, "add_tcp_connection");
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index d1f7273..779a1f0 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -39,6 +39,8 @@ ALLOC_DEFINE(qdr_connection_ref_t);
 ALLOC_DEFINE(qdr_connection_info_t);
 ALLOC_DEFINE(qdr_subscription_ref_t);
 
+const uint64_t QD_DELIVERY_MOVED_TO_NEW_LINK = 999999999;
+
 static void qdr_general_handler(void *context);
 
 qdr_core_t *qdr_core(qd_dispatch_t *qd, qd_router_mode_t mode, const char *area, const char *id)
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 8c361a2..d481634 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -170,6 +170,9 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit)
                         to_new_link = true;
                         break;
                     }
+                    if (new_disp == QD_DELIVERY_MOVED_TO_NEW_LINK) {
+                        break;
+                    }
                 } while (settled != dlv->settled && !to_new_link);  // oops missed the settlement
 
                 send_complete = qdr_delivery_send_complete(dlv);
@@ -209,6 +212,12 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit)
                         }
                     }
                 }
+                else if (new_disp == QD_DELIVERY_MOVED_TO_NEW_LINK) {
+                    DEQ_REMOVE_HEAD(link->undelivered);
+                    dlv->link_work = 0;
+                    dlv->where = QDR_DELIVERY_NOWHERE;
+                    qdr_delivery_decref(core, dlv, "qdr_link_process_deliveries - moved from undelivered list to some other link");
+                }
                 else {
                     qdr_delivery_decref(core, dlv, "qdr_link_process_deliveries - release local reference - not send_complete");
 
@@ -232,7 +241,7 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit)
                 }
                 sys_mutex_unlock(conn->work_lock);
 
-                if (new_disp) {
+                if (new_disp && new_disp != QD_DELIVERY_MOVED_TO_NEW_LINK) {
                     // the remote sender-settle-mode forced us to pre-settle the
                     // message.  The core needs to know this, so we "fake" receiving a
                     // settle+disposition update from the remote end of the link:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org