You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2021/09/24 14:01:20 UTC
[qpid-dispatch] branch main updated: DISPATCH-2219: fix inter-router link priorities
This is an automated email from the ASF dual-hosted git repository.
kgiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/main by this push:
new 8d5d3e4 DISPATCH-2219: fix inter-router link priorities
8d5d3e4 is described below
commit 8d5d3e41cda09cdc56fceab3bb7f52add87eac6a
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Fri Sep 10 13:18:07 2021 -0400
DISPATCH-2219: fix inter-router link priorities
This closes #1367
---
include/qpid/dispatch/amqp.h | 7 +++
src/message_private.h | 4 --
src/router_core/connections.c | 60 ++++++++++++++++--------
src/router_core/core_link_endpoint.c | 3 +-
src/router_core/forwarder.c | 3 +-
src/router_core/modules/edge_router/addr_proxy.c | 11 +++--
src/router_core/route_control.c | 2 +-
src/router_core/router_core_private.h | 6 ++-
8 files changed, 63 insertions(+), 33 deletions(-)
diff --git a/include/qpid/dispatch/amqp.h b/include/qpid/dispatch/amqp.h
index 9b8665a..7c3ef08 100644
--- a/include/qpid/dispatch/amqp.h
+++ b/include/qpid/dispatch/amqp.h
@@ -218,4 +218,11 @@ extern const char * const QD_AMQP_COND_MESSAGE_SIZE_EXCEEDED;
#define QD_AMQP_LINK_ROLE_RECEIVER true
/// @};
+/** @name AMQP Message priority. */
+/// @{
+#define QDR_N_PRIORITIES 10
+#define QDR_MAX_PRIORITY (QDR_N_PRIORITIES - 1)
+#define QDR_DEFAULT_PRIORITY 4
+/// @};
+
#endif
diff --git a/src/message_private.h b/src/message_private.h
index e7217e8..944612e 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -179,10 +179,6 @@ ALLOC_DECLARE(qd_message_content_t);
/** Initialize logging */
void qd_message_initialize();
-#define QDR_N_PRIORITIES 10
-#define QDR_MAX_PRIORITY (QDR_N_PRIORITIES - 1)
-#define QDR_DEFAULT_PRIORITY 4
-
// These expect content->lock to be locked.
bool _Q2_holdoff_should_block_LH(const qd_message_content_t *content);
bool _Q2_holdoff_should_unblock_LH(const qd_message_content_t *content);
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 4f28d48..12812ad 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -618,6 +618,7 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
link->zero_credit_time = conn->core->uptime_ticks;
link->terminus_survives_disconnect = qdr_terminus_survives_disconnect(local_terminus);
link->no_route = no_route;
+ link->priority = QDR_DEFAULT_PRIORITY;
link->strip_annotations_in = conn->strip_annotations_in;
link->strip_annotations_out = conn->strip_annotations_out;
@@ -630,9 +631,10 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
tsan_reset_delivery_ids(initial_delivery, link->conn->identity, link->identity);
}
- if (qdr_terminus_has_capability(local_terminus, QD_CAPABILITY_ROUTER_CONTROL))
+ if (qdr_terminus_has_capability(local_terminus, QD_CAPABILITY_ROUTER_CONTROL)) {
link->link_type = QD_LINK_CONTROL;
- else if (qdr_terminus_has_capability(local_terminus, QD_CAPABILITY_ROUTER_DATA))
+ link->priority = QDR_MAX_PRIORITY;
+ } else if (qdr_terminus_has_capability(local_terminus, QD_CAPABILITY_ROUTER_DATA))
link->link_type = QD_LINK_ROUTER;
else if (qdr_terminus_has_capability(local_terminus, QD_CAPABILITY_EDGE_DOWNLINK)) {
if (conn->core->router_mode == QD_ROUTER_MODE_INTERIOR &&
@@ -1120,7 +1122,8 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t *core,
qd_direction_t dir,
qdr_terminus_t *source,
qdr_terminus_t *target,
- qd_session_class_t ssn_class)
+ qd_session_class_t ssn_class,
+ uint8_t priority)
{
//
// Create a new link, initiated by the router core. This will involve issuing a first-attach outbound.
@@ -1148,6 +1151,7 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t *core,
link->attach_count = 1;
link->core_ticks = core->uptime_ticks;
link->zero_credit_time = core->uptime_ticks;
+ link->priority = priority;
link->strip_annotations_in = conn->strip_annotations_in;
link->strip_annotations_out = conn->strip_annotations_out;
@@ -1407,15 +1411,23 @@ static void qdr_connection_opened_CT(qdr_core_t *core, qdr_action_t *action, boo
// inter-router links: Two (in and out) for control, 2 * QDR_N_PRIORITIES for
// routed-message transfer.
//
- (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, QD_INCOMING, qdr_terminus_router_control(), qdr_terminus_router_control(), QD_SSN_ROUTER_CONTROL);
- (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, QD_OUTGOING, qdr_terminus_router_control(), qdr_terminus_router_control(), QD_SSN_ROUTER_CONTROL);
+ (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, QD_INCOMING,
+ qdr_terminus_router_control(), qdr_terminus_router_control(),
+ QD_SSN_ROUTER_CONTROL, QDR_MAX_PRIORITY);
+ (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, QD_OUTGOING,
+ qdr_terminus_router_control(), qdr_terminus_router_control(),
+ QD_SSN_ROUTER_CONTROL, QDR_MAX_PRIORITY);
STATIC_ASSERT((QD_SSN_ROUTER_DATA_PRI_9 - QD_SSN_ROUTER_DATA_PRI_0 + 1) == QDR_N_PRIORITIES, PRIORITY_SESSION_NOT_SAME);
for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) {
// a session is reserved for each priority link
qd_session_class_t sc = (qd_session_class_t)(QD_SSN_ROUTER_DATA_PRI_0 + priority);
- (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER, QD_INCOMING, qdr_terminus_router_data(), qdr_terminus_router_data(), sc);
- (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER, QD_OUTGOING, qdr_terminus_router_data(), qdr_terminus_router_data(), sc);
+ (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER, QD_INCOMING,
+ qdr_terminus_router_data(), qdr_terminus_router_data(),
+ sc, priority);
+ (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER, QD_OUTGOING,
+ qdr_terminus_router_data(), qdr_terminus_router_data(),
+ sc, priority);
}
}
}
@@ -1462,12 +1474,12 @@ qdr_link_t *qdr_connection_new_streaming_link_CT(qdr_core_t *core, qdr_connectio
case QDR_ROLE_INTER_ROUTER:
out_link = qdr_create_link_CT(core, conn, QD_LINK_ROUTER, QD_OUTGOING,
qdr_terminus_router_data(), qdr_terminus_router_data(),
- QD_SSN_LINK_STREAMING);
+ QD_SSN_LINK_STREAMING, QDR_DEFAULT_PRIORITY);
break;
case QDR_ROLE_EDGE_CONNECTION:
out_link = qdr_create_link_CT(core, conn, QD_LINK_ENDPOINT, QD_OUTGOING,
qdr_terminus(0), qdr_terminus(0),
- QD_SSN_LINK_STREAMING);
+ QD_SSN_LINK_STREAMING, QDR_DEFAULT_PRIORITY);
break;
default:
assert(false);
@@ -1599,14 +1611,20 @@ static void qdr_detach_link_control_CT(qdr_core_t *core, qdr_connection_t *conn,
static void qdr_attach_link_data_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link)
{
assert(link->link_type == QD_LINK_ROUTER);
- // The first QDR_N_PRIORITIES (10) QDR_LINK_ROUTER links to attach over the
- // connection are the shared priority links. These links are attached in
- // priority order starting at zero.
- int next_pri = core->data_links_by_mask_bit[conn->mask_bit].count;
- if (next_pri < QDR_N_PRIORITIES) {
- link->priority = next_pri;
- core->data_links_by_mask_bit[conn->mask_bit].links[next_pri] = link;
- core->data_links_by_mask_bit[conn->mask_bit].count += 1;
+ // The first 2 x QDR_N_PRIORITIES (10) QDR_LINK_ROUTER links to attach over
+ // the inter-router connection are the shared priority links. These links
+ // are attached in priority order starting at zero.
+ if (link->link_direction == QD_OUTGOING) {
+ int next_pri = core->data_links_by_mask_bit[conn->mask_bit].count;
+ if (next_pri < QDR_N_PRIORITIES) {
+ link->priority = next_pri;
+ core->data_links_by_mask_bit[conn->mask_bit].links[next_pri] = link;
+ core->data_links_by_mask_bit[conn->mask_bit].count += 1;
+ }
+ } else {
+ if (conn->next_pri < QDR_N_PRIORITIES) {
+ link->priority = conn->next_pri++;
+ }
}
}
@@ -1792,8 +1810,10 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
break;
}
- case QD_LINK_CONTROL:
case QD_LINK_ROUTER:
+ qdr_attach_link_data_CT(core, conn, link);
+ // fall-through:
+ case QD_LINK_CONTROL:
qdr_link_outbound_second_attach_CT(core, link, source, target);
qdr_link_issue_credit_CT(core, link, link->capacity, false);
break;
@@ -1918,8 +1938,10 @@ static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *ac
qdr_link_issue_credit_CT(core, link, link->capacity, false);
break;
- case QD_LINK_CONTROL:
case QD_LINK_ROUTER:
+ qdr_attach_link_data_CT(core, conn, link);
+ // fall-through
+ case QD_LINK_CONTROL:
qdr_link_issue_credit_CT(core, link, link->capacity, false);
break;
diff --git a/src/router_core/core_link_endpoint.c b/src/router_core/core_link_endpoint.c
index fdecf92..b65ac6a 100644
--- a/src/router_core/core_link_endpoint.c
+++ b/src/router_core/core_link_endpoint.c
@@ -76,8 +76,7 @@ qdrc_endpoint_t *qdrc_endpoint_create_link_CT(qdr_core_t *core,
ep->desc = desc;
ep->link_context = link_context;
ep->link = qdr_create_link_CT(core, conn, QD_LINK_ENDPOINT, dir, source, target,
- QD_SSN_CORE_ENDPOINT);
-
+ QD_SSN_CORE_ENDPOINT, QDR_DEFAULT_PRIORITY);
ep->link->core_endpoint = ep;
return ep;
}
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 642b4cd..3bc3a8f 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -1037,7 +1037,8 @@ void qdr_forward_link_direct_CT(qdr_core_t *core,
out_link->zero_credit_time = core->uptime_ticks;
out_link->strip_annotations_in = conn->strip_annotations_in;
out_link->strip_annotations_out = conn->strip_annotations_out;
-
+ out_link->priority = in_link->priority;
+
if (strip) {
out_link->strip_prefix = strip;
}
diff --git a/src/router_core/modules/edge_router/addr_proxy.c b/src/router_core/modules/edge_router/addr_proxy.c
index 0e76626..53ead53 100644
--- a/src/router_core/modules/edge_router/addr_proxy.c
+++ b/src/router_core/modules/edge_router/addr_proxy.c
@@ -159,7 +159,8 @@ static void add_inlink(qcm_edge_addr_proxy_t *ap, const char *key, qdr_address_t
}
qdr_link_t *link = qdr_create_link_CT(ap->core, ap->edge_conn, QD_LINK_ENDPOINT, QD_INCOMING,
- term, qdr_terminus_normal(0), QD_SSN_ENDPOINT);
+ term, qdr_terminus_normal(0), QD_SSN_ENDPOINT,
+ QDR_DEFAULT_PRIORITY);
qdr_core_bind_address_link_CT(ap->core, addr, link);
addr->edge_inlink = link;
}
@@ -202,7 +203,8 @@ static void add_outlink(qcm_edge_addr_proxy_t *ap, const char *key, qdr_address_
}
qdr_link_t *link = qdr_create_link_CT(ap->core, ap->edge_conn, QD_LINK_ENDPOINT, QD_OUTGOING,
- qdr_terminus_normal(0), term, QD_SSN_ENDPOINT);
+ qdr_terminus_normal(0), term, QD_SSN_ENDPOINT,
+ QDR_DEFAULT_PRIORITY);
addr->edge_outlink = link;
}
}
@@ -281,7 +283,8 @@ static void on_conn_event(void *context, qdrc_event_t event, qdr_connection_t *c
qdr_link_t *out_link = qdr_create_link_CT(ap->core, conn,
QD_LINK_ENDPOINT, QD_OUTGOING,
qdr_terminus(0), qdr_terminus(0),
- QD_SSN_ENDPOINT);
+ QD_SSN_ENDPOINT,
+ QDR_DEFAULT_PRIORITY);
//
// Associate the anonymous sender with the edge connection address. This will cause
@@ -297,7 +300,7 @@ static void on_conn_event(void *context, qdrc_event_t event, qdr_connection_t *c
QD_LINK_ENDPOINT, QD_INCOMING,
qdr_terminus_edge_downlink(ap->core->router_id),
qdr_terminus_edge_downlink(0),
- QD_SSN_ENDPOINT);
+ QD_SSN_ENDPOINT, QDR_DEFAULT_PRIORITY);
//
// Attach a receiving link for edge address tracking updates.
diff --git a/src/router_core/route_control.c b/src/router_core/route_control.c
index 7adfdc3..3f898b7 100644
--- a/src/router_core/route_control.c
+++ b/src/router_core/route_control.c
@@ -257,7 +257,7 @@ static void qdr_auto_link_activate_CT(qdr_core_t *core, qdr_auto_link_t *al, qdr
} else
qdr_terminus_set_address(term, &key[2]); // truncate the "Mp" annotation (where p = phase)
al->link = qdr_create_link_CT(core, conn, QD_LINK_ENDPOINT, al->dir, source, target,
- QD_SSN_ENDPOINT);
+ QD_SSN_ENDPOINT, QDR_DEFAULT_PRIORITY);
al->link->auto_link = al;
al->link->phase = al->phase;
al->link->fallback = al->fallback;
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 9754465..219290c 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -673,6 +673,8 @@ struct qdr_connection_t {
qdr_core_t *core;
bool incoming;
bool in_activate_list;
+ bool closed; // This bit is used in the case where a client is trying to force close this connection.
+ uint8_t next_pri; // for incoming inter-router data links
qdr_connection_role_t role;
int inter_router_cost;
qdr_conn_identifier_t *conn_id;
@@ -693,7 +695,6 @@ struct qdr_connection_t {
qdr_conn_oper_status_t oper_status;
qdr_conn_admin_status_t admin_status;
qdr_error_t *error;
- bool closed; // This bit is used in the case where a client is trying to force close this connection.
uint32_t conn_uptime; // Timestamp which can be used to calculate the number of seconds this connection has been up and running.
uint32_t last_delivery_time; // Timestamp which can be used to calculate the number of seconds since the last delivery arrived on this connection.
bool enable_protocol_trace; // Has trace level logging been turned on for this connection.
@@ -1010,7 +1011,8 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t *core,
qd_direction_t dir,
qdr_terminus_t *source,
qdr_terminus_t *target,
- qd_session_class_t ssn_class);
+ qd_session_class_t ssn_class,
+ uint8_t priority);
void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t *error, qdr_condition_t condition, bool close);
void qdr_link_outbound_second_attach_CT(qdr_core_t *core, qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org