You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2018/12/04 14:51:50 UTC
[1/2] qpid-dispatch git commit: DISPATCH-1194: Link Route address
lookup for edge router This closes #423
Repository: qpid-dispatch
Updated Branches:
refs/heads/master db87d5d3e -> 564c5907c
DISPATCH-1194: Link Route address lookup for edge router
This closes #423
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/d201deac
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/d201deac
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/d201deac
Branch: refs/heads/master
Commit: d201deac4ec8854571dcd3acd1aa5aa39d16de5b
Parents: db87d5d
Author: Kenneth Giusti <kg...@apache.org>
Authored: Wed Nov 28 09:30:09 2018 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Tue Dec 4 09:34:47 2018 -0500
----------------------------------------------------------------------
include/qpid/dispatch/amqp.h | 5 +
src/CMakeLists.txt | 1 +
.../modules/address_lookup/address_lookup.c | 454 +++++++++++++++++++
.../modules/address_lookup/address_lookup.h | 75 +++
src/router_core/router_core_private.h | 2 +-
src/router_core/transfer.c | 45 +-
tests/CMakeLists.txt | 1 +
tests/system_test.py | 9 +-
tests/system_tests_address_lookup.py | 287 ++++++++++++
9 files changed, 857 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d201deac/include/qpid/dispatch/amqp.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/amqp.h b/include/qpid/dispatch/amqp.h
index e62b7f7..99315c7 100644
--- a/include/qpid/dispatch/amqp.h
+++ b/include/qpid/dispatch/amqp.h
@@ -184,5 +184,10 @@ extern const char * const QD_AMQP_COND_ILLEGAL_STATE;
extern const char * const QD_AMQP_COND_FRAME_SIZE_TOO_SMALL;
/// @};
+/** @name AMQP link endpoint role. */
+/// @{
+#define QD_AMQP_LINK_ROLE_SENDER false
+#define QD_AMQP_LINK_ROLE_RECEIVER true
+/// @};
#endif
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d201deac/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index ebc0e09..352aec7 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -109,6 +109,7 @@ set(qpid_dispatch_SOURCES
router_core/modules/edge_router/edge_mgmt.c
router_core/modules/test_hooks/core_test_hooks.c
router_core/modules/edge_addr_tracking/edge_addr_tracking.c
+ router_core/modules/address_lookup/address_lookup.c
router_node.c
router_pynode.c
schema_enum.c
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d201deac/src/router_core/modules/address_lookup/address_lookup.c
----------------------------------------------------------------------
diff --git a/src/router_core/modules/address_lookup/address_lookup.c b/src/router_core/modules/address_lookup/address_lookup.c
new file mode 100644
index 0000000..93297ac
--- /dev/null
+++ b/src/router_core/modules/address_lookup/address_lookup.c
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "address_lookup.h"
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/amqp.h>
+#include "module.h"
+#include "core_link_endpoint.h"
+
+#include "inttypes.h"
+
+#define PROTOCOL_VERSION 1
+
+typedef enum {
+ // note: keep unit test in sync
+ OPCODE_INVALID,
+ OPCODE_LINK_ROUTE_LOOKUP,
+} address_lookup_opcode_t;
+
+
+/* create the message application properties and body for the link route lookup
+ * request message
+ */
+int qcm_link_route_lookup_msg(qd_iterator_t *address,
+ qd_direction_t dir,
+ qd_composed_field_t **properties,
+ qd_composed_field_t **body)
+{
+ *properties = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, 0);
+ if (!*properties)
+ return -1;
+ qd_compose_start_map(*properties);
+ qd_compose_insert_string(*properties, "version");
+ qd_compose_insert_uint(*properties, PROTOCOL_VERSION);
+ qd_compose_insert_string(*properties, "opcode");
+ qd_compose_insert_uint(*properties, OPCODE_LINK_ROUTE_LOOKUP);
+ qd_compose_end_map(*properties);
+
+ *body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);
+ if (!*body) {
+ qd_compose_free(*properties);
+ *properties = 0;
+ return -1;
+ }
+ qd_compose_start_list(*body);
+ qd_compose_insert_string_iterator(*body, address);
+ qd_compose_insert_bool(*body, (dir == QD_INCOMING
+ ? QD_AMQP_LINK_ROLE_RECEIVER
+ : QD_AMQP_LINK_ROLE_SENDER));
+ qd_compose_end_list(*body);
+ return 0;
+}
+
+
+/* parse a reply to the link route lookup request
+ */
+qcm_address_lookup_status_t qcm_link_route_lookup_decode(qd_iterator_t *properties,
+ qd_iterator_t *body,
+ bool *is_link_route,
+ bool *has_destinations)
+{
+ qcm_address_lookup_status_t rc = QCM_ADDR_LOOKUP_OK;
+ *is_link_route = false;
+ *has_destinations = false;
+
+ qd_parsed_field_t *props = qd_parse(properties);
+ if (!props || !qd_parse_ok(props) || !qd_parse_is_map(props))
+ return QCM_ADDR_LOOKUP_INVALID_REQUEST;
+
+ qd_parsed_field_t *bod = qd_parse(body);
+ if (!bod || !qd_parse_ok(bod) || !qd_parse_is_list(bod)) {
+ qd_parse_free(props);
+ return QCM_ADDR_LOOKUP_INVALID_REQUEST;
+ }
+
+ qd_parsed_field_t *tmp = qd_parse_value_by_key(props, "status");
+ if (!tmp || !qd_parse_is_scalar(tmp)) {
+ rc = QCM_ADDR_LOOKUP_INVALID_REQUEST;
+ goto exit;
+ } else {
+ int32_t status = qd_parse_as_int(tmp);
+ if (status != QCM_ADDR_LOOKUP_OK) {
+ rc = (qcm_address_lookup_status_t) status;
+ goto exit;
+ }
+ }
+
+ // bod[0] == is_link_route (bool)
+ // bod[1] == has_destinations (bool)
+
+ if (qd_parse_sub_count(bod) < 2) {
+ rc = QCM_ADDR_LOOKUP_INVALID_REQUEST;
+ goto exit;
+ }
+
+ *is_link_route = qd_parse_as_bool(qd_parse_sub_value(bod, 0));
+ *has_destinations = qd_parse_as_bool(qd_parse_sub_value(bod, 1));
+
+exit:
+ qd_parse_free(props);
+ qd_parse_free(bod);
+ return rc;
+}
+
+
+typedef struct _endpoint_ref {
+ DEQ_LINKS(struct _endpoint_ref);
+ qdrc_endpoint_t *endpoint;
+ const char *container_id;
+} _endpoint_ref_t;
+DEQ_DECLARE(_endpoint_ref_t, _endpoint_ref_list_t);
+ALLOC_DEFINE(_endpoint_ref_t);
+
+
+static struct {
+ qdr_core_t *core;
+ _endpoint_ref_list_t endpoints;
+} _server_state;
+
+
+/* parse out the opcode from the request
+ */
+static address_lookup_opcode_t _decode_opcode(qd_parsed_field_t *properties)
+{
+ if (!properties)
+ return OPCODE_INVALID;
+ qd_parsed_field_t *oc = qd_parse_value_by_key(properties, "opcode");
+ if (!oc)
+ return OPCODE_INVALID;
+ uint32_t opcode = qd_parse_as_uint(oc);
+ if (!qd_parse_ok(oc))
+ return OPCODE_INVALID;
+ return (address_lookup_opcode_t)opcode;
+}
+
+
+/* send a reply to a lookup request
+ */
+static uint64_t _send_reply(_endpoint_ref_t *epr,
+ address_lookup_opcode_t opcode,
+ qcm_address_lookup_status_t status,
+ qd_iterator_t *correlation_id,
+ qd_iterator_t *reply_to,
+ qd_composed_field_t *body)
+{
+ if (!correlation_id || !reply_to) {
+ qd_log(_server_state.core->log, QD_LOG_ERROR,
+ "Link route address reply failed - invalid request message properties"
+ " (container=%s, endpoint=%p)",
+ epr->container_id, (void *)epr->endpoint);
+ return PN_REJECTED;
+ }
+
+ qd_composed_field_t *fld = qd_compose(QD_PERFORMATIVE_HEADER, 0);
+ qd_compose_start_list(fld);
+ qd_compose_insert_bool(fld, 0); // durable
+ qd_compose_end_list(fld);
+
+ fld = qd_compose(QD_PERFORMATIVE_PROPERTIES, fld);
+ qd_compose_start_list(fld);
+ qd_compose_insert_null(fld); // message-id
+ qd_compose_insert_null(fld); // user-id
+ qd_compose_insert_typed_iterator(fld, reply_to); // to
+ qd_compose_insert_null(fld); // subject
+ qd_compose_insert_null(fld); // reply-to
+ qd_compose_insert_typed_iterator(fld, correlation_id);
+ qd_compose_end_list(fld);
+
+ fld = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, fld);
+ qd_compose_start_map(fld);
+ qd_compose_insert_string(fld, "version");
+ qd_compose_insert_uint(fld, PROTOCOL_VERSION);
+ qd_compose_insert_string(fld, "opcode");
+ qd_compose_insert_uint(fld, opcode);
+ qd_compose_insert_string(fld, "status");
+ qd_compose_insert_uint(fld, status);
+ qd_compose_end_map(fld);
+
+ qd_message_t *msg = qd_message();
+
+ qd_message_compose_3(msg, fld, body);
+ qdr_in_process_send_to_CT(_server_state.core, reply_to, msg, true, false);
+ qd_message_free(msg);
+ qd_compose_free(fld);
+
+ return PN_ACCEPTED;
+}
+
+
+/* perform a link route lookup
+ */
+static uint64_t _do_link_route_lookup(_endpoint_ref_t *epr,
+ qd_parsed_field_t *body,
+ qd_iterator_t *reply_to,
+ qd_iterator_t *cid)
+{
+ if (!body || !qd_parse_ok(body) || qd_parse_sub_count(body) < 2) {
+ qd_log(_server_state.core->log, QD_LOG_ERROR,
+ "Link route address lookup failed - invalid request body"
+ " (container=%s, endpoint=%p)",
+ epr->container_id, (void *)epr->endpoint);
+ return PN_REJECTED;
+ }
+
+ //
+ // body[0] == fully qualified address (string)
+ // body[1] == direction (bool, true == receiver)
+ //
+
+ qd_iterator_t *addr_i = qd_parse_raw(qd_parse_sub_value(body, 0));
+ qd_direction_t dir = (qd_parse_as_bool(qd_parse_sub_value(body, 1))
+ ? QD_INCOMING : QD_OUTGOING);
+
+ bool is_link_route = false;
+ bool has_destinations = false;
+ qdr_address_t *addr = 0;
+ qd_iterator_reset_view(addr_i, ITER_VIEW_ALL);
+ qd_parse_tree_retrieve_match(_server_state.core->link_route_tree[dir], addr_i, (void**) &addr);
+ if (addr) {
+ is_link_route = true;
+ has_destinations = !!(DEQ_SIZE(addr->conns) || DEQ_SIZE(addr->rlinks) || qd_bitmask_cardinality(addr->rnodes));
+ }
+
+ // out_body[0] == is_link_route (bool)
+ // out_body[1] == has_destinations (bool)
+
+ qd_composed_field_t *out_body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);
+ qd_compose_start_list(out_body);
+ qd_compose_insert_bool(out_body, is_link_route);
+ qd_compose_insert_bool(out_body, has_destinations);
+ qd_compose_end_list(out_body);
+
+ uint64_t rc = _send_reply(epr,
+ OPCODE_LINK_ROUTE_LOOKUP,
+ addr ? QCM_ADDR_LOOKUP_OK : QCM_ADDR_LOOKUP_NOT_FOUND,
+ cid,
+ reply_to,
+ out_body);
+ qd_compose_free(out_body);
+
+ if (qd_log_enabled(_server_state.core->log, QD_LOG_TRACE)) {
+ char *as = (char *)qd_iterator_copy(addr_i);
+ qd_log(_server_state.core->log, QD_LOG_TRACE,
+ "Link route address lookup on %s - %sfound is link route=%s has_destinations=%s"
+ " (container=%s, endpoint=%p)",
+ as,
+ (addr) ? "" : "not ",
+ is_link_route ? "yes" : "no",
+ has_destinations ? "yes" : "no",
+ epr->container_id,
+ (void *)epr->endpoint);
+ free(as);
+ }
+ return rc;
+}
+
+
+/* handle lookup request from client
+ */
+void _on_transfer(void *link_context,
+ qdr_delivery_t *delivery,
+ qd_message_t *message)
+{
+ if (!qd_message_receive_complete(message))
+ return;
+
+ _endpoint_ref_t *epr = (_endpoint_ref_t *)link_context;
+ qd_log(_server_state.core->log, QD_LOG_TRACE,
+ "Address lookup request received (container=%s, endpoint=%p)",
+ epr->container_id, (void *)epr->endpoint);
+
+ uint64_t disposition = PN_ACCEPTED;
+ qd_iterator_t *p_iter = qd_message_field_iterator(message, QD_FIELD_APPLICATION_PROPERTIES);
+ qd_parsed_field_t *props = qd_parse(p_iter);
+ if (!props || !qd_parse_ok(props) || !qd_parse_is_map(props)) {
+ qd_log(_server_state.core->log, QD_LOG_ERROR,
+ "Invalid address lookup request - no properties (container=%s, endpoint=%p)",
+ epr->container_id, (void *)epr->endpoint);
+ disposition = PN_REJECTED;
+ goto exit;
+ }
+
+ qd_parsed_field_t *v = qd_parse_value_by_key(props, "version");
+ if (!v) {
+ qd_log(_server_state.core->log, QD_LOG_ERROR,
+ "Invalid address lookup request - no version (container=%s, endpoint=%p)",
+ epr->container_id, (void *)epr->endpoint);
+ disposition = PN_REJECTED;
+ goto exit;
+ }
+
+ uint32_t version = qd_parse_as_uint(v);
+ if (!qd_parse_ok(v)) {
+ qd_log(_server_state.core->log, QD_LOG_ERROR,
+ "Invalid address lookup request - invalid version (container=%s, endpoint=%p)",
+ epr->container_id, (void *)epr->endpoint);
+ disposition = PN_REJECTED;
+ goto exit;
+ }
+
+ if (version != PROTOCOL_VERSION) {
+ qd_log(_server_state.core->log, QD_LOG_ERROR,
+ "Invalid address lookup request - unknown version"
+ " (container=%s, endpoint=%p, version=%"PRIu32")",
+ epr->container_id, (void *)epr->endpoint, version);
+ disposition = PN_REJECTED;
+ goto exit;
+ // @TODO(kgiusti) send reply with status QCM_ADDR_LOOKUP_BAD_VERSION
+ }
+
+ address_lookup_opcode_t opcode = _decode_opcode(props);
+ switch (opcode) {
+ case OPCODE_LINK_ROUTE_LOOKUP: {
+ qd_iterator_t *b_iter = qd_message_field_iterator(message, QD_FIELD_BODY);
+ qd_parsed_field_t *body = qd_parse(b_iter);
+ qd_iterator_t *reply_to = qd_message_field_iterator_typed(message, QD_FIELD_REPLY_TO);
+ qd_iterator_t *cid = qd_message_field_iterator_typed(message, QD_FIELD_CORRELATION_ID);
+ disposition = _do_link_route_lookup(epr, body, reply_to, cid);
+ qd_iterator_free(cid);
+ qd_iterator_free(reply_to);
+ qd_parse_free(body);
+ qd_iterator_free(b_iter);
+ break;
+ }
+ case OPCODE_INVALID:
+ default:
+ qd_log(_server_state.core->log, QD_LOG_ERROR,
+ "Invalid address lookup request - invalid opcode"
+ " (container=%s, endpoint=%p, opcode=%d)",
+ epr->container_id, (void *)epr->endpoint, opcode);
+ disposition = PN_REJECTED;
+ }
+
+exit:
+ qd_parse_free(props);
+ qd_iterator_free(p_iter);
+ qdrc_endpoint_settle_CT(_server_state.core, delivery, disposition);
+ qdrc_endpoint_flow_CT(_server_state.core, epr->endpoint, 1, false);
+ return;
+}
+
+
+/* handle incoming attach to address lookup service
+ */
+static void _on_first_attach(void *bind_context,
+ qdrc_endpoint_t *endpoint,
+ void **link_context,
+ qdr_terminus_t *remote_source,
+ qdr_terminus_t *remote_target)
+{
+ //
+ // Only accept incoming links initiated by the edge router. Detach all
+ // other links
+ //
+ qdr_connection_t *conn = qdrc_endpoint_get_connection_CT(endpoint);
+ if (qdrc_endpoint_get_direction_CT(endpoint) != QD_INCOMING ||
+ conn->role != QDR_ROLE_EDGE_CONNECTION) {
+ *link_context = 0;
+ qdrc_endpoint_detach_CT(_server_state.core, endpoint, 0);
+ qd_log(_server_state.core->log, QD_LOG_ERROR,
+ "Attempt to attach to address lookup server rejected (container=%s)",
+ (conn->connection_info) ? conn->connection_info->container : "<unknown>");
+ return;
+ }
+
+ _endpoint_ref_t *epr = new__endpoint_ref_t();
+ ZERO(epr);
+ epr->endpoint = endpoint;
+ epr->container_id = (conn->connection_info) ? conn->connection_info->container : "<unknown>";
+ DEQ_INSERT_TAIL(_server_state.endpoints, epr);
+ *link_context = epr;
+ qdrc_endpoint_second_attach_CT(_server_state.core, endpoint, remote_source, remote_target);
+ qdrc_endpoint_flow_CT(_server_state.core, endpoint, 1, false);
+
+ qd_log(_server_state.core->log, QD_LOG_TRACE,
+ "Client attached to address lookup server (container=%s, endpoint=%p)",
+ epr->container_id, (void *)endpoint);
+}
+
+
+/* handle incoming detach from client
+ */
+static void _on_first_detach(void *link_context,
+ qdr_error_t *error)
+{
+ _endpoint_ref_t *epr = (_endpoint_ref_t *)link_context;
+ qdrc_endpoint_detach_CT(_server_state.core, epr->endpoint, 0);
+ DEQ_REMOVE(_server_state.endpoints, epr);
+ qd_log(_server_state.core->log, QD_LOG_TRACE,
+ "Client detached from address lookup server (container=%s, endpoint=%p)",
+ epr->container_id, (void *)epr->endpoint);
+ free__endpoint_ref_t(epr);
+}
+
+
+static qdrc_endpoint_desc_t _endpoint_handlers =
+{
+ .label = "address lookup",
+ .on_first_attach = _on_first_attach,
+ .on_transfer = _on_transfer,
+ .on_first_detach = _on_first_detach,
+};
+
+
+static void _address_lookup_init_CT(qdr_core_t *core, void **module_context)
+{
+ //
+ // Address resolution service is provided by interior routers only
+ //
+ if (core->router_mode != QD_ROUTER_MODE_INTERIOR)
+ return;
+
+ _server_state.core = core;
+
+ //
+ // Handle any incoming links to the QD_TERMINUS_ADDRESS_LOOKUP address
+ //
+ qdrc_endpoint_bind_mobile_address_CT(core,
+ QD_TERMINUS_ADDRESS_LOOKUP,
+ '0', // phase
+ &_endpoint_handlers,
+ &_server_state);
+ *module_context = &_server_state;
+}
+
+
+static void _address_lookup_final_CT(void *module_context)
+{
+ _endpoint_ref_t *epr = DEQ_HEAD(_server_state.endpoints);
+ while (epr) {
+ DEQ_REMOVE_HEAD(_server_state.endpoints);
+ free__endpoint_ref_t(epr);
+ epr = DEQ_HEAD(_server_state.endpoints);
+ }
+}
+
+
+QDR_CORE_MODULE_DECLARE("address lookup", _address_lookup_init_CT, _address_lookup_final_CT)
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d201deac/src/router_core/modules/address_lookup/address_lookup.h
----------------------------------------------------------------------
diff --git a/src/router_core/modules/address_lookup/address_lookup.h b/src/router_core/modules/address_lookup/address_lookup.h
new file mode 100644
index 0000000..9f8b65e
--- /dev/null
+++ b/src/router_core/modules/address_lookup/address_lookup.h
@@ -0,0 +1,75 @@
+#ifndef router_core_address_lookup_h
+#define router_core_address_lookup_h 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/iterator.h>
+#include <qpid/dispatch/container.h>
+#include <qpid/dispatch/compose.h>
+//
+// API for building address lookup request messages. The message properties
+// and body fields are handled separately so they can be passed directly to the
+// core client API.
+//
+
+typedef enum {
+ // note: keep unit test in sync
+ QCM_ADDR_LOOKUP_OK,
+ QCM_ADDR_LOOKUP_BAD_VERSION,
+ QCM_ADDR_LOOKUP_BAD_OPCODE,
+ QCM_ADDR_LOOKUP_NOT_FOUND,
+ QCM_ADDR_LOOKUP_INVALID_REQUEST,
+} qcm_address_lookup_status_t;
+
+
+/**
+ * Create the message properties and body for a link route address lookup. The
+ * returned properties and body can be passed directly to
+ * qdrc_client_request_CT().
+ *
+ * @param address - fully qualified link route address to look up.
+ * @param dir - QD_INCOMING or QD_OUTGOING
+ * @param properties - return value for message application properties section
+ * @param body - return value for message body
+ * @return zero on success
+ */
+int qcm_link_route_lookup_request(qd_iterator_t *address,
+ qd_direction_t dir,
+ qd_composed_field_t **properties,
+ qd_composed_field_t **body);
+
+
+/**
+ * Parse out the payload of the link route lookup reply message. The
+ * properties and body fields are provided by the on_reply_cb() callback passed
+ * to the qdrc_client_request_CT() call.
+ *
+ * @param properties - application properties as returned in the reply
+ * @param body - body from reply message
+ * @param is_link_route - set True if the address is a link route address that
+ * exists in the route tables of the queried router.
+ * @param has_destination - if is_link_route this indicates whether or not the
+ * queried router has active destinations for this link route.
+ * @return QCM_ADDR_LOOKUP_OK if query is successful, else and error code
+ */
+qcm_address_lookup_status_t qcm_link_route_lookup_decode(qd_iterator_t *properties,
+ qd_iterator_t *body,
+ bool *is_link_route,
+ bool *has_destinations);
+#endif // router_core_address_lookup_h
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d201deac/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 91d0016..0b70c97 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -857,7 +857,7 @@ void qdr_delivery_failed_CT(qdr_core_t *core, qdr_delivery_t *delivery);
bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *delivery);
void qdr_delivery_decref_CT(qdr_core_t *core, qdr_delivery_t *delivery, const char *label);
void qdr_forward_on_message_CT(qdr_core_t *core, qdr_subscription_t *sub, qdr_link_t *link, qd_message_t *msg);
-
+void qdr_in_process_send_to_CT(qdr_core_t *core, qd_iterator_t *address, qd_message_t *msg, bool exclude_inprocess, bool control);
/**
* Links the in_dlv to the out_dlv and increments ref counts of both deliveries
*/
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d201deac/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 6d9206d..1cce8de 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -1061,27 +1061,36 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis
static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
- qdr_field_t *addr_field = action->args.io.address;
- qd_message_t *msg = action->args.io.message;
-
if (!discard) {
- qdr_address_t *addr = 0;
-
- qd_iterator_reset_view(addr_field->iterator, ITER_VIEW_ADDRESS_HASH);
- qd_hash_retrieve(core->addr_hash, addr_field->iterator, (void**) &addr);
- if (addr) {
- //
- // Forward the message. We don't care what the fanout count is.
- //
- (void) qdr_forward_message_CT(core, addr, msg, 0, action->args.io.exclude_inprocess,
- action->args.io.control);
- addr->deliveries_from_container++;
- } else
- qd_log(core->log, QD_LOG_DEBUG, "In-process send to an unknown address");
+ qdr_in_process_send_to_CT(core,
+ qdr_field_iterator(action->args.io.address),
+ action->args.io.message,
+ action->args.io.exclude_inprocess,
+ action->args.io.control);
}
- qdr_field_free(addr_field);
- qd_message_free(msg);
+ qdr_field_free(action->args.io.address);
+ qd_message_free(action->args.io.message);
+}
+
+
+/**
+ * forward an in-process message based on the destination address
+ */
+void qdr_in_process_send_to_CT(qdr_core_t *core, qd_iterator_t *address, qd_message_t *msg, bool exclude_inprocess, bool control)
+{
+ qdr_address_t *addr = 0;
+
+ qd_iterator_reset_view(address, ITER_VIEW_ADDRESS_HASH);
+ qd_hash_retrieve(core->addr_hash, address, (void**) &addr);
+ if (addr) {
+ //
+ // Forward the message. We don't care what the fanout count is.
+ //
+ (void) qdr_forward_message_CT(core, addr, msg, 0, exclude_inprocess, control);
+ addr->deliveries_from_container++;
+ } else
+ qd_log(core->log, QD_LOG_DEBUG, "In-process send to an unknown address");
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d201deac/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 51dc438..45206e8 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -123,6 +123,7 @@ foreach(py_test_module
${CONSOLE_TEST}
system_tests_priority
system_tests_core_client
+ system_tests_address_lookup
)
add_test(${py_test_module} ${TEST_WRAP} -x unit2 -v ${py_test_module})
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d201deac/tests/system_test.py
----------------------------------------------------------------------
diff --git a/tests/system_test.py b/tests/system_test.py
index d268671..d4cd362 100755
--- a/tests/system_test.py
+++ b/tests/system_test.py
@@ -436,11 +436,12 @@ class Qdrouterd(Process):
except:
return False
- def wait_address(self, address, subscribers=0, remotes=0, **retry_kwargs):
+ def wait_address(self, address, subscribers=0, remotes=0, containers=0, **retry_kwargs):
"""
Wait for an address to be visible on the router.
@keyword subscribers: Wait till subscriberCount >= subscribers
@keyword remotes: Wait till remoteCount >= remotes
+ @keyword containers: Wait till containerCount >= remotes
@param retry_kwargs: keyword args for L{retry}
"""
def check():
@@ -449,11 +450,13 @@ class Qdrouterd(Process):
# endswith check is because of M0/L/R prefixes
addrs = self.management.query(
type='org.apache.qpid.dispatch.router.address',
- attribute_names=[u'name', u'subscriberCount', u'remoteCount']).get_entities()
+ attribute_names=[u'name', u'subscriberCount', u'remoteCount', u'containerCount']).get_entities()
addrs = [a for a in addrs if a['name'].endswith(address)]
- return addrs and addrs[0]['subscriberCount'] >= subscribers and addrs[0]['remoteCount'] >= remotes
+ return (addrs and addrs[0]['subscriberCount'] >= subscribers
+ and addrs[0]['remoteCount'] >= remotes
+ and addrs[0]['containerCount'] >= containers)
assert retry(check, **retry_kwargs)
def get_host(self, protocol_family):
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d201deac/tests/system_tests_address_lookup.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_address_lookup.py b/tests/system_tests_address_lookup.py
new file mode 100644
index 0000000..3538f93
--- /dev/null
+++ b/tests/system_tests_address_lookup.py
@@ -0,0 +1,287 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import unicode_literals
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import print_function
+
+import unittest2 as unittest
+import time
+
+from system_test import TestCase, Qdrouterd, TIMEOUT
+from system_test import AsyncTestReceiver
+from test_broker import FakeBroker
+
+from proton import Disposition
+from proton import Message
+from proton.utils import BlockingConnection
+from proton.utils import SyncRequestResponse
+from proton.utils import SendException
+from proton.utils import LinkDetached
+
+
+class LinkRouteLookupTest(TestCase):
+ """
+ Tests link route address lookup
+ """
+ # hardcoded values from the router's C code
+ QD_TERMINUS_ADDRESS_LOOKUP = '_$qd.addr_lookup'
+ PROTOCOL_VERSION = 1
+ OPCODE_LINK_ROUTE_LOOKUP = 1
+ QCM_ADDR_LOOKUP_OK = 0
+ QCM_ADDR_LOOKUP_NOT_FOUND = 3
+
+ def _check_response(self, message):
+ self.assertTrue(isinstance(message.properties, dict))
+ self.assertEqual(self.PROTOCOL_VERSION, message.properties.get('version'))
+ self.assertTrue(message.properties.get('opcode') is not None)
+ self.assertTrue(isinstance(message.body, list))
+ self.assertEqual(2, len(message.body))
+ return (message.properties.get('status'),
+ message.body[0], # is link_route?
+ message.body[1]) # has destinations?
+
+ @classmethod
+ def setUpClass(cls):
+ """Start a router"""
+ super(LinkRouteLookupTest, cls).setUpClass()
+
+ def router(name, mode, extra=None):
+ config = [
+ ('router', {'mode': mode, 'id': name}),
+ ('listener', {'role': 'normal', 'host': '0.0.0.0', 'port': cls.tester.get_port(), 'saslMechanisms': 'ANONYMOUS'})
+ ]
+
+ if extra:
+ config.extend(extra)
+ config = Qdrouterd.Config(config)
+ cls.routers.append(cls.tester.qdrouterd(name, config, wait=False))
+
+ cls.routers = []
+
+ inter_router_port = cls.tester.get_port()
+ edge_port_A = cls.tester.get_port()
+ broker_port_A = cls.tester.get_port()
+ broker_port_B = cls.tester.get_port()
+
+ router('INT.A', 'interior',
+ [
+ ('listener', {'role': 'edge',
+ 'port': cls.tester.get_port()}),
+ ('listener', {'role': 'inter-router',
+ 'port': inter_router_port}),
+ ('connector', {'role': 'route-container',
+ 'port': cls.tester.get_port()}),
+
+ ('linkRoute', {'pattern': 'org.apache.A.#',
+ 'containerId': 'FakeBrokerA',
+ 'direction': 'in'}),
+ ('linkRoute', {'pattern': 'org.apache.A.#',
+ 'containerId': 'FakeBrokerA',
+ 'direction': 'out'})
+ ])
+ cls.INT_A = cls.routers[-1]
+ cls.INT_A.listener = cls.INT_A.addresses[0]
+ cls.INT_A.edge_listener = cls.INT_A.addresses[1]
+ cls.INT_A.inter_router_listener = cls.INT_A.addresses[2]
+ cls.INT_A.broker_connector = cls.INT_A.connector_addresses[0]
+
+ router('INT.B', 'interior',
+ [
+ ('listener', {'role': 'edge',
+ 'port': cls.tester.get_port()}),
+ ('connector', {'role': 'inter-router',
+ 'name': 'connectorToA',
+ 'port': inter_router_port}),
+ ('connector', {'role': 'route-container',
+ 'port': cls.tester.get_port()}),
+
+ ('linkRoute', {'pattern': 'org.apache.B.#',
+ 'containerId': 'FakeBrokerB',
+ 'direction': 'in'}),
+ ('linkRoute', {'pattern': 'org.apache.B.#',
+ 'containerId': 'FakeBrokerB',
+ 'direction': 'out'})
+ ])
+ cls.INT_B = cls.routers[-1]
+ cls.INT_B.edge_listener = cls.INT_B.addresses[1]
+ cls.INT_B.broker_connector = cls.INT_B.connector_addresses[1]
+
+ cls.INT_A.wait_router_connected('INT.B')
+ cls.INT_B.wait_router_connected('INT.A')
+
+ def _lookup_request(self, lr_address, direction):
+ """
+ Construct a link route lookup request message
+ """
+ return Message(body=[lr_address,
+ direction],
+ properties={"version": self.PROTOCOL_VERSION,
+ "opcode": self.OPCODE_LINK_ROUTE_LOOKUP})
+
+ def test_link_route_lookup_ok(self):
+ """
+ verify a link route address can be looked up successfully for both
+ locally attached and remotely attached containers
+ """
+
+ # fire up a fake broker attached to the router local to the edge router
+ fb = FakeBroker(self.INT_A.broker_connector, container_id='FakeBrokerA')
+ self.INT_A.wait_address("org.apache.A", containers=1)
+
+ # create a fake edge and lookup the target address
+ bc = BlockingConnection(self.INT_A.edge_listener, timeout=TIMEOUT)
+ srr = SyncRequestResponse(bc, self.QD_TERMINUS_ADDRESS_LOOKUP)
+
+ for direction in [True, False]:
+ # True = direction inbound (receiver) False = direction outbound (sender)
+ rsp = self._check_response(srr.call(self._lookup_request("org.apache.A.foo", direction)))
+ self.assertEqual(self.QCM_ADDR_LOOKUP_OK, rsp[0])
+ self.assertTrue(rsp[1])
+ self.assertTrue(rsp[2])
+
+ # shutdown fake router
+ fb.join()
+
+ # now fire up a fake broker attached to the remote router
+ fb = FakeBroker(self.INT_B.broker_connector, container_id='FakeBrokerB')
+ self.INT_A.wait_address("org.apache.B", remotes=1)
+
+ for direction in [True, False]:
+ rsp = self._check_response(srr.call(self._lookup_request("org.apache.B.foo", direction)))
+ self.assertEqual(self.QCM_ADDR_LOOKUP_OK, rsp[0])
+ self.assertTrue(rsp[1])
+ self.assertTrue(rsp[2])
+
+ fb.join()
+ bc.close()
+
+ def test_link_route_lookup_not_found(self):
+ """
+ verify correct handling of lookup misses
+ """
+ bc = BlockingConnection(self.INT_A.edge_listener, timeout=TIMEOUT)
+ srr = SyncRequestResponse(bc, self.QD_TERMINUS_ADDRESS_LOOKUP)
+
+ rsp = self._check_response(srr.call(self._lookup_request("not.found.address", True)))
+ self.assertEqual(self.QCM_ADDR_LOOKUP_NOT_FOUND, rsp[0])
+
+ def test_link_route_lookup_not_link_route(self):
+ """
+ verify correct handling of matches to mobile addresses
+ """
+ addr = "not.a.linkroute"
+ client = AsyncTestReceiver(self.INT_A.listener, addr)
+ self.INT_A.wait_address(addr)
+
+ bc = BlockingConnection(self.INT_A.edge_listener, timeout=TIMEOUT)
+ srr = SyncRequestResponse(bc, self.QD_TERMINUS_ADDRESS_LOOKUP)
+
+ rsp = self._check_response(srr.call(self._lookup_request(addr, True)))
+ # self.assertEqual(self.QCM_ADDR_LOOKUP_OK, rsp[0])
+ self.assertEqual(False, rsp[1])
+ bc.close()
+ client.stop()
+
+ def test_link_route_lookup_no_dest(self):
+ """
+ verify correct handling of matches to mobile addresses
+ """
+ bc = BlockingConnection(self.INT_A.edge_listener, timeout=TIMEOUT)
+ srr = SyncRequestResponse(bc, self.QD_TERMINUS_ADDRESS_LOOKUP)
+ rsp = self._check_response(srr.call(self._lookup_request("org.apache.A.nope", True)))
+ self.assertEqual(self.QCM_ADDR_LOOKUP_OK, rsp[0])
+ self.assertEqual(True, rsp[1])
+ self.assertEqual(False, rsp[2])
+ bc.close()
+
+ def _invalid_request_test(self, msg, disposition=Disposition.REJECTED):
+ bc = BlockingConnection(self.INT_A.edge_listener, timeout=TIMEOUT)
+ srr = SyncRequestResponse(bc, self.QD_TERMINUS_ADDRESS_LOOKUP)
+ # @TODO(kgiusti) - self.assertRaises does not work here:
+ try:
+ srr.call(msg)
+ self.assertTrue(False)
+ except SendException as exc:
+ self.assertEqual(disposition, exc.state)
+ bc.close()
+
+ def test_link_route_invalid_request(self):
+ """
+ Test various invalid message content
+ """
+
+ # empty message
+ self._invalid_request_test(Message())
+
+ # missing properties
+ msg = self._lookup_request("ignore", False)
+ msg.properties = None
+ self._invalid_request_test(msg)
+
+ # missing version
+ msg = self._lookup_request("ignore", False)
+ del msg.properties['version']
+ self._invalid_request_test(msg)
+
+ # invalid version
+ msg = self._lookup_request("ignore", False)
+ msg.properties['version'] = "blech"
+ self._invalid_request_test(msg)
+
+ # unsupported version
+ msg = self._lookup_request("ignore", False)
+ msg.properties['version'] = 97387187
+ self._invalid_request_test(msg)
+
+ # missing opcode
+ msg = self._lookup_request("ignore", False)
+ del msg.properties['opcode']
+ self._invalid_request_test(msg)
+
+ # bad opcode
+ msg = self._lookup_request("ignore", False)
+ msg.properties['opcode'] = "snarf"
+ self._invalid_request_test(msg)
+
+ # bad body
+ msg = self._lookup_request("ignore", False)
+ msg.body = [71]
+ self._invalid_request_test(msg)
+
+ def test_lookup_bad_connection(self):
+ """
+ Verify that clients connected via non-edge connections fail
+ """
+ bc = BlockingConnection(self.INT_A.listener, timeout=TIMEOUT)
+ with self.assertRaises(LinkDetached):
+ SyncRequestResponse(bc, self.QD_TERMINUS_ADDRESS_LOOKUP)
+ bc.close()
+
+ bc = BlockingConnection(self.INT_A.inter_router_listener, timeout=TIMEOUT)
+ with self.assertRaises(LinkDetached):
+ SyncRequestResponse(bc, self.QD_TERMINUS_ADDRESS_LOOKUP)
+ bc.close()
+
+ # consuming from the lookup address is forbidden:
+ bc = BlockingConnection(self.INT_A.edge_listener, timeout=TIMEOUT)
+ with self.assertRaises(LinkDetached):
+ rcv = bc.create_receiver(self.QD_TERMINUS_ADDRESS_LOOKUP)
+ bc.close()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/2] qpid-dispatch git commit: DISPATCH-1194 - Refactored the
address lookup for attach into a core module with asynchronous capability.
Posted by tr...@apache.org.
DISPATCH-1194 - Refactored the address lookup for attach into a core module with asynchronous capability.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/564c5907
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/564c5907
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/564c5907
Branch: refs/heads/master
Commit: 564c5907cebb12653181f2e266dce8b581dc7c32
Parents: d201dea
Author: Ted Ross <tr...@redhat.com>
Authored: Tue Dec 4 09:38:09 2018 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Tue Dec 4 09:38:09 2018 -0500
----------------------------------------------------------------------
src/CMakeLists.txt | 1 +
src/router_core/connections.c | 345 +----------------
src/router_core/core_attach_address_lookup.h | 54 +++
.../address_lookup_client/lookup_client.c | 367 +++++++++++++++++++
src/router_core/router_core_private.h | 3 +
5 files changed, 433 insertions(+), 337 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/564c5907/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 352aec7..8fc6485 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -110,6 +110,7 @@ set(qpid_dispatch_SOURCES
router_core/modules/test_hooks/core_test_hooks.c
router_core/modules/edge_addr_tracking/edge_addr_tracking.c
router_core/modules/address_lookup/address_lookup.c
+ router_core/modules/address_lookup_client/lookup_client.c
router_node.c
router_pynode.c
schema_enum.c
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/564c5907/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index db271f4..d06bec9 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -607,33 +607,6 @@ void qdr_link_enqueue_work_CT(qdr_core_t *core,
/**
- * Generate a temporary routable address for a destination connected to this
- * router node.
- */
-static void qdr_generate_temp_addr(qdr_core_t *core, char *buffer, size_t length)
-{
- char discriminator[QD_DISCRIMINATOR_SIZE];
- qd_generate_discriminator(discriminator);
- if (core->router_mode == QD_ROUTER_MODE_EDGE)
- snprintf(buffer, length, "amqp:/_edge/%s/temp.%s", core->router_id, discriminator);
- else
- snprintf(buffer, length, "amqp:/_topo/%s/%s/temp.%s", core->router_area, core->router_id, discriminator);
-}
-
-
-/**
- * Generate a temporary mobile address for a producer connected to this
- * router node.
- */
-static void qdr_generate_mobile_addr(qdr_core_t *core, char *buffer, size_t length)
-{
- char discriminator[QD_DISCRIMINATOR_SIZE];
- qd_generate_discriminator(discriminator);
- snprintf(buffer, length, "amqp:/_$temp.%s", discriminator);
-}
-
-
-/**
* Generate a link name
*/
static void qdr_generate_link_name(const char *label, char *buffer, size_t length)
@@ -1087,173 +1060,6 @@ void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr)
}
-/**
- * qdr_lookup_terminus_address_CT
- *
- * Lookup a terminus address in the route table and possibly create a new address
- * if no match is found.
- *
- * @param core Pointer to the core object
- * @param dir Direction of the link for the terminus
- * @param conn The connection over which the terminus was attached
- * @param terminus The terminus containing the addressing information to be looked up
- * @param create_if_not_found Iff true, return a pointer to a newly created address record
- * @param accept_dynamic Iff true, honor the dynamic flag by creating a dynamic address
- * @param [out] link_route True iff the lookup indicates that an attach should be routed
- * @param [out] unavailable True iff this address is blocked as unavailable
- * @param [out] core_endpoint True iff this address is bound to a core-internal endpoint
- * @return Pointer to an address record or 0 if none is found
- */
-static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t *core,
- qd_direction_t dir,
- qdr_connection_t *conn,
- qdr_terminus_t *terminus,
- bool create_if_not_found,
- bool accept_dynamic,
- bool *link_route,
- bool *unavailable,
- bool *core_endpoint)
-{
- qdr_address_t *addr = 0;
-
- //
- // Unless expressly stated, link routing is not indicated for this terminus.
- //
- *link_route = false;
- *unavailable = false;
- *core_endpoint = false;
-
- if (qdr_terminus_is_dynamic(terminus)) {
- //
- // The terminus is dynamic. Check to see if there is an address provided
- // in the dynamic node properties. If so, look that address up as a link-routed
- // destination.
- //
- qd_iterator_t *dnp_address = qdr_terminus_dnp_address(terminus);
- if (dnp_address) {
- qd_iterator_reset_view(dnp_address, ITER_VIEW_ADDRESS_WITH_SPACE);
- if (conn->tenant_space)
- qd_iterator_annotate_space(dnp_address, conn->tenant_space, conn->tenant_space_len);
- qd_parse_tree_retrieve_match(core->link_route_tree[dir], dnp_address, (void**) &addr);
-
- if (addr && conn->tenant_space) {
- //
- // If this link is in a tenant space, translate the dnp address to
- // the fully-qualified view
- //
- qdr_terminus_set_dnp_address_iterator(terminus, dnp_address);
- }
-
- qd_iterator_free(dnp_address);
- *link_route = true;
- return addr;
- }
-
- //
- // The dynamic terminus has no address in the dynamic-node-propteries. If we are
- // permitted to generate dynamic addresses, create a new address that is local to
- // this router and insert it into the address table with a hash index.
- //
- if (!accept_dynamic)
- return 0;
-
- char temp_addr[200];
- bool generating = true;
- while (generating) {
- //
- // The address-generation process is performed in a loop in case the generated
- // address collides with a previously generated address (this should be _highly_
- // unlikely).
- //
- if (dir == QD_OUTGOING)
- qdr_generate_temp_addr(core, temp_addr, 200);
- else
- qdr_generate_mobile_addr(core, temp_addr, 200);
-
- qd_iterator_t *temp_iter = qd_iterator_string(temp_addr, ITER_VIEW_ADDRESS_HASH);
- qd_hash_retrieve(core->addr_hash, temp_iter, (void**) &addr);
- if (!addr) {
- addr = qdr_address_CT(core, QD_TREATMENT_ANYCAST_BALANCED);
- qd_hash_insert(core->addr_hash, temp_iter, addr, &addr->hash_handle);
- DEQ_INSERT_TAIL(core->addrs, addr);
- qdr_terminus_set_address(terminus, temp_addr);
- generating = false;
- }
- qd_iterator_free(temp_iter);
- }
- return addr;
- }
-
- //
- // If the terminus is anonymous, there is no address to look up.
- //
- if (qdr_terminus_is_anonymous(terminus))
- return 0;
-
- //
- // The terminus has a non-dynamic address that we need to look up. First, look for
- // a link-route destination for the address.
- //
- qd_iterator_t *iter = qdr_terminus_get_address(terminus);
- qd_iterator_reset_view(iter, ITER_VIEW_ADDRESS_WITH_SPACE);
- if (conn->tenant_space)
- qd_iterator_annotate_space(iter, conn->tenant_space, conn->tenant_space_len);
- qd_parse_tree_retrieve_match(core->link_route_tree[dir], iter, (void**) &addr);
- if (addr) {
- *link_route = true;
-
- //
- // If this link is in a tenant space, translate the terminus address to
- // the fully-qualified view
- //
- if (conn->tenant_space) {
- qdr_terminus_set_address_iterator(terminus, iter);
- }
- return addr;
- }
-
- //
- // There was no match for a link-route destination, look for a message-route address.
- //
- int in_phase;
- int out_phase;
- int addr_phase;
- int priority;
- qd_address_treatment_t treat = qdr_treatment_for_address_CT(core, conn, iter, &in_phase, &out_phase, &priority);
-
- qd_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
- qd_iterator_annotate_prefix(iter, '\0'); // Cancel previous override
- addr_phase = dir == QD_INCOMING ? in_phase : out_phase;
- qd_iterator_annotate_phase(iter, (char) addr_phase + '0');
-
- qd_hash_retrieve(core->addr_hash, iter, (void**) &addr);
-
- if (addr && addr->treatment == QD_TREATMENT_UNAVAILABLE)
- *unavailable = true;
-
- if (!addr && create_if_not_found) {
- addr = qdr_address_CT(core, treat);
- if (addr) {
- qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle);
- DEQ_INSERT_TAIL(core->addrs, addr);
- }
-
- if (!addr && treat == QD_TREATMENT_UNAVAILABLE)
- *unavailable = true;
- }
-
- if (qdr_terminus_is_coordinator(terminus))
- *unavailable = false;
-
- if (!!addr && addr->core_endpoint != 0)
- *core_endpoint = true;
-
- if (addr)
- addr->priority = priority;
- return addr;
-}
-
-
static void qdr_connection_opened_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
@@ -1412,19 +1218,6 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, boo
}
-static char* disambiguated_link_name(qdr_connection_info_t *conn, char *original)
-{
- size_t olen = strlen(original);
- size_t clen = strlen(conn->container);
- char *name = (char*) malloc(olen + clen + 2);
- memset(name, 0, olen + clen + 2);
- strcat(name, original);
- name[olen] = '@';
- strcat(name + olen + 1, conn->container);
- return name;
-}
-
-
//
// Handle the attachment and detachment of an inter-router control link
//
@@ -1508,7 +1301,6 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
qd_direction_t dir = action->args.connection.dir;
qdr_terminus_t *source = action->args.connection.source;
qdr_terminus_t *target = action->args.connection.target;
- bool success;
//
// Put the link into the proper lists for tracking.
@@ -1561,93 +1353,13 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
//
// This link has a target address
//
- bool link_route;
- bool unavailable;
- bool core_endpoint;
- qdr_address_t *addr = qdr_lookup_terminus_address_CT(core, dir, conn, target, true, true, &link_route, &unavailable, &core_endpoint);
-
- if (core_endpoint) {
- qdrc_endpoint_do_bound_attach_CT(core, addr, link, source, target);
- }
-
- else if (unavailable) {
- qdr_link_outbound_detach_CT(core, link, qdr_error(QD_AMQP_COND_NOT_FOUND, "Node not found"), 0, true);
- qdr_terminus_free(source);
- qdr_terminus_free(target);
- }
-
- else if (!addr) {
- //
- // No route to this destination, reject the link
- //
+ if (core->addr_lookup_handler)
+ core->addr_lookup_handler(core, conn, link, dir, source, target);
+ else {
qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
qdr_terminus_free(source);
qdr_terminus_free(target);
- }
-
- else if (link_route) {
- //
- // This is a link-routed destination, forward the attach to the next hop
- //
- if (qdr_terminus_survives_disconnect(target) && !core->qd->allow_resumable_link_route) {
- qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_INVALID_LINK_EXPIRATION, true);
- qdr_terminus_free(source);
- qdr_terminus_free(target);
- } else {
- if (conn->role != QDR_ROLE_INTER_ROUTER && conn->connection_info) {
- link->disambiguated_name = disambiguated_link_name(conn->connection_info, link->name);
- }
- success = qdr_forward_attach_CT(core, addr, link, source, target);
-
- if (!success) {
- qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
- qdr_terminus_free(source);
- qdr_terminus_free(target);
- }
- }
-
- }
-
- else if (qdr_terminus_is_coordinator(target)) {
- //
- // This target terminus is a coordinator.
- // If we got here, it means that the coordinator link attach could not be link routed to a broker (or to the next router).
- // The router should reject this link because the router cannot coordinate transactions itself.
- //
- // The attach response should have a null target to indicate refusal and the immediately coming detach.
- qdr_link_outbound_second_attach_CT(core, link, source, 0);
- // Now, send back a detach with the error amqp:precondition-failed
- qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_COORDINATOR_PRECONDITION_FAILED, true);
- }
- else {
- //
- // Associate the link with the address. With this association, it will be unnecessary
- // to do an address lookup for deliveries that arrive on this link.
- //
- qdr_core_bind_address_link_CT(core, addr, link);
- qdr_link_outbound_second_attach_CT(core, link, source, target);
-
- //
- // Issue the initial credit only if one of the following
- // holds:
- // - there are destinations for the address
- // - if the address treatment is multicast
- // - the address is that of an exchange (no subscribers allowed)
- //
- if (DEQ_SIZE(addr->subscriptions)
- || DEQ_SIZE(addr->rlinks)
- || qd_bitmask_cardinality(addr->rnodes)
- || qdr_is_addr_treatment_multicast(addr)
- || !!addr->exchange) {
- qdr_link_issue_credit_CT(core, link, link->capacity, false);
- }
-
- //
- // If this link came through an edge connection, raise a link event to
- // herald that fact.
- //
- if (link->conn->role == QDR_ROLE_EDGE_CONNECTION)
- qdrc_event_link_raise(core, QDRC_EVENT_LINK_EDGE_DATA_ATTACHED, link);
+ return;
}
}
break;
@@ -1668,54 +1380,13 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
//
switch (link->link_type) {
case QD_LINK_ENDPOINT: {
- bool link_route;
- bool unavailable;
- bool core_endpoint;
- qdr_address_t *addr = qdr_lookup_terminus_address_CT(core, dir, conn, source, true, true, &link_route, &unavailable, &core_endpoint);
-
- if (core_endpoint) {
- qdrc_endpoint_do_bound_attach_CT(core, addr, link, source, target);
- }
-
- else if (unavailable) {
- qdr_link_outbound_detach_CT(core, link, qdr_error(QD_AMQP_COND_NOT_FOUND, "Node not found"), 0, true);
- qdr_terminus_free(source);
- qdr_terminus_free(target);
- }
-
- else if (!addr) {
- //
- // No route to this destination, reject the link
- //
+ if (core->addr_lookup_handler)
+ core->addr_lookup_handler(core, conn, link, dir, source, target);
+ else {
qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
qdr_terminus_free(source);
qdr_terminus_free(target);
- }
-
- else if (link_route) {
- //
- // This is a link-routed destination, forward the attach to the next hop
- //
- if (qdr_terminus_survives_disconnect(source) && !core->qd->allow_resumable_link_route) {
- qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_INVALID_LINK_EXPIRATION, true);
- qdr_terminus_free(source);
- qdr_terminus_free(target);
- } else {
- if (conn->role != QDR_ROLE_INTER_ROUTER && conn->connection_info) {
- link->disambiguated_name = disambiguated_link_name(conn->connection_info, link->name);
- }
- bool success = qdr_forward_attach_CT(core, addr, link, source, target);
- if (!success) {
- qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
- qdr_terminus_free(source);
- qdr_terminus_free(target);
- }
- }
- }
-
- else {
- qdr_core_bind_address_link_CT(core, addr, link);
- qdr_link_outbound_second_attach_CT(core, link, source, target);
+ return;
}
break;
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/564c5907/src/router_core/core_attach_address_lookup.h
----------------------------------------------------------------------
diff --git a/src/router_core/core_attach_address_lookup.h b/src/router_core/core_attach_address_lookup.h
new file mode 100644
index 0000000..d05b9cb
--- /dev/null
+++ b/src/router_core/core_attach_address_lookup.h
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef qd_router_core_attach_addr_lookup_types
+#define qd_router_core_attach_addr_lookup_types 1
+
+#include "router_core_private.h"
+#endif
+
+#ifndef qd_router_core_attach_addr_lookup
+#define qd_router_core_attach_addr_lookup 1
+
+
+/**
+ * Handler - Look up the address on a received first-attach
+ *
+ * This handler is invoked upon receipt of a first-attach on a normal endpoint link.
+ * The appropriate address, from source or target will be resolved to and address for
+ * message or link routing. This operation may be synchronoue (completed before it
+ * returns) or asynchronous (completed later).
+ *
+ * @param core Pointer to the core state.
+ * @param conn Pointer to the connection over which the attach arrived.
+ * @param link Pointer to the attaching link.
+ * @param dir The direction of message flow for the link.
+ * @param source The source terminus for the attach.
+ * @param target The target terminus for the attach.
+ */
+typedef void (*qdrc_attach_addr_lookup_t) (qdr_core_t *core,
+ qdr_connection_t *conn,
+ qdr_link_t *link,
+ qd_direction_t dir,
+ qdr_terminus_t *source,
+ qdr_terminus_t *target);
+
+
+
+#endif
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/564c5907/src/router_core/modules/address_lookup_client/lookup_client.c
----------------------------------------------------------------------
diff --git a/src/router_core/modules/address_lookup_client/lookup_client.c b/src/router_core/modules/address_lookup_client/lookup_client.c
new file mode 100644
index 0000000..924565e
--- /dev/null
+++ b/src/router_core/modules/address_lookup_client/lookup_client.c
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "module.h"
+#include "core_attach_address_lookup.h"
+#include "router_core_private.h"
+#include <qpid/dispatch/discriminator.h>
+#include <stdio.h>
+
+static char* disambiguated_link_name(qdr_connection_info_t *conn, char *original)
+{
+ size_t olen = strlen(original);
+ size_t clen = strlen(conn->container);
+ char *name = (char*) malloc(olen + clen + 2);
+ memset(name, 0, olen + clen + 2);
+ strcat(name, original);
+ name[olen] = '@';
+ strcat(name + olen + 1, conn->container);
+ return name;
+}
+
+
+/**
+ * Generate a temporary routable address for a destination connected to this
+ * router node.
+ */
+static void qdr_generate_temp_addr(qdr_core_t *core, char *buffer, size_t length)
+{
+ char discriminator[QD_DISCRIMINATOR_SIZE];
+ qd_generate_discriminator(discriminator);
+ if (core->router_mode == QD_ROUTER_MODE_EDGE)
+ snprintf(buffer, length, "amqp:/_edge/%s/temp.%s", core->router_id, discriminator);
+ else
+ snprintf(buffer, length, "amqp:/_topo/%s/%s/temp.%s", core->router_area, core->router_id, discriminator);
+}
+
+
+/**
+ * Generate a temporary mobile address for a producer connected to this
+ * router node.
+ */
+static void qdr_generate_mobile_addr(qdr_core_t *core, char *buffer, size_t length)
+{
+ char discriminator[QD_DISCRIMINATOR_SIZE];
+ qd_generate_discriminator(discriminator);
+ snprintf(buffer, length, "amqp:/_$temp.%s", discriminator);
+}
+
+
+/**
+ * qdr_lookup_terminus_address_CT
+ *
+ * Lookup a terminus address in the route table and possibly create a new address
+ * if no match is found.
+ *
+ * @param core Pointer to the core object
+ * @param dir Direction of the link for the terminus
+ * @param conn The connection over which the terminus was attached
+ * @param terminus The terminus containing the addressing information to be looked up
+ * @param create_if_not_found Iff true, return a pointer to a newly created address record
+ * @param accept_dynamic Iff true, honor the dynamic flag by creating a dynamic address
+ * @param [out] link_route True iff the lookup indicates that an attach should be routed
+ * @param [out] unavailable True iff this address is blocked as unavailable
+ * @param [out] core_endpoint True iff this address is bound to a core-internal endpoint
+ * @return Pointer to an address record or 0 if none is found
+ */
+static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t *core,
+ qd_direction_t dir,
+ qdr_connection_t *conn,
+ qdr_terminus_t *terminus,
+ bool create_if_not_found,
+ bool accept_dynamic,
+ bool *link_route,
+ bool *unavailable,
+ bool *core_endpoint)
+{
+ qdr_address_t *addr = 0;
+
+ //
+ // Unless expressly stated, link routing is not indicated for this terminus.
+ //
+ *link_route = false;
+ *unavailable = false;
+ *core_endpoint = false;
+
+ if (qdr_terminus_is_dynamic(terminus)) {
+ //
+ // The terminus is dynamic. Check to see if there is an address provided
+ // in the dynamic node properties. If so, look that address up as a link-routed
+ // destination.
+ //
+ qd_iterator_t *dnp_address = qdr_terminus_dnp_address(terminus);
+ if (dnp_address) {
+ qd_iterator_reset_view(dnp_address, ITER_VIEW_ADDRESS_WITH_SPACE);
+ if (conn->tenant_space)
+ qd_iterator_annotate_space(dnp_address, conn->tenant_space, conn->tenant_space_len);
+ qd_parse_tree_retrieve_match(core->link_route_tree[dir], dnp_address, (void**) &addr);
+
+ if (addr && conn->tenant_space) {
+ //
+ // If this link is in a tenant space, translate the dnp address to
+ // the fully-qualified view
+ //
+ qdr_terminus_set_dnp_address_iterator(terminus, dnp_address);
+ }
+
+ qd_iterator_free(dnp_address);
+ *link_route = true;
+ return addr;
+ }
+
+ //
+ // The dynamic terminus has no address in the dynamic-node-propteries. If we are
+ // permitted to generate dynamic addresses, create a new address that is local to
+ // this router and insert it into the address table with a hash index.
+ //
+ if (!accept_dynamic)
+ return 0;
+
+ char temp_addr[200];
+ bool generating = true;
+ while (generating) {
+ //
+ // The address-generation process is performed in a loop in case the generated
+ // address collides with a previously generated address (this should be _highly_
+ // unlikely).
+ //
+ if (dir == QD_OUTGOING)
+ qdr_generate_temp_addr(core, temp_addr, 200);
+ else
+ qdr_generate_mobile_addr(core, temp_addr, 200);
+
+ qd_iterator_t *temp_iter = qd_iterator_string(temp_addr, ITER_VIEW_ADDRESS_HASH);
+ qd_hash_retrieve(core->addr_hash, temp_iter, (void**) &addr);
+ if (!addr) {
+ addr = qdr_address_CT(core, QD_TREATMENT_ANYCAST_BALANCED);
+ qd_hash_insert(core->addr_hash, temp_iter, addr, &addr->hash_handle);
+ DEQ_INSERT_TAIL(core->addrs, addr);
+ qdr_terminus_set_address(terminus, temp_addr);
+ generating = false;
+ }
+ qd_iterator_free(temp_iter);
+ }
+ return addr;
+ }
+
+ //
+ // If the terminus is anonymous, there is no address to look up.
+ //
+ if (qdr_terminus_is_anonymous(terminus))
+ return 0;
+
+ //
+ // The terminus has a non-dynamic address that we need to look up. First, look for
+ // a link-route destination for the address.
+ //
+ qd_iterator_t *iter = qdr_terminus_get_address(terminus);
+ qd_iterator_reset_view(iter, ITER_VIEW_ADDRESS_WITH_SPACE);
+ if (conn->tenant_space)
+ qd_iterator_annotate_space(iter, conn->tenant_space, conn->tenant_space_len);
+ qd_parse_tree_retrieve_match(core->link_route_tree[dir], iter, (void**) &addr);
+ if (addr) {
+ *link_route = true;
+
+ //
+ // If this link is in a tenant space, translate the terminus address to
+ // the fully-qualified view
+ //
+ if (conn->tenant_space) {
+ qdr_terminus_set_address_iterator(terminus, iter);
+ }
+ return addr;
+ }
+
+ //
+ // There was no match for a link-route destination, look for a message-route address.
+ //
+ int in_phase;
+ int out_phase;
+ int addr_phase;
+ int priority;
+ qd_address_treatment_t treat = qdr_treatment_for_address_CT(core, conn, iter, &in_phase, &out_phase, &priority);
+
+ qd_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
+ qd_iterator_annotate_prefix(iter, '\0'); // Cancel previous override
+ addr_phase = dir == QD_INCOMING ? in_phase : out_phase;
+ qd_iterator_annotate_phase(iter, (char) addr_phase + '0');
+
+ qd_hash_retrieve(core->addr_hash, iter, (void**) &addr);
+
+ if (addr && addr->treatment == QD_TREATMENT_UNAVAILABLE)
+ *unavailable = true;
+
+ if (!addr && create_if_not_found) {
+ addr = qdr_address_CT(core, treat);
+ if (addr) {
+ qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle);
+ DEQ_INSERT_TAIL(core->addrs, addr);
+ }
+
+ if (!addr && treat == QD_TREATMENT_UNAVAILABLE)
+ *unavailable = true;
+ }
+
+ if (qdr_terminus_is_coordinator(terminus))
+ *unavailable = false;
+
+ if (!!addr && addr->core_endpoint != 0)
+ *core_endpoint = true;
+
+ if (addr)
+ addr->priority = priority;
+ return addr;
+}
+
+
+static void qdr_link_react_to_first_attach_CT(qdr_core_t *core,
+ qdr_connection_t *conn,
+ qdr_address_t *addr,
+ qdr_link_t *link,
+ qd_direction_t dir,
+ qdr_terminus_t *source,
+ qdr_terminus_t *target,
+ bool link_route,
+ bool unavailable,
+ bool core_endpoint)
+{
+ if (core_endpoint) {
+ qdrc_endpoint_do_bound_attach_CT(core, addr, link, source, target);
+ }
+
+ else if (unavailable) {
+ qdr_link_outbound_detach_CT(core, link, qdr_error(QD_AMQP_COND_NOT_FOUND, "Node not found"), 0, true);
+ qdr_terminus_free(source);
+ qdr_terminus_free(target);
+ }
+
+ else if (!addr) {
+ //
+ // No route to this destination, reject the link
+ //
+ qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
+ qdr_terminus_free(source);
+ qdr_terminus_free(target);
+ }
+
+ else if (link_route) {
+ //
+ // This is a link-routed destination, forward the attach to the next hop
+ //
+ qdr_terminus_t *term = dir == QD_INCOMING ? target : source;
+ if (qdr_terminus_survives_disconnect(term) && !core->qd->allow_resumable_link_route) {
+ qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_INVALID_LINK_EXPIRATION, true);
+ qdr_terminus_free(source);
+ qdr_terminus_free(target);
+ } else {
+ if (conn->role != QDR_ROLE_INTER_ROUTER && conn->connection_info) {
+ link->disambiguated_name = disambiguated_link_name(conn->connection_info, link->name);
+ }
+ bool success = qdr_forward_attach_CT(core, addr, link, source, target);
+ if (!success) {
+ qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
+ qdr_terminus_free(source);
+ qdr_terminus_free(target);
+ }
+ }
+ }
+
+ else if (dir == QD_INCOMING && qdr_terminus_is_coordinator(target)) {
+ //
+ // This target terminus is a coordinator.
+ // If we got here, it means that the coordinator link attach could not be link routed to a broker (or to the next router).
+ // The router should reject this link because the router cannot coordinate transactions itself.
+ //
+ // The attach response should have a null target to indicate refusal and the immediately coming detach.
+ //
+ qdr_link_outbound_second_attach_CT(core, link, source, 0);
+
+ //
+ // Now, send back a detach with the error amqp:precondition-failed
+ //
+ qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_COORDINATOR_PRECONDITION_FAILED, true);
+ } else {
+ //
+ // Associate the link with the address. With this association, it will be unnecessary
+ // to do an address lookup for deliveries that arrive on this link.
+ //
+ qdr_core_bind_address_link_CT(core, addr, link);
+ qdr_link_outbound_second_attach_CT(core, link, source, target);
+
+ //
+ // Issue the initial credit only if one of the following
+ // holds:
+ // - there are destinations for the address
+ // - if the address treatment is multicast
+ // - the address is that of an exchange (no subscribers allowed)
+ //
+ if (dir == QD_INCOMING
+ && (DEQ_SIZE(addr->subscriptions)
+ || DEQ_SIZE(addr->rlinks)
+ || qd_bitmask_cardinality(addr->rnodes)
+ || qdr_is_addr_treatment_multicast(addr)
+ || !!addr->exchange)) {
+ qdr_link_issue_credit_CT(core, link, link->capacity, false);
+ }
+
+ //
+ // If this link came through an edge connection, raise a link event to
+ // herald that fact.
+ //
+ if (link->conn->role == QDR_ROLE_EDGE_CONNECTION)
+ qdrc_event_link_raise(core, QDRC_EVENT_LINK_EDGE_DATA_ATTACHED, link);
+ }
+}
+
+
+static void qcm_addr_lookup_CT(qdr_core_t *core,
+ qdr_connection_t *conn,
+ qdr_link_t *link,
+ qd_direction_t dir,
+ qdr_terminus_t *source,
+ qdr_terminus_t *target)
+
+{
+ bool link_route;
+ bool unavailable;
+ bool core_endpoint;
+ qdr_terminus_t *term = dir == QD_INCOMING ? target : source;
+
+ qdr_address_t *addr = qdr_lookup_terminus_address_CT(core, dir, conn, term, true, true, &link_route, &unavailable, &core_endpoint);
+ qdr_link_react_to_first_attach_CT(core, conn, addr, link, dir, source, target, link_route, unavailable, core_endpoint);
+}
+
+
+static void qcm_addr_lookup_client_init_CT(qdr_core_t *core, void **module_context)
+{
+ assert(core->addr_lookup_handler == 0);
+
+ core->addr_lookup_handler = qcm_addr_lookup_CT;
+ *module_context = core;
+}
+
+
+static void qcm_addr_lookup_client_final_CT(void *module_context)
+{
+ qdr_core_t *core = (qdr_core_t*) module_context;
+ core->addr_lookup_handler = 0;
+}
+
+
+QDR_CORE_MODULE_DECLARE("address_lookup_client", qcm_addr_lookup_client_init_CT, qcm_addr_lookup_client_final_CT)
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/564c5907/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 0b70c97..bb4af57 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -43,6 +43,7 @@ typedef struct qdr_edge_t qdr_edge_t;
#include "core_link_endpoint.h"
#include "core_events.h"
+#include "core_attach_address_lookup.h"
qdr_forwarder_t *qdr_forwarder_CT(qdr_core_t *core, qd_address_treatment_t treatment);
int qdr_forward_message_CT(qdr_core_t *core, qdr_address_t *addr, qd_message_t *msg, qdr_delivery_t *in_delivery,
@@ -727,6 +728,8 @@ struct qdr_core_t {
qdr_connection_list_t connections_to_activate;
qdr_link_list_t open_links;
+ qdrc_attach_addr_lookup_t addr_lookup_handler;
+
//
// Agent section
//
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org