You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2020/12/02 17:58:37 UTC
[qpid-dispatch] branch dev-protocol-adaptors-2 updated:
DISPATCH-1857 - Handle asynchronous moving of a delivery to another link.
Patch contents from Gordon Sim. This improves the stability of multiple,
concurrent connection handling. This closes #932.
This is an automated email from the ASF dual-hosted git repository.
tross pushed a commit to branch dev-protocol-adaptors-2
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push:
new ffc4e86 DISPATCH-1857 - Handle asynchronous moving of a delivery to another link. Patch contents from Gordon Sim. This improves the stability of multiple, concurrent connection handling. This closes #932.
ffc4e86 is described below
commit ffc4e8649aac070a70688a99fc19dbb8f17e195e
Author: Ted Ross <tr...@apache.org>
AuthorDate: Wed Dec 2 12:55:32 2020 -0500
DISPATCH-1857 - Handle asynchronous moving of a delivery to another link.
Patch contents from Gordon Sim.
This improves the stability of multiple, concurrent connection handling.
This closes #932.
---
include/qpid/dispatch/protocol_adaptor.h | 1 +
src/adaptors/tcp_adaptor.c | 110 ++++++++++++++++++-------------
src/router_core/router_core.c | 2 +
src/router_core/transfer.c | 11 +++-
4 files changed, 78 insertions(+), 46 deletions(-)
diff --git a/include/qpid/dispatch/protocol_adaptor.h b/include/qpid/dispatch/protocol_adaptor.h
index 93b23b4..ae7036f 100644
--- a/include/qpid/dispatch/protocol_adaptor.h
+++ b/include/qpid/dispatch/protocol_adaptor.h
@@ -203,6 +203,7 @@ typedef void (*qdr_link_drain_t) (void *context, qdr_link_t *link, bool mode);
*/
typedef int (*qdr_link_push_t) (void *context, qdr_link_t *link, int limit);
+extern const uint64_t QD_DELIVERY_MOVED_TO_NEW_LINK;
/**
* qdr_link_deliver_t callback
*
diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index 9c94c86..98df531 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -53,6 +53,7 @@ struct qdr_tcp_connection_t {
bool flow_enabled;
bool egress_dispatcher;
bool connector_closed;//only used if egress_dispatcher=true
+ qdr_delivery_t *initial_delivery;
qd_timer_t *activate_timer;
qd_bridge_config_t config;
qd_server_t *server;
@@ -93,6 +94,7 @@ static void qdr_del_tcp_connection_CT(qdr_core_t *core, qdr_action_t *action, bo
static void handle_disconnected(qdr_tcp_connection_t* conn);
static void free_qdr_tcp_connection(qdr_tcp_connection_t* conn);
+static void qdr_tcp_open_server_side_connection(qdr_tcp_connection_t* tc);
static inline uint64_t qdr_tcp_conn_linkid(const qdr_tcp_connection_t *conn)
{
@@ -494,12 +496,16 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
case PN_RAW_CONNECTION_CONNECTED: {
if (conn->ingress) {
qdr_tcp_connection_ingress_accept(conn);
- qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] Accepted from %s", conn->conn_id, conn->remote_address);
+ qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] Accepted from %s (global_id=%s)", conn->conn_id, conn->remote_address, conn->global_id);
break;
} else {
conn->remote_address = get_address_string(conn->socket);
conn->opened_time = tcp_adaptor->core->uptime_ticks;
qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] Connected", conn->conn_id);
+ if (!!conn->initial_delivery) {
+ qdr_tcp_open_server_side_connection(conn);
+ conn->initial_delivery = 0;
+ }
while (qdr_connection_process(conn->conn)) {}
handle_outgoing(conn);
break;
@@ -529,6 +535,7 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
case PN_RAW_CONNECTION_NEED_READ_BUFFERS: {
qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Need read buffers", conn->conn_id);
while (qdr_connection_process(conn->conn)) {}
+ handle_incoming(conn);
break;
}
case PN_RAW_CONNECTION_WAKE: {
@@ -563,6 +570,7 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
break;
}
default:
+ qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Unexpected Event: %d", conn->conn_id, pn_event_type(e));
break;
}
}
@@ -586,35 +594,18 @@ static qdr_tcp_connection_t *qdr_tcp_connection_ingress(qd_tcp_listener_t* liste
return tc;
}
-static void tcp_connector_establish(qdr_tcp_connection_t *conn)
-{
- qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "[C%"PRIu64"] Connecting to: %s", conn->conn_id, conn->config.host_port);
- conn->socket = pn_raw_connection();
- pn_raw_connection_set_context(conn->socket, conn);
- pn_proactor_raw_connect(qd_server_proactor(conn->server), conn->socket, conn->config.host_port);
-}
-static qdr_tcp_connection_t *qdr_tcp_connection_egress(qd_bridge_config_t *config, qd_server_t *server, qdr_delivery_t *initial_delivery)
+static void qdr_tcp_open_server_side_connection(qdr_tcp_connection_t* tc)
{
- qdr_tcp_connection_t* tc = NEW(qdr_tcp_connection_t);
- ZERO(tc);
- if (initial_delivery) {
- tc->egress_dispatcher = false;
- } else {
- tc->activate_timer = qd_timer(tcp_adaptor->core->qd, on_activate, tc);
- tc->egress_dispatcher = true;
- }
- tc->ingress = false;
- tc->context.context = tc;
- tc->context.handler = &handle_connection_event;
- tc->config = *config;
- tc->server = server;
+ const char *host = tc->egress_dispatcher ? "egress-dispatch" : tc->config.host_port;
+ qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "[C%"PRIu64"] Opening server-side core connection %s", tc->conn_id, host);
+
qdr_connection_info_t *info = qdr_connection_info(false, //bool is_encrypted,
false, //bool is_authenticated,
true, //bool opened,
"", //char *sasl_mechanisms,
QD_OUTGOING, //qd_direction_t dir,
- tc->egress_dispatcher ? "egress-dispatch" : tc->config.host_port, //const char *host,
+ host, //const char *host,
"", //const char *ssl_proto,
"", //const char *ssl_cipher,
"", //const char *user,
@@ -625,7 +616,6 @@ static qdr_tcp_connection_t *qdr_tcp_connection_egress(qd_bridge_config_t *confi
"", // peer router version,
false); // streaming links
- tc->conn_id = qd_server_allocate_connection_id(tc->server);
qdr_connection_t *conn = qdr_connection_opened(tcp_adaptor->core,
tcp_adaptor->adaptor,
false,
@@ -650,20 +640,48 @@ static qdr_tcp_connection_t *qdr_tcp_connection_egress(qd_bridge_config_t *confi
qdr_terminus_set_address(source, tc->config.address);
tc->outgoing = qdr_link_first_attach(conn,
- QD_OUTGOING,
- source, //qdr_terminus_t *source,
- qdr_terminus(0), //qdr_terminus_t *target,
- "tcp.egress.out", //const char *name,
- 0, //const char *terminus_addr,
- !(tc->egress_dispatcher),
- initial_delivery,
- &(tc->outgoing_id));
+ QD_OUTGOING,
+ source, //qdr_terminus_t *source,
+ qdr_terminus(0), //qdr_terminus_t *target,
+ "tcp.egress.out", //const char *name,
+ 0, //const char *terminus_addr,
+ !(tc->egress_dispatcher),
+ tc->initial_delivery,
+ &(tc->outgoing_id));
qdr_link_set_context(tc->outgoing, tc);
- //the incoming link for egress is created once we receive the
- //message which has the reply to address (and read buffers are
- //granted at that point)
- if (!tc->egress_dispatcher) {
- tcp_connector_establish(tc);
+}
+
+
+static qdr_tcp_connection_t *qdr_tcp_connection_egress(qd_bridge_config_t *config, qd_server_t *server, qdr_delivery_t *initial_delivery)
+{
+ qdr_tcp_connection_t* tc = NEW(qdr_tcp_connection_t);
+ ZERO(tc);
+ if (initial_delivery) {
+ tc->egress_dispatcher = false;
+ tc->initial_delivery = initial_delivery;
+ } else {
+ tc->activate_timer = qd_timer(tcp_adaptor->core->qd, on_activate, tc);
+ tc->egress_dispatcher = true;
+ }
+ tc->ingress = false;
+ tc->context.context = tc;
+ tc->context.handler = &handle_connection_event;
+ tc->config = *config;
+ tc->server = server;
+ tc->conn_id = qd_server_allocate_connection_id(tc->server);
+
+ //
+ // If this is the egress dispatcher, set up the core connection now. Otherwise, set up a physical
+ // raw connection and wait until we are running in that connection's context to set up the core
+ // connection.
+ //
+ if (tc->egress_dispatcher)
+ qdr_tcp_open_server_side_connection(tc);
+ else {
+ qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "[C%"PRIu64"] Connecting to: %s", tc->conn_id, tc->config.host_port);
+ tc->socket = pn_raw_connection();
+ pn_raw_connection_set_context(tc->socket, tc);
+ pn_proactor_raw_connect(qd_server_proactor(tc->server), tc->socket, tc->config.host_port);
}
return tc;
@@ -1021,6 +1039,7 @@ static uint64_t qdr_tcp_deliver(void *context, qdr_link_t *link, qdr_delivery_t
qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_deliver Delivery event", tc->conn_id, tc->outgoing_id);
if (tc->egress_dispatcher) {
qdr_tcp_connection_egress(&(tc->config), tc->server, delivery);
+ return QD_DELIVERY_MOVED_TO_NEW_LINK;
} else if (!tc->outstream) {
tc->outstream = delivery;
qdr_delivery_incref(delivery, "tcp_adaptor - new outstream");
@@ -1038,14 +1057,15 @@ static uint64_t qdr_tcp_deliver(void *context, qdr_link_t *link, qdr_delivery_t
qdr_terminus_t *target = qdr_terminus(0);
qdr_terminus_set_address(target, tc->reply_to);
tc->incoming = qdr_link_first_attach(tc->conn,
- QD_INCOMING,
- qdr_terminus(0), //qdr_terminus_t *source,
- target, //qdr_terminus_t *target,
- "tcp.egress.in", //const char *name,
- 0, //const char *terminus_addr,
- false,
- NULL,
- &(tc->incoming_id));
+ QD_INCOMING,
+ qdr_terminus(0), //qdr_terminus_t *source,
+ target, //qdr_terminus_t *target,
+ "tcp.egress.in", //const char *name,
+ 0, //const char *terminus_addr,
+ false,
+ NULL,
+ &(tc->incoming_id));
+ qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Create Link to %s", tc->conn_id, tc->reply_to);
qdr_link_set_context(tc->incoming, tc);
//add this connection to those visible through management now that we have the global_id
qdr_action_t *action = qdr_action(qdr_add_tcp_connection_CT, "add_tcp_connection");
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index d1f7273..779a1f0 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -39,6 +39,8 @@ ALLOC_DEFINE(qdr_connection_ref_t);
ALLOC_DEFINE(qdr_connection_info_t);
ALLOC_DEFINE(qdr_subscription_ref_t);
+const uint64_t QD_DELIVERY_MOVED_TO_NEW_LINK = 999999999;
+
static void qdr_general_handler(void *context);
qdr_core_t *qdr_core(qd_dispatch_t *qd, qd_router_mode_t mode, const char *area, const char *id)
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 8c361a2..d481634 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -170,6 +170,9 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit)
to_new_link = true;
break;
}
+ if (new_disp == QD_DELIVERY_MOVED_TO_NEW_LINK) {
+ break;
+ }
} while (settled != dlv->settled && !to_new_link); // oops missed the settlement
send_complete = qdr_delivery_send_complete(dlv);
@@ -209,6 +212,12 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit)
}
}
}
+ else if (new_disp == QD_DELIVERY_MOVED_TO_NEW_LINK) {
+ DEQ_REMOVE_HEAD(link->undelivered);
+ dlv->link_work = 0;
+ dlv->where = QDR_DELIVERY_NOWHERE;
+ qdr_delivery_decref(core, dlv, "qdr_link_process_deliveries - moved from undelivered list to some other link");
+ }
else {
qdr_delivery_decref(core, dlv, "qdr_link_process_deliveries - release local reference - not send_complete");
@@ -232,7 +241,7 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit)
}
sys_mutex_unlock(conn->work_lock);
- if (new_disp) {
+ if (new_disp && new_disp != QD_DELIVERY_MOVED_TO_NEW_LINK) {
// the remote sender-settle-mode forced us to pre-settle the
// message. The core needs to know this, so we "fake" receiving a
// settle+disposition update from the remote end of the link:
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org