You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2018/12/04 14:51:51 UTC
[2/2] qpid-dispatch git commit: DISPATCH-1194 - Refactored the
address lookup for attach into a core module with asynchronous capability.
DISPATCH-1194 - Refactored the address lookup for attach into a core module with asynchronous capability.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/564c5907
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/564c5907
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/564c5907
Branch: refs/heads/master
Commit: 564c5907cebb12653181f2e266dce8b581dc7c32
Parents: d201dea
Author: Ted Ross <tr...@redhat.com>
Authored: Tue Dec 4 09:38:09 2018 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Tue Dec 4 09:38:09 2018 -0500
----------------------------------------------------------------------
src/CMakeLists.txt | 1 +
src/router_core/connections.c | 345 +----------------
src/router_core/core_attach_address_lookup.h | 54 +++
.../address_lookup_client/lookup_client.c | 367 +++++++++++++++++++
src/router_core/router_core_private.h | 3 +
5 files changed, 433 insertions(+), 337 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/564c5907/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 352aec7..8fc6485 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -110,6 +110,7 @@ set(qpid_dispatch_SOURCES
router_core/modules/test_hooks/core_test_hooks.c
router_core/modules/edge_addr_tracking/edge_addr_tracking.c
router_core/modules/address_lookup/address_lookup.c
+ router_core/modules/address_lookup_client/lookup_client.c
router_node.c
router_pynode.c
schema_enum.c
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/564c5907/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index db271f4..d06bec9 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -607,33 +607,6 @@ void qdr_link_enqueue_work_CT(qdr_core_t *core,
/**
- * Generate a temporary routable address for a destination connected to this
- * router node.
- */
-static void qdr_generate_temp_addr(qdr_core_t *core, char *buffer, size_t length)
-{
- char discriminator[QD_DISCRIMINATOR_SIZE];
- qd_generate_discriminator(discriminator);
- if (core->router_mode == QD_ROUTER_MODE_EDGE)
- snprintf(buffer, length, "amqp:/_edge/%s/temp.%s", core->router_id, discriminator);
- else
- snprintf(buffer, length, "amqp:/_topo/%s/%s/temp.%s", core->router_area, core->router_id, discriminator);
-}
-
-
-/**
- * Generate a temporary mobile address for a producer connected to this
- * router node.
- */
-static void qdr_generate_mobile_addr(qdr_core_t *core, char *buffer, size_t length)
-{
- char discriminator[QD_DISCRIMINATOR_SIZE];
- qd_generate_discriminator(discriminator);
- snprintf(buffer, length, "amqp:/_$temp.%s", discriminator);
-}
-
-
-/**
* Generate a link name
*/
static void qdr_generate_link_name(const char *label, char *buffer, size_t length)
@@ -1087,173 +1060,6 @@ void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr)
}
-/**
- * qdr_lookup_terminus_address_CT
- *
- * Lookup a terminus address in the route table and possibly create a new address
- * if no match is found.
- *
- * @param core Pointer to the core object
- * @param dir Direction of the link for the terminus
- * @param conn The connection over which the terminus was attached
- * @param terminus The terminus containing the addressing information to be looked up
- * @param create_if_not_found Iff true, return a pointer to a newly created address record
- * @param accept_dynamic Iff true, honor the dynamic flag by creating a dynamic address
- * @param [out] link_route True iff the lookup indicates that an attach should be routed
- * @param [out] unavailable True iff this address is blocked as unavailable
- * @param [out] core_endpoint True iff this address is bound to a core-internal endpoint
- * @return Pointer to an address record or 0 if none is found
- */
-static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t *core,
- qd_direction_t dir,
- qdr_connection_t *conn,
- qdr_terminus_t *terminus,
- bool create_if_not_found,
- bool accept_dynamic,
- bool *link_route,
- bool *unavailable,
- bool *core_endpoint)
-{
- qdr_address_t *addr = 0;
-
- //
- // Unless expressly stated, link routing is not indicated for this terminus.
- //
- *link_route = false;
- *unavailable = false;
- *core_endpoint = false;
-
- if (qdr_terminus_is_dynamic(terminus)) {
- //
- // The terminus is dynamic. Check to see if there is an address provided
- // in the dynamic node properties. If so, look that address up as a link-routed
- // destination.
- //
- qd_iterator_t *dnp_address = qdr_terminus_dnp_address(terminus);
- if (dnp_address) {
- qd_iterator_reset_view(dnp_address, ITER_VIEW_ADDRESS_WITH_SPACE);
- if (conn->tenant_space)
- qd_iterator_annotate_space(dnp_address, conn->tenant_space, conn->tenant_space_len);
- qd_parse_tree_retrieve_match(core->link_route_tree[dir], dnp_address, (void**) &addr);
-
- if (addr && conn->tenant_space) {
- //
- // If this link is in a tenant space, translate the dnp address to
- // the fully-qualified view
- //
- qdr_terminus_set_dnp_address_iterator(terminus, dnp_address);
- }
-
- qd_iterator_free(dnp_address);
- *link_route = true;
- return addr;
- }
-
- //
- // The dynamic terminus has no address in the dynamic-node-propteries. If we are
- // permitted to generate dynamic addresses, create a new address that is local to
- // this router and insert it into the address table with a hash index.
- //
- if (!accept_dynamic)
- return 0;
-
- char temp_addr[200];
- bool generating = true;
- while (generating) {
- //
- // The address-generation process is performed in a loop in case the generated
- // address collides with a previously generated address (this should be _highly_
- // unlikely).
- //
- if (dir == QD_OUTGOING)
- qdr_generate_temp_addr(core, temp_addr, 200);
- else
- qdr_generate_mobile_addr(core, temp_addr, 200);
-
- qd_iterator_t *temp_iter = qd_iterator_string(temp_addr, ITER_VIEW_ADDRESS_HASH);
- qd_hash_retrieve(core->addr_hash, temp_iter, (void**) &addr);
- if (!addr) {
- addr = qdr_address_CT(core, QD_TREATMENT_ANYCAST_BALANCED);
- qd_hash_insert(core->addr_hash, temp_iter, addr, &addr->hash_handle);
- DEQ_INSERT_TAIL(core->addrs, addr);
- qdr_terminus_set_address(terminus, temp_addr);
- generating = false;
- }
- qd_iterator_free(temp_iter);
- }
- return addr;
- }
-
- //
- // If the terminus is anonymous, there is no address to look up.
- //
- if (qdr_terminus_is_anonymous(terminus))
- return 0;
-
- //
- // The terminus has a non-dynamic address that we need to look up. First, look for
- // a link-route destination for the address.
- //
- qd_iterator_t *iter = qdr_terminus_get_address(terminus);
- qd_iterator_reset_view(iter, ITER_VIEW_ADDRESS_WITH_SPACE);
- if (conn->tenant_space)
- qd_iterator_annotate_space(iter, conn->tenant_space, conn->tenant_space_len);
- qd_parse_tree_retrieve_match(core->link_route_tree[dir], iter, (void**) &addr);
- if (addr) {
- *link_route = true;
-
- //
- // If this link is in a tenant space, translate the terminus address to
- // the fully-qualified view
- //
- if (conn->tenant_space) {
- qdr_terminus_set_address_iterator(terminus, iter);
- }
- return addr;
- }
-
- //
- // There was no match for a link-route destination, look for a message-route address.
- //
- int in_phase;
- int out_phase;
- int addr_phase;
- int priority;
- qd_address_treatment_t treat = qdr_treatment_for_address_CT(core, conn, iter, &in_phase, &out_phase, &priority);
-
- qd_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
- qd_iterator_annotate_prefix(iter, '\0'); // Cancel previous override
- addr_phase = dir == QD_INCOMING ? in_phase : out_phase;
- qd_iterator_annotate_phase(iter, (char) addr_phase + '0');
-
- qd_hash_retrieve(core->addr_hash, iter, (void**) &addr);
-
- if (addr && addr->treatment == QD_TREATMENT_UNAVAILABLE)
- *unavailable = true;
-
- if (!addr && create_if_not_found) {
- addr = qdr_address_CT(core, treat);
- if (addr) {
- qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle);
- DEQ_INSERT_TAIL(core->addrs, addr);
- }
-
- if (!addr && treat == QD_TREATMENT_UNAVAILABLE)
- *unavailable = true;
- }
-
- if (qdr_terminus_is_coordinator(terminus))
- *unavailable = false;
-
- if (!!addr && addr->core_endpoint != 0)
- *core_endpoint = true;
-
- if (addr)
- addr->priority = priority;
- return addr;
-}
-
-
static void qdr_connection_opened_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
@@ -1412,19 +1218,6 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, boo
}
-static char* disambiguated_link_name(qdr_connection_info_t *conn, char *original)
-{
- size_t olen = strlen(original);
- size_t clen = strlen(conn->container);
- char *name = (char*) malloc(olen + clen + 2);
- memset(name, 0, olen + clen + 2);
- strcat(name, original);
- name[olen] = '@';
- strcat(name + olen + 1, conn->container);
- return name;
-}
-
-
//
// Handle the attachment and detachment of an inter-router control link
//
@@ -1508,7 +1301,6 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
qd_direction_t dir = action->args.connection.dir;
qdr_terminus_t *source = action->args.connection.source;
qdr_terminus_t *target = action->args.connection.target;
- bool success;
//
// Put the link into the proper lists for tracking.
@@ -1561,93 +1353,13 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
//
// This link has a target address
//
- bool link_route;
- bool unavailable;
- bool core_endpoint;
- qdr_address_t *addr = qdr_lookup_terminus_address_CT(core, dir, conn, target, true, true, &link_route, &unavailable, &core_endpoint);
-
- if (core_endpoint) {
- qdrc_endpoint_do_bound_attach_CT(core, addr, link, source, target);
- }
-
- else if (unavailable) {
- qdr_link_outbound_detach_CT(core, link, qdr_error(QD_AMQP_COND_NOT_FOUND, "Node not found"), 0, true);
- qdr_terminus_free(source);
- qdr_terminus_free(target);
- }
-
- else if (!addr) {
- //
- // No route to this destination, reject the link
- //
+ if (core->addr_lookup_handler)
+ core->addr_lookup_handler(core, conn, link, dir, source, target);
+ else {
qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
qdr_terminus_free(source);
qdr_terminus_free(target);
- }
-
- else if (link_route) {
- //
- // This is a link-routed destination, forward the attach to the next hop
- //
- if (qdr_terminus_survives_disconnect(target) && !core->qd->allow_resumable_link_route) {
- qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_INVALID_LINK_EXPIRATION, true);
- qdr_terminus_free(source);
- qdr_terminus_free(target);
- } else {
- if (conn->role != QDR_ROLE_INTER_ROUTER && conn->connection_info) {
- link->disambiguated_name = disambiguated_link_name(conn->connection_info, link->name);
- }
- success = qdr_forward_attach_CT(core, addr, link, source, target);
-
- if (!success) {
- qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
- qdr_terminus_free(source);
- qdr_terminus_free(target);
- }
- }
-
- }
-
- else if (qdr_terminus_is_coordinator(target)) {
- //
- // This target terminus is a coordinator.
- // If we got here, it means that the coordinator link attach could not be link routed to a broker (or to the next router).
- // The router should reject this link because the router cannot coordinate transactions itself.
- //
- // The attach response should have a null target to indicate refusal and the immediately coming detach.
- qdr_link_outbound_second_attach_CT(core, link, source, 0);
- // Now, send back a detach with the error amqp:precondition-failed
- qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_COORDINATOR_PRECONDITION_FAILED, true);
- }
- else {
- //
- // Associate the link with the address. With this association, it will be unnecessary
- // to do an address lookup for deliveries that arrive on this link.
- //
- qdr_core_bind_address_link_CT(core, addr, link);
- qdr_link_outbound_second_attach_CT(core, link, source, target);
-
- //
- // Issue the initial credit only if one of the following
- // holds:
- // - there are destinations for the address
- // - if the address treatment is multicast
- // - the address is that of an exchange (no subscribers allowed)
- //
- if (DEQ_SIZE(addr->subscriptions)
- || DEQ_SIZE(addr->rlinks)
- || qd_bitmask_cardinality(addr->rnodes)
- || qdr_is_addr_treatment_multicast(addr)
- || !!addr->exchange) {
- qdr_link_issue_credit_CT(core, link, link->capacity, false);
- }
-
- //
- // If this link came through an edge connection, raise a link event to
- // herald that fact.
- //
- if (link->conn->role == QDR_ROLE_EDGE_CONNECTION)
- qdrc_event_link_raise(core, QDRC_EVENT_LINK_EDGE_DATA_ATTACHED, link);
+ return;
}
}
break;
@@ -1668,54 +1380,13 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
//
switch (link->link_type) {
case QD_LINK_ENDPOINT: {
- bool link_route;
- bool unavailable;
- bool core_endpoint;
- qdr_address_t *addr = qdr_lookup_terminus_address_CT(core, dir, conn, source, true, true, &link_route, &unavailable, &core_endpoint);
-
- if (core_endpoint) {
- qdrc_endpoint_do_bound_attach_CT(core, addr, link, source, target);
- }
-
- else if (unavailable) {
- qdr_link_outbound_detach_CT(core, link, qdr_error(QD_AMQP_COND_NOT_FOUND, "Node not found"), 0, true);
- qdr_terminus_free(source);
- qdr_terminus_free(target);
- }
-
- else if (!addr) {
- //
- // No route to this destination, reject the link
- //
+ if (core->addr_lookup_handler)
+ core->addr_lookup_handler(core, conn, link, dir, source, target);
+ else {
qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
qdr_terminus_free(source);
qdr_terminus_free(target);
- }
-
- else if (link_route) {
- //
- // This is a link-routed destination, forward the attach to the next hop
- //
- if (qdr_terminus_survives_disconnect(source) && !core->qd->allow_resumable_link_route) {
- qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_INVALID_LINK_EXPIRATION, true);
- qdr_terminus_free(source);
- qdr_terminus_free(target);
- } else {
- if (conn->role != QDR_ROLE_INTER_ROUTER && conn->connection_info) {
- link->disambiguated_name = disambiguated_link_name(conn->connection_info, link->name);
- }
- bool success = qdr_forward_attach_CT(core, addr, link, source, target);
- if (!success) {
- qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
- qdr_terminus_free(source);
- qdr_terminus_free(target);
- }
- }
- }
-
- else {
- qdr_core_bind_address_link_CT(core, addr, link);
- qdr_link_outbound_second_attach_CT(core, link, source, target);
+ return;
}
break;
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/564c5907/src/router_core/core_attach_address_lookup.h
----------------------------------------------------------------------
diff --git a/src/router_core/core_attach_address_lookup.h b/src/router_core/core_attach_address_lookup.h
new file mode 100644
index 0000000..d05b9cb
--- /dev/null
+++ b/src/router_core/core_attach_address_lookup.h
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef qd_router_core_attach_addr_lookup_types
+#define qd_router_core_attach_addr_lookup_types 1
+
+#include "router_core_private.h"
+#endif
+
+#ifndef qd_router_core_attach_addr_lookup
+#define qd_router_core_attach_addr_lookup 1
+
+
+/**
+ * Handler - Look up the address on a received first-attach
+ *
+ * This handler is invoked upon receipt of a first-attach on a normal endpoint link.
+ * The appropriate address, from source or target, will be resolved to an address for
+ * message or link routing. This operation may be synchronous (completed before it
+ * returns) or asynchronous (completed later).
+ *
+ * @param core Pointer to the core state.
+ * @param conn Pointer to the connection over which the attach arrived.
+ * @param link Pointer to the attaching link.
+ * @param dir The direction of message flow for the link.
+ * @param source The source terminus for the attach.
+ * @param target The target terminus for the attach.
+ */
+typedef void (*qdrc_attach_addr_lookup_t) (qdr_core_t *core,
+ qdr_connection_t *conn,
+ qdr_link_t *link,
+ qd_direction_t dir,
+ qdr_terminus_t *source,
+ qdr_terminus_t *target);
+
+
+
+#endif
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/564c5907/src/router_core/modules/address_lookup_client/lookup_client.c
----------------------------------------------------------------------
diff --git a/src/router_core/modules/address_lookup_client/lookup_client.c b/src/router_core/modules/address_lookup_client/lookup_client.c
new file mode 100644
index 0000000..924565e
--- /dev/null
+++ b/src/router_core/modules/address_lookup_client/lookup_client.c
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "module.h"
+#include "core_attach_address_lookup.h"
+#include "router_core_private.h"
+#include <qpid/dispatch/discriminator.h>
+#include <stdio.h>
+
+static char* disambiguated_link_name(qdr_connection_info_t *conn, char *original)
+{
+ size_t olen = strlen(original);
+ size_t clen = strlen(conn->container);
+ char *name = (char*) malloc(olen + clen + 2);
+ memset(name, 0, olen + clen + 2);
+ strcat(name, original);
+ name[olen] = '@';
+ strcat(name + olen + 1, conn->container);
+ return name;
+}
+
+
+/**
+ * Generate a temporary routable address for a destination connected to this
+ * router node.
+ */
+static void qdr_generate_temp_addr(qdr_core_t *core, char *buffer, size_t length)
+{
+ char discriminator[QD_DISCRIMINATOR_SIZE];
+ qd_generate_discriminator(discriminator);
+ if (core->router_mode == QD_ROUTER_MODE_EDGE)
+ snprintf(buffer, length, "amqp:/_edge/%s/temp.%s", core->router_id, discriminator);
+ else
+ snprintf(buffer, length, "amqp:/_topo/%s/%s/temp.%s", core->router_area, core->router_id, discriminator);
+}
+
+
+/**
+ * Generate a temporary mobile address for a producer connected to this
+ * router node.
+ */
+static void qdr_generate_mobile_addr(qdr_core_t *core, char *buffer, size_t length)
+{
+ char discriminator[QD_DISCRIMINATOR_SIZE];
+ qd_generate_discriminator(discriminator);
+ snprintf(buffer, length, "amqp:/_$temp.%s", discriminator);
+}
+
+
+/**
+ * qdr_lookup_terminus_address_CT
+ *
+ * Look up a terminus address in the route table and possibly create a new address
+ * if no match is found.
+ *
+ * @param core Pointer to the core object
+ * @param dir Direction of the link for the terminus
+ * @param conn The connection over which the terminus was attached
+ * @param terminus The terminus containing the addressing information to be looked up
+ * @param create_if_not_found Iff true, return a pointer to a newly created address record
+ * @param accept_dynamic Iff true, honor the dynamic flag by creating a dynamic address
+ * @param [out] link_route True iff the lookup indicates that an attach should be routed
+ * @param [out] unavailable True iff this address is blocked as unavailable
+ * @param [out] core_endpoint True iff this address is bound to a core-internal endpoint
+ * @return Pointer to an address record or 0 if none is found
+ */
+static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t *core,
+ qd_direction_t dir,
+ qdr_connection_t *conn,
+ qdr_terminus_t *terminus,
+ bool create_if_not_found,
+ bool accept_dynamic,
+ bool *link_route,
+ bool *unavailable,
+ bool *core_endpoint)
+{
+ qdr_address_t *addr = 0;
+
+ //
+ // Unless expressly stated, link routing is not indicated for this terminus.
+ //
+ *link_route = false;
+ *unavailable = false;
+ *core_endpoint = false;
+
+ if (qdr_terminus_is_dynamic(terminus)) {
+ //
+ // The terminus is dynamic. Check to see if there is an address provided
+ // in the dynamic node properties. If so, look that address up as a link-routed
+ // destination.
+ //
+ qd_iterator_t *dnp_address = qdr_terminus_dnp_address(terminus);
+ if (dnp_address) {
+ qd_iterator_reset_view(dnp_address, ITER_VIEW_ADDRESS_WITH_SPACE);
+ if (conn->tenant_space)
+ qd_iterator_annotate_space(dnp_address, conn->tenant_space, conn->tenant_space_len);
+ qd_parse_tree_retrieve_match(core->link_route_tree[dir], dnp_address, (void**) &addr);
+
+ if (addr && conn->tenant_space) {
+ //
+ // If this link is in a tenant space, translate the dnp address to
+ // the fully-qualified view
+ //
+ qdr_terminus_set_dnp_address_iterator(terminus, dnp_address);
+ }
+
+ qd_iterator_free(dnp_address);
+ *link_route = true;
+ return addr;
+ }
+
+ //
+ // The dynamic terminus has no address in the dynamic-node-properties. If we are
+ // permitted to generate dynamic addresses, create a new address that is local to
+ // this router and insert it into the address table with a hash index.
+ //
+ if (!accept_dynamic)
+ return 0;
+
+ char temp_addr[200];
+ bool generating = true;
+ while (generating) {
+ //
+ // The address-generation process is performed in a loop in case the generated
+ // address collides with a previously generated address (this should be _highly_
+ // unlikely).
+ //
+ if (dir == QD_OUTGOING)
+ qdr_generate_temp_addr(core, temp_addr, 200);
+ else
+ qdr_generate_mobile_addr(core, temp_addr, 200);
+
+ qd_iterator_t *temp_iter = qd_iterator_string(temp_addr, ITER_VIEW_ADDRESS_HASH);
+ qd_hash_retrieve(core->addr_hash, temp_iter, (void**) &addr);
+ if (!addr) {
+ addr = qdr_address_CT(core, QD_TREATMENT_ANYCAST_BALANCED);
+ qd_hash_insert(core->addr_hash, temp_iter, addr, &addr->hash_handle);
+ DEQ_INSERT_TAIL(core->addrs, addr);
+ qdr_terminus_set_address(terminus, temp_addr);
+ generating = false;
+ }
+ qd_iterator_free(temp_iter);
+ }
+ return addr;
+ }
+
+ //
+ // If the terminus is anonymous, there is no address to look up.
+ //
+ if (qdr_terminus_is_anonymous(terminus))
+ return 0;
+
+ //
+ // The terminus has a non-dynamic address that we need to look up. First, look for
+ // a link-route destination for the address.
+ //
+ qd_iterator_t *iter = qdr_terminus_get_address(terminus);
+ qd_iterator_reset_view(iter, ITER_VIEW_ADDRESS_WITH_SPACE);
+ if (conn->tenant_space)
+ qd_iterator_annotate_space(iter, conn->tenant_space, conn->tenant_space_len);
+ qd_parse_tree_retrieve_match(core->link_route_tree[dir], iter, (void**) &addr);
+ if (addr) {
+ *link_route = true;
+
+ //
+ // If this link is in a tenant space, translate the terminus address to
+ // the fully-qualified view
+ //
+ if (conn->tenant_space) {
+ qdr_terminus_set_address_iterator(terminus, iter);
+ }
+ return addr;
+ }
+
+ //
+ // There was no match for a link-route destination, look for a message-route address.
+ //
+ int in_phase;
+ int out_phase;
+ int addr_phase;
+ int priority;
+ qd_address_treatment_t treat = qdr_treatment_for_address_CT(core, conn, iter, &in_phase, &out_phase, &priority);
+
+ qd_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
+ qd_iterator_annotate_prefix(iter, '\0'); // Cancel previous override
+ addr_phase = dir == QD_INCOMING ? in_phase : out_phase;
+ qd_iterator_annotate_phase(iter, (char) addr_phase + '0');
+
+ qd_hash_retrieve(core->addr_hash, iter, (void**) &addr);
+
+ if (addr && addr->treatment == QD_TREATMENT_UNAVAILABLE)
+ *unavailable = true;
+
+ if (!addr && create_if_not_found) {
+ addr = qdr_address_CT(core, treat);
+ if (addr) {
+ qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle);
+ DEQ_INSERT_TAIL(core->addrs, addr);
+ }
+
+ if (!addr && treat == QD_TREATMENT_UNAVAILABLE)
+ *unavailable = true;
+ }
+
+ if (qdr_terminus_is_coordinator(terminus))
+ *unavailable = false;
+
+ if (!!addr && addr->core_endpoint != 0)
+ *core_endpoint = true;
+
+ if (addr)
+ addr->priority = priority;
+ return addr;
+}
+
+
+static void qdr_link_react_to_first_attach_CT(qdr_core_t *core,
+ qdr_connection_t *conn,
+ qdr_address_t *addr,
+ qdr_link_t *link,
+ qd_direction_t dir,
+ qdr_terminus_t *source,
+ qdr_terminus_t *target,
+ bool link_route,
+ bool unavailable,
+ bool core_endpoint)
+{
+ if (core_endpoint) {
+ qdrc_endpoint_do_bound_attach_CT(core, addr, link, source, target);
+ }
+
+ else if (unavailable) {
+ qdr_link_outbound_detach_CT(core, link, qdr_error(QD_AMQP_COND_NOT_FOUND, "Node not found"), 0, true);
+ qdr_terminus_free(source);
+ qdr_terminus_free(target);
+ }
+
+ else if (!addr) {
+ //
+ // No route to this destination, reject the link
+ //
+ qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
+ qdr_terminus_free(source);
+ qdr_terminus_free(target);
+ }
+
+ else if (link_route) {
+ //
+ // This is a link-routed destination, forward the attach to the next hop
+ //
+ qdr_terminus_t *term = dir == QD_INCOMING ? target : source;
+ if (qdr_terminus_survives_disconnect(term) && !core->qd->allow_resumable_link_route) {
+ qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_INVALID_LINK_EXPIRATION, true);
+ qdr_terminus_free(source);
+ qdr_terminus_free(target);
+ } else {
+ if (conn->role != QDR_ROLE_INTER_ROUTER && conn->connection_info) {
+ link->disambiguated_name = disambiguated_link_name(conn->connection_info, link->name);
+ }
+ bool success = qdr_forward_attach_CT(core, addr, link, source, target);
+ if (!success) {
+ qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
+ qdr_terminus_free(source);
+ qdr_terminus_free(target);
+ }
+ }
+ }
+
+ else if (dir == QD_INCOMING && qdr_terminus_is_coordinator(target)) {
+ //
+ // This target terminus is a coordinator.
+ // If we got here, it means that the coordinator link attach could not be link routed to a broker (or to the next router).
+ // The router should reject this link because the router cannot coordinate transactions itself.
+ //
+ // The attach response should have a null target to indicate refusal and that a detach will follow immediately.
+ //
+ qdr_link_outbound_second_attach_CT(core, link, source, 0);
+
+ //
+ // Now, send back a detach with the error amqp:precondition-failed
+ //
+ qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_COORDINATOR_PRECONDITION_FAILED, true);
+ } else {
+ //
+ // Associate the link with the address. With this association, it will be unnecessary
+ // to do an address lookup for deliveries that arrive on this link.
+ //
+ qdr_core_bind_address_link_CT(core, addr, link);
+ qdr_link_outbound_second_attach_CT(core, link, source, target);
+
+ //
+ // Issue the initial credit only on an incoming link, and only if one of
+ // the following holds:
+ // - there are destinations for the address
+ // - the address treatment is multicast
+ // - the address is that of an exchange (no subscribers allowed)
+ //
+ if (dir == QD_INCOMING
+ && (DEQ_SIZE(addr->subscriptions)
+ || DEQ_SIZE(addr->rlinks)
+ || qd_bitmask_cardinality(addr->rnodes)
+ || qdr_is_addr_treatment_multicast(addr)
+ || !!addr->exchange)) {
+ qdr_link_issue_credit_CT(core, link, link->capacity, false);
+ }
+
+ //
+ // If this link came through an edge connection, raise a link event to
+ // herald that fact.
+ //
+ if (link->conn->role == QDR_ROLE_EDGE_CONNECTION)
+ qdrc_event_link_raise(core, QDRC_EVENT_LINK_EDGE_DATA_ATTACHED, link);
+ }
+}
+
+
+static void qcm_addr_lookup_CT(qdr_core_t *core,
+ qdr_connection_t *conn,
+ qdr_link_t *link,
+ qd_direction_t dir,
+ qdr_terminus_t *source,
+ qdr_terminus_t *target)
+
+{
+ bool link_route;
+ bool unavailable;
+ bool core_endpoint;
+ qdr_terminus_t *term = dir == QD_INCOMING ? target : source;
+
+ qdr_address_t *addr = qdr_lookup_terminus_address_CT(core, dir, conn, term, true, true, &link_route, &unavailable, &core_endpoint);
+ qdr_link_react_to_first_attach_CT(core, conn, addr, link, dir, source, target, link_route, unavailable, core_endpoint);
+}
+
+
+static void qcm_addr_lookup_client_init_CT(qdr_core_t *core, void **module_context)
+{
+ assert(core->addr_lookup_handler == 0);
+
+ core->addr_lookup_handler = qcm_addr_lookup_CT;
+ *module_context = core;
+}
+
+
+static void qcm_addr_lookup_client_final_CT(void *module_context)
+{
+ qdr_core_t *core = (qdr_core_t*) module_context;
+ core->addr_lookup_handler = 0;
+}
+
+
+QDR_CORE_MODULE_DECLARE("address_lookup_client", qcm_addr_lookup_client_init_CT, qcm_addr_lookup_client_final_CT)
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/564c5907/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 0b70c97..bb4af57 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -43,6 +43,7 @@ typedef struct qdr_edge_t qdr_edge_t;
#include "core_link_endpoint.h"
#include "core_events.h"
+#include "core_attach_address_lookup.h"
qdr_forwarder_t *qdr_forwarder_CT(qdr_core_t *core, qd_address_treatment_t treatment);
int qdr_forward_message_CT(qdr_core_t *core, qdr_address_t *addr, qd_message_t *msg, qdr_delivery_t *in_delivery,
@@ -727,6 +728,8 @@ struct qdr_core_t {
qdr_connection_list_t connections_to_activate;
qdr_link_list_t open_links;
+ qdrc_attach_addr_lookup_t addr_lookup_handler;
+
//
// Agent section
//
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org