You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2018/12/04 14:51:50 UTC

[1/2] qpid-dispatch git commit: DISPATCH-1194: Link Route address lookup for edge router This closes #423

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master db87d5d3e -> 564c5907c


DISPATCH-1194: Link Route address lookup for edge router
This closes #423


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/d201deac
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/d201deac
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/d201deac

Branch: refs/heads/master
Commit: d201deac4ec8854571dcd3acd1aa5aa39d16de5b
Parents: db87d5d
Author: Kenneth Giusti <kg...@apache.org>
Authored: Wed Nov 28 09:30:09 2018 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Tue Dec 4 09:34:47 2018 -0500

----------------------------------------------------------------------
 include/qpid/dispatch/amqp.h                    |   5 +
 src/CMakeLists.txt                              |   1 +
 .../modules/address_lookup/address_lookup.c     | 454 +++++++++++++++++++
 .../modules/address_lookup/address_lookup.h     |  75 +++
 src/router_core/router_core_private.h           |   2 +-
 src/router_core/transfer.c                      |  45 +-
 tests/CMakeLists.txt                            |   1 +
 tests/system_test.py                            |   9 +-
 tests/system_tests_address_lookup.py            | 287 ++++++++++++
 9 files changed, 857 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d201deac/include/qpid/dispatch/amqp.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/amqp.h b/include/qpid/dispatch/amqp.h
index e62b7f7..99315c7 100644
--- a/include/qpid/dispatch/amqp.h
+++ b/include/qpid/dispatch/amqp.h
@@ -184,5 +184,10 @@ extern const char * const QD_AMQP_COND_ILLEGAL_STATE;
 extern const char * const QD_AMQP_COND_FRAME_SIZE_TOO_SMALL;
 /// @};
 
+/** @name AMQP link endpoint role. */
+/// @{
+#define QD_AMQP_LINK_ROLE_SENDER   false
+#define QD_AMQP_LINK_ROLE_RECEIVER true
+/// @};
 
 #endif

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d201deac/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index ebc0e09..352aec7 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -109,6 +109,7 @@ set(qpid_dispatch_SOURCES
   router_core/modules/edge_router/edge_mgmt.c
   router_core/modules/test_hooks/core_test_hooks.c
   router_core/modules/edge_addr_tracking/edge_addr_tracking.c
+  router_core/modules/address_lookup/address_lookup.c
   router_node.c
   router_pynode.c
   schema_enum.c

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d201deac/src/router_core/modules/address_lookup/address_lookup.c
----------------------------------------------------------------------
diff --git a/src/router_core/modules/address_lookup/address_lookup.c b/src/router_core/modules/address_lookup/address_lookup.c
new file mode 100644
index 0000000..93297ac
--- /dev/null
+++ b/src/router_core/modules/address_lookup/address_lookup.c
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "address_lookup.h"
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/amqp.h>
+#include "module.h"
+#include "core_link_endpoint.h"
+
+#include "inttypes.h"
+
+#define PROTOCOL_VERSION 1
+
+typedef enum {
+    // note: keep unit test in sync
+    OPCODE_INVALID,
+    OPCODE_LINK_ROUTE_LOOKUP,
+} address_lookup_opcode_t;
+
+
+/* create the message application properties and body for the link route lookup
+ * request message
+ */
+int qcm_link_route_lookup_msg(qd_iterator_t        *address,
+                              qd_direction_t        dir,
+                              qd_composed_field_t **properties,
+                              qd_composed_field_t **body)
+{
+    *properties = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, 0);
+    if (!*properties)
+        return -1;
+    qd_compose_start_map(*properties);
+    qd_compose_insert_string(*properties, "version");
+    qd_compose_insert_uint(*properties,   PROTOCOL_VERSION);
+    qd_compose_insert_string(*properties, "opcode");
+    qd_compose_insert_uint(*properties,   OPCODE_LINK_ROUTE_LOOKUP);
+    qd_compose_end_map(*properties);
+
+    *body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);
+    if (!*body) {
+        qd_compose_free(*properties);
+        *properties = 0;
+        return -1;
+    }
+    qd_compose_start_list(*body);
+    qd_compose_insert_string_iterator(*body, address);
+    qd_compose_insert_bool(*body, (dir == QD_INCOMING
+                                   ? QD_AMQP_LINK_ROLE_RECEIVER
+                                   : QD_AMQP_LINK_ROLE_SENDER));
+    qd_compose_end_list(*body);
+    return 0;
+}
+
+
+/* parse a reply to the link route lookup request
+ */
+qcm_address_lookup_status_t qcm_link_route_lookup_decode(qd_iterator_t *properties,
+                                                         qd_iterator_t *body,
+                                                         bool          *is_link_route,
+                                                         bool          *has_destinations)
+{
+    qcm_address_lookup_status_t rc = QCM_ADDR_LOOKUP_OK;
+    *is_link_route = false;
+    *has_destinations = false;
+
+    qd_parsed_field_t *props = qd_parse(properties);
+    if (!props || !qd_parse_ok(props) || !qd_parse_is_map(props))
+        return QCM_ADDR_LOOKUP_INVALID_REQUEST;
+
+    qd_parsed_field_t *bod = qd_parse(body);
+    if (!bod || !qd_parse_ok(bod) || !qd_parse_is_list(bod)) {
+        qd_parse_free(props);
+        return QCM_ADDR_LOOKUP_INVALID_REQUEST;
+    }
+
+    qd_parsed_field_t *tmp = qd_parse_value_by_key(props, "status");
+    if (!tmp || !qd_parse_is_scalar(tmp)) {
+        rc = QCM_ADDR_LOOKUP_INVALID_REQUEST;
+        goto exit;
+    } else {
+        int32_t status = qd_parse_as_int(tmp);
+        if (status != QCM_ADDR_LOOKUP_OK) {
+            rc = (qcm_address_lookup_status_t) status;
+            goto exit;
+        }
+    }
+
+    // bod[0] == is_link_route (bool)
+    // bod[1] == has_destinations (bool)
+
+    if (qd_parse_sub_count(bod) < 2) {
+        rc = QCM_ADDR_LOOKUP_INVALID_REQUEST;
+        goto exit;
+    }
+
+    *is_link_route = qd_parse_as_bool(qd_parse_sub_value(bod, 0));
+    *has_destinations = qd_parse_as_bool(qd_parse_sub_value(bod, 1));
+
+exit:
+    qd_parse_free(props);
+    qd_parse_free(bod);
+    return rc;
+}
+
+
+typedef struct _endpoint_ref {
+    DEQ_LINKS(struct _endpoint_ref);
+    qdrc_endpoint_t *endpoint;
+    const char *container_id;
+} _endpoint_ref_t;
+DEQ_DECLARE(_endpoint_ref_t, _endpoint_ref_list_t);
+ALLOC_DEFINE(_endpoint_ref_t);
+
+
+static struct {
+    qdr_core_t           *core;
+    _endpoint_ref_list_t  endpoints;
+} _server_state;
+
+
+/* parse out the opcode from the request
+ */
+static address_lookup_opcode_t _decode_opcode(qd_parsed_field_t *properties)
+{
+    if (!properties)
+        return OPCODE_INVALID;
+    qd_parsed_field_t *oc = qd_parse_value_by_key(properties, "opcode");
+    if (!oc)
+        return OPCODE_INVALID;
+    uint32_t opcode = qd_parse_as_uint(oc);
+    if (!qd_parse_ok(oc))
+        return OPCODE_INVALID;
+    return (address_lookup_opcode_t)opcode;
+}
+
+
+/* send a reply to a lookup request
+ */
+static uint64_t _send_reply(_endpoint_ref_t             *epr,
+                            address_lookup_opcode_t      opcode,
+                            qcm_address_lookup_status_t  status,
+                            qd_iterator_t               *correlation_id,
+                            qd_iterator_t               *reply_to,
+                            qd_composed_field_t         *body)
+{
+    if (!correlation_id || !reply_to) {
+        qd_log(_server_state.core->log, QD_LOG_ERROR,
+               "Link route address reply failed - invalid request message properties"
+               " (container=%s, endpoint=%p)",
+               epr->container_id, (void *)epr->endpoint);
+        return PN_REJECTED;
+    }
+
+    qd_composed_field_t *fld = qd_compose(QD_PERFORMATIVE_HEADER, 0);
+    qd_compose_start_list(fld);
+    qd_compose_insert_bool(fld, 0);     // durable
+    qd_compose_end_list(fld);
+
+    fld = qd_compose(QD_PERFORMATIVE_PROPERTIES, fld);
+    qd_compose_start_list(fld);
+    qd_compose_insert_null(fld);                    // message-id
+    qd_compose_insert_null(fld);                    // user-id
+    qd_compose_insert_typed_iterator(fld, reply_to); // to
+    qd_compose_insert_null(fld);                    // subject
+    qd_compose_insert_null(fld);                    // reply-to
+    qd_compose_insert_typed_iterator(fld, correlation_id);
+    qd_compose_end_list(fld);
+
+    fld = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, fld);
+    qd_compose_start_map(fld);
+    qd_compose_insert_string(fld, "version");
+    qd_compose_insert_uint(fld,   PROTOCOL_VERSION);
+    qd_compose_insert_string(fld, "opcode");
+    qd_compose_insert_uint(fld,   opcode);
+    qd_compose_insert_string(fld, "status");
+    qd_compose_insert_uint(fld,   status);
+    qd_compose_end_map(fld);
+
+    qd_message_t *msg = qd_message();
+
+    qd_message_compose_3(msg, fld, body);
+    qdr_in_process_send_to_CT(_server_state.core, reply_to, msg, true, false);
+    qd_message_free(msg);
+    qd_compose_free(fld);
+
+    return PN_ACCEPTED;
+}
+
+
+/* perform a link route lookup
+ */
+static uint64_t _do_link_route_lookup(_endpoint_ref_t   *epr,
+                                      qd_parsed_field_t *body,
+                                      qd_iterator_t     *reply_to,
+                                      qd_iterator_t     *cid)
+{
+    if (!body || !qd_parse_ok(body) || qd_parse_sub_count(body) < 2) {
+        qd_log(_server_state.core->log, QD_LOG_ERROR,
+               "Link route address lookup failed - invalid request body"
+               " (container=%s, endpoint=%p)",
+               epr->container_id, (void *)epr->endpoint);
+        return PN_REJECTED;
+    }
+
+    //
+    // body[0] == fully qualified address (string)
+    // body[1] == direction (bool, true == receiver)
+    //
+
+    qd_iterator_t *addr_i = qd_parse_raw(qd_parse_sub_value(body, 0));
+    qd_direction_t dir = (qd_parse_as_bool(qd_parse_sub_value(body, 1))
+                          ? QD_INCOMING : QD_OUTGOING);
+
+    bool is_link_route = false;
+    bool has_destinations = false;
+    qdr_address_t *addr = 0;
+    qd_iterator_reset_view(addr_i, ITER_VIEW_ALL);
+    qd_parse_tree_retrieve_match(_server_state.core->link_route_tree[dir], addr_i, (void**) &addr);
+    if (addr) {
+        is_link_route = true;
+        has_destinations = !!(DEQ_SIZE(addr->conns) || DEQ_SIZE(addr->rlinks) || qd_bitmask_cardinality(addr->rnodes));
+    }
+
+    // out_body[0] == is_link_route (bool)
+    // out_body[1] == has_destinations (bool)
+
+    qd_composed_field_t *out_body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);
+    qd_compose_start_list(out_body);
+    qd_compose_insert_bool(out_body, is_link_route);
+    qd_compose_insert_bool(out_body, has_destinations);
+    qd_compose_end_list(out_body);
+
+    uint64_t rc = _send_reply(epr,
+                              OPCODE_LINK_ROUTE_LOOKUP,
+                              addr ? QCM_ADDR_LOOKUP_OK : QCM_ADDR_LOOKUP_NOT_FOUND,
+                              cid,
+                              reply_to,
+                              out_body);
+    qd_compose_free(out_body);
+
+    if (qd_log_enabled(_server_state.core->log, QD_LOG_TRACE)) {
+        char *as = (char *)qd_iterator_copy(addr_i);
+        qd_log(_server_state.core->log, QD_LOG_TRACE,
+               "Link route address lookup on %s - %sfound is link route=%s has_destinations=%s"
+               " (container=%s, endpoint=%p)",
+               as,
+               (addr) ? "" : "not ",
+               is_link_route ? "yes" : "no",
+               has_destinations ? "yes" : "no",
+               epr->container_id,
+               (void *)epr->endpoint);
+        free(as);
+    }
+    return rc;
+}
+
+
+/* handle lookup request from client
+ */
+void _on_transfer(void           *link_context,
+                  qdr_delivery_t *delivery,
+                  qd_message_t   *message)
+{
+    if (!qd_message_receive_complete(message))
+        return;
+
+    _endpoint_ref_t *epr = (_endpoint_ref_t *)link_context;
+    qd_log(_server_state.core->log, QD_LOG_TRACE,
+           "Address lookup request received (container=%s, endpoint=%p)",
+           epr->container_id, (void *)epr->endpoint);
+
+    uint64_t disposition = PN_ACCEPTED;
+    qd_iterator_t *p_iter = qd_message_field_iterator(message, QD_FIELD_APPLICATION_PROPERTIES);
+    qd_parsed_field_t *props = qd_parse(p_iter);
+    if (!props || !qd_parse_ok(props) || !qd_parse_is_map(props)) {
+        qd_log(_server_state.core->log, QD_LOG_ERROR,
+               "Invalid address lookup request - no properties (container=%s, endpoint=%p)",
+               epr->container_id, (void *)epr->endpoint);
+        disposition = PN_REJECTED;
+        goto exit;
+    }
+
+    qd_parsed_field_t *v = qd_parse_value_by_key(props, "version");
+    if (!v) {
+        qd_log(_server_state.core->log, QD_LOG_ERROR,
+               "Invalid address lookup request - no version (container=%s, endpoint=%p)",
+               epr->container_id, (void *)epr->endpoint);
+        disposition = PN_REJECTED;
+        goto exit;
+    }
+
+    uint32_t version = qd_parse_as_uint(v);
+    if (!qd_parse_ok(v)) {
+        qd_log(_server_state.core->log, QD_LOG_ERROR,
+               "Invalid address lookup request - invalid version (container=%s, endpoint=%p)",
+               epr->container_id, (void *)epr->endpoint);
+        disposition = PN_REJECTED;
+        goto exit;
+    }
+
+    if (version != PROTOCOL_VERSION) {
+        qd_log(_server_state.core->log, QD_LOG_ERROR,
+               "Invalid address lookup request - unknown version"
+               " (container=%s, endpoint=%p, version=%"PRIu32")",
+               epr->container_id, (void *)epr->endpoint, version);
+        disposition = PN_REJECTED;
+        goto exit;
+        // @TODO(kgiusti) send reply with status QCM_ADDR_LOOKUP_BAD_VERSION
+    }
+
+    address_lookup_opcode_t opcode = _decode_opcode(props);
+    switch (opcode) {
+    case OPCODE_LINK_ROUTE_LOOKUP: {
+        qd_iterator_t *b_iter = qd_message_field_iterator(message, QD_FIELD_BODY);
+        qd_parsed_field_t *body = qd_parse(b_iter);
+        qd_iterator_t *reply_to = qd_message_field_iterator_typed(message, QD_FIELD_REPLY_TO);
+        qd_iterator_t *cid = qd_message_field_iterator_typed(message, QD_FIELD_CORRELATION_ID);
+        disposition = _do_link_route_lookup(epr, body, reply_to, cid);
+        qd_iterator_free(cid);
+        qd_iterator_free(reply_to);
+        qd_parse_free(body);
+        qd_iterator_free(b_iter);
+        break;
+    }
+    case OPCODE_INVALID:
+    default:
+        qd_log(_server_state.core->log, QD_LOG_ERROR,
+               "Invalid address lookup request - invalid opcode"
+               " (container=%s, endpoint=%p, opcode=%d)",
+               epr->container_id, (void *)epr->endpoint, opcode);
+        disposition = PN_REJECTED;
+    }
+
+exit:
+    qd_parse_free(props);
+    qd_iterator_free(p_iter);
+    qdrc_endpoint_settle_CT(_server_state.core, delivery, disposition);
+    qdrc_endpoint_flow_CT(_server_state.core, epr->endpoint, 1, false);
+    return;
+}
+
+
+/* handle incoming attach to address lookup service
+ */
+static void _on_first_attach(void            *bind_context,
+                             qdrc_endpoint_t *endpoint,
+                             void            **link_context,
+                             qdr_terminus_t  *remote_source,
+                             qdr_terminus_t  *remote_target)
+{
+    //
+    // Only accept incoming links initiated by the edge router. Detach all
+    // other links
+    //
+    qdr_connection_t *conn = qdrc_endpoint_get_connection_CT(endpoint);
+    if (qdrc_endpoint_get_direction_CT(endpoint) != QD_INCOMING ||
+        conn->role != QDR_ROLE_EDGE_CONNECTION) {
+        *link_context = 0;
+        qdrc_endpoint_detach_CT(_server_state.core, endpoint, 0);
+        qd_log(_server_state.core->log, QD_LOG_ERROR,
+               "Attempt to attach to address lookup server rejected (container=%s)",
+               (conn->connection_info) ? conn->connection_info->container : "<unknown>");
+        return;
+    }
+
+    _endpoint_ref_t *epr = new__endpoint_ref_t();
+    ZERO(epr);
+    epr->endpoint = endpoint;
+    epr->container_id = (conn->connection_info) ? conn->connection_info->container : "<unknown>";
+    DEQ_INSERT_TAIL(_server_state.endpoints, epr);
+    *link_context = epr;
+    qdrc_endpoint_second_attach_CT(_server_state.core, endpoint, remote_source, remote_target);
+    qdrc_endpoint_flow_CT(_server_state.core, endpoint, 1, false);
+
+    qd_log(_server_state.core->log, QD_LOG_TRACE,
+           "Client attached to address lookup server (container=%s, endpoint=%p)",
+           epr->container_id, (void *)endpoint);
+}
+
+
+/* handle incoming detach from client
+ */
+static void _on_first_detach(void *link_context,
+                             qdr_error_t *error)
+{
+    _endpoint_ref_t *epr = (_endpoint_ref_t *)link_context;
+    qdrc_endpoint_detach_CT(_server_state.core, epr->endpoint, 0);
+    DEQ_REMOVE(_server_state.endpoints, epr);
+    qd_log(_server_state.core->log, QD_LOG_TRACE,
+           "Client detached from address lookup server (container=%s, endpoint=%p)",
+           epr->container_id, (void *)epr->endpoint);
+    free__endpoint_ref_t(epr);
+}
+
+
+static qdrc_endpoint_desc_t _endpoint_handlers =
+{
+    .label = "address lookup",
+    .on_first_attach = _on_first_attach,
+    .on_transfer = _on_transfer,
+    .on_first_detach = _on_first_detach,
+};
+
+
+static void _address_lookup_init_CT(qdr_core_t *core, void **module_context)
+{
+    //
+    // Address resolution service is provided by interior routers only
+    //
+    if (core->router_mode != QD_ROUTER_MODE_INTERIOR)
+        return;
+
+    _server_state.core = core;
+
+    //
+    // Handle any incoming links to the QD_TERMINUS_ADDRESS_LOOKUP address
+    //
+    qdrc_endpoint_bind_mobile_address_CT(core,
+                                         QD_TERMINUS_ADDRESS_LOOKUP,
+                                         '0', // phase
+                                         &_endpoint_handlers,
+                                         &_server_state);
+    *module_context = &_server_state;
+}
+
+
+static void _address_lookup_final_CT(void *module_context)
+{
+    _endpoint_ref_t *epr = DEQ_HEAD(_server_state.endpoints);
+    while (epr) {
+        DEQ_REMOVE_HEAD(_server_state.endpoints);
+        free__endpoint_ref_t(epr);
+        epr = DEQ_HEAD(_server_state.endpoints);
+    }
+}
+
+
+QDR_CORE_MODULE_DECLARE("address lookup", _address_lookup_init_CT, _address_lookup_final_CT)

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d201deac/src/router_core/modules/address_lookup/address_lookup.h
----------------------------------------------------------------------
diff --git a/src/router_core/modules/address_lookup/address_lookup.h b/src/router_core/modules/address_lookup/address_lookup.h
new file mode 100644
index 0000000..9f8b65e
--- /dev/null
+++ b/src/router_core/modules/address_lookup/address_lookup.h
@@ -0,0 +1,75 @@
+#ifndef router_core_address_lookup_h
+#define router_core_address_lookup_h 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/iterator.h>
+#include <qpid/dispatch/container.h>
+#include <qpid/dispatch/compose.h>
+//
+// API for building address lookup request messages.  The message properties
+// and body fields are handled separately so they can be passed directly to the
+// core client API.
+//
+
+typedef enum {
+    // note: keep unit test in sync
+    QCM_ADDR_LOOKUP_OK,
+    QCM_ADDR_LOOKUP_BAD_VERSION,
+    QCM_ADDR_LOOKUP_BAD_OPCODE,
+    QCM_ADDR_LOOKUP_NOT_FOUND,
+    QCM_ADDR_LOOKUP_INVALID_REQUEST,
+} qcm_address_lookup_status_t;
+
+
+/**
+ * Create the message properties and body for a link route address lookup.  The
+ * returned properties and body can be passed directly to
+ * qdrc_client_request_CT().
+ *
+ * @param address - fully qualified link route address to look up.
+ * @param dir - QD_INCOMING or QD_OUTGOING
+ * @param properties - return value for message application properties section
+ * @param body - return value for message body
+ * @return zero on success
+ */
+int qcm_link_route_lookup_request(qd_iterator_t        *address,
+                                  qd_direction_t        dir,
+                                  qd_composed_field_t **properties,
+                                  qd_composed_field_t **body);
+
+
+/**
+ * Parse out the payload of the link route lookup reply message.  The
+ * properties and body fields are provided by the on_reply_cb() callback passed
+ * to the qdrc_client_request_CT() call.
+ *
+ * @param properties - application properties as returned in the reply
+ * @param body - body from reply message
+ * @param is_link_route - set True if the address is a link route address that
+ *        exists in the route tables of the queried router.
+ * @param has_destination - if is_link_route this indicates whether or not the
+ *        queried router has active destinations for this link route.
+ * @return QCM_ADDR_LOOKUP_OK if query is successful, else and error code
+ */
+qcm_address_lookup_status_t qcm_link_route_lookup_decode(qd_iterator_t *properties,
+                                                         qd_iterator_t *body,
+                                                         bool          *is_link_route,
+                                                         bool          *has_destinations);
+#endif // router_core_address_lookup_h

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d201deac/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 91d0016..0b70c97 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -857,7 +857,7 @@ void qdr_delivery_failed_CT(qdr_core_t *core, qdr_delivery_t *delivery);
 bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *delivery);
 void qdr_delivery_decref_CT(qdr_core_t *core, qdr_delivery_t *delivery, const char *label);
 void qdr_forward_on_message_CT(qdr_core_t *core, qdr_subscription_t *sub, qdr_link_t *link, qd_message_t *msg);
-
+void qdr_in_process_send_to_CT(qdr_core_t *core, qd_iterator_t *address, qd_message_t *msg, bool exclude_inprocess, bool control);
 /**
  * Links the in_dlv to the out_dlv and increments ref counts of both deliveries
  */

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d201deac/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 6d9206d..1cce8de 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -1061,27 +1061,36 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis
 
 static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
 {
-    qdr_field_t  *addr_field = action->args.io.address;
-    qd_message_t *msg        = action->args.io.message;
-
     if (!discard) {
-        qdr_address_t *addr = 0;
-
-        qd_iterator_reset_view(addr_field->iterator, ITER_VIEW_ADDRESS_HASH);
-        qd_hash_retrieve(core->addr_hash, addr_field->iterator, (void**) &addr);
-        if (addr) {
-            //
-            // Forward the message.  We don't care what the fanout count is.
-            //
-            (void) qdr_forward_message_CT(core, addr, msg, 0, action->args.io.exclude_inprocess,
-                                          action->args.io.control);
-            addr->deliveries_from_container++;
-        } else
-            qd_log(core->log, QD_LOG_DEBUG, "In-process send to an unknown address");
+        qdr_in_process_send_to_CT(core,
+                                  qdr_field_iterator(action->args.io.address),
+                                  action->args.io.message,
+                                  action->args.io.exclude_inprocess,
+                                  action->args.io.control);
     }
 
-    qdr_field_free(addr_field);
-    qd_message_free(msg);
+    qdr_field_free(action->args.io.address);
+    qd_message_free(action->args.io.message);
+}
+
+
+/**
+ * forward an in-process message based on the destination address
+ */
+void qdr_in_process_send_to_CT(qdr_core_t *core, qd_iterator_t *address, qd_message_t *msg, bool exclude_inprocess, bool control)
+{
+    qdr_address_t *addr = 0;
+
+    qd_iterator_reset_view(address, ITER_VIEW_ADDRESS_HASH);
+    qd_hash_retrieve(core->addr_hash, address, (void**) &addr);
+    if (addr) {
+        //
+        // Forward the message.  We don't care what the fanout count is.
+        //
+        (void) qdr_forward_message_CT(core, addr, msg, 0, exclude_inprocess, control);
+        addr->deliveries_from_container++;
+    } else
+        qd_log(core->log, QD_LOG_DEBUG, "In-process send to an unknown address");
 }
 
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d201deac/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 51dc438..45206e8 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -123,6 +123,7 @@ foreach(py_test_module
     ${CONSOLE_TEST}
     system_tests_priority
     system_tests_core_client
+    system_tests_address_lookup
     )
 
   add_test(${py_test_module} ${TEST_WRAP} -x unit2 -v ${py_test_module})

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d201deac/tests/system_test.py
----------------------------------------------------------------------
diff --git a/tests/system_test.py b/tests/system_test.py
index d268671..d4cd362 100755
--- a/tests/system_test.py
+++ b/tests/system_test.py
@@ -436,11 +436,12 @@ class Qdrouterd(Process):
         except:
             return False
 
-    def wait_address(self, address, subscribers=0, remotes=0, **retry_kwargs):
+    def wait_address(self, address, subscribers=0, remotes=0, containers=0, **retry_kwargs):
         """
         Wait for an address to be visible on the router.
         @keyword subscribers: Wait till subscriberCount >= subscribers
         @keyword remotes: Wait till remoteCount >= remotes
+        @keyword containers: Wait till containerCount >= remotes
         @param retry_kwargs: keyword args for L{retry}
         """
         def check():
@@ -449,11 +450,13 @@ class Qdrouterd(Process):
             # endswith check is because of M0/L/R prefixes
             addrs = self.management.query(
                 type='org.apache.qpid.dispatch.router.address',
-                attribute_names=[u'name', u'subscriberCount', u'remoteCount']).get_entities()
+                attribute_names=[u'name', u'subscriberCount', u'remoteCount', u'containerCount']).get_entities()
 
             addrs = [a for a in addrs if a['name'].endswith(address)]
 
-            return addrs and addrs[0]['subscriberCount'] >= subscribers and addrs[0]['remoteCount'] >= remotes
+            return (addrs and addrs[0]['subscriberCount'] >= subscribers
+                    and addrs[0]['remoteCount'] >= remotes
+                    and addrs[0]['containerCount'] >= containers)
         assert retry(check, **retry_kwargs)
 
     def get_host(self, protocol_family):

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d201deac/tests/system_tests_address_lookup.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_address_lookup.py b/tests/system_tests_address_lookup.py
new file mode 100644
index 0000000..3538f93
--- /dev/null
+++ b/tests/system_tests_address_lookup.py
@@ -0,0 +1,287 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import unicode_literals
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import print_function
+
+import unittest2 as unittest
+import time
+
+from system_test import TestCase, Qdrouterd, TIMEOUT
+from system_test import AsyncTestReceiver
+from test_broker import FakeBroker
+
+from proton import Disposition
+from proton import Message
+from proton.utils import BlockingConnection
+from proton.utils import SyncRequestResponse
+from proton.utils import SendException
+from proton.utils import LinkDetached
+
+
+class LinkRouteLookupTest(TestCase):
+    """
+    Tests link route address lookup
+    """
+    # hardcoded values from the router's C code
+    QD_TERMINUS_ADDRESS_LOOKUP = '_$qd.addr_lookup'
+    PROTOCOL_VERSION = 1
+    OPCODE_LINK_ROUTE_LOOKUP = 1
+    QCM_ADDR_LOOKUP_OK = 0
+    QCM_ADDR_LOOKUP_NOT_FOUND = 3
+
+    def _check_response(self, message):
+        self.assertTrue(isinstance(message.properties, dict))
+        self.assertEqual(self.PROTOCOL_VERSION, message.properties.get('version'))
+        self.assertTrue(message.properties.get('opcode') is not None)
+        self.assertTrue(isinstance(message.body, list))
+        self.assertEqual(2, len(message.body))
+        return (message.properties.get('status'),
+                message.body[0],  # is link_route?
+                message.body[1])  # has destinations?
+
+    @classmethod
+    def setUpClass(cls):
+        """Start a router"""
+        super(LinkRouteLookupTest, cls).setUpClass()
+
+        def router(name, mode, extra=None):
+            config = [
+                ('router', {'mode': mode, 'id': name}),
+                ('listener', {'role': 'normal', 'host': '0.0.0.0', 'port': cls.tester.get_port(), 'saslMechanisms': 'ANONYMOUS'})
+            ]
+
+            if extra:
+                config.extend(extra)
+            config = Qdrouterd.Config(config)
+            cls.routers.append(cls.tester.qdrouterd(name, config, wait=False))
+
+        cls.routers = []
+
+        inter_router_port = cls.tester.get_port()
+        edge_port_A       = cls.tester.get_port()
+        broker_port_A     = cls.tester.get_port()
+        broker_port_B     = cls.tester.get_port()
+
+        router('INT.A', 'interior',
+               [
+                   ('listener', {'role': 'edge',
+                                 'port': cls.tester.get_port()}),
+                   ('listener', {'role': 'inter-router',
+                                 'port': inter_router_port}),
+                   ('connector', {'role': 'route-container',
+                                  'port': cls.tester.get_port()}),
+
+                   ('linkRoute', {'pattern': 'org.apache.A.#',
+                                  'containerId': 'FakeBrokerA',
+                                  'direction': 'in'}),
+                   ('linkRoute', {'pattern': 'org.apache.A.#',
+                                  'containerId': 'FakeBrokerA',
+                                  'direction': 'out'})
+               ])
+        cls.INT_A = cls.routers[-1]
+        cls.INT_A.listener = cls.INT_A.addresses[0]
+        cls.INT_A.edge_listener = cls.INT_A.addresses[1]
+        cls.INT_A.inter_router_listener = cls.INT_A.addresses[2]
+        cls.INT_A.broker_connector = cls.INT_A.connector_addresses[0]
+
+        router('INT.B', 'interior',
+               [
+                   ('listener', {'role': 'edge',
+                                 'port': cls.tester.get_port()}),
+                   ('connector', {'role': 'inter-router',
+                                  'name': 'connectorToA',
+                                  'port': inter_router_port}),
+                   ('connector', {'role': 'route-container',
+                                  'port': cls.tester.get_port()}),
+
+                   ('linkRoute', {'pattern': 'org.apache.B.#',
+                                  'containerId': 'FakeBrokerB',
+                                  'direction': 'in'}),
+                   ('linkRoute', {'pattern': 'org.apache.B.#',
+                                  'containerId': 'FakeBrokerB',
+                                  'direction': 'out'})
+                ])
+        cls.INT_B = cls.routers[-1]
+        cls.INT_B.edge_listener = cls.INT_B.addresses[1]
+        cls.INT_B.broker_connector = cls.INT_B.connector_addresses[1]
+
+        cls.INT_A.wait_router_connected('INT.B')
+        cls.INT_B.wait_router_connected('INT.A')
+
+    def _lookup_request(self, lr_address, direction):
+        """
+        Construct a link route lookup request message
+        """
+        return Message(body=[lr_address,
+                             direction],
+                       properties={"version": self.PROTOCOL_VERSION,
+                                   "opcode": self.OPCODE_LINK_ROUTE_LOOKUP})
+
+    def test_link_route_lookup_ok(self):
+        """
+        verify a link route address can be looked up successfully for both
+        locally attached and remotely attached containers
+        """
+
+        # fire up a fake broker attached to the router local to the edge router
+        fb = FakeBroker(self.INT_A.broker_connector, container_id='FakeBrokerA')
+        self.INT_A.wait_address("org.apache.A", containers=1)
+
+        # create a fake edge and lookup the target address
+        bc = BlockingConnection(self.INT_A.edge_listener, timeout=TIMEOUT)
+        srr = SyncRequestResponse(bc, self.QD_TERMINUS_ADDRESS_LOOKUP)
+
+        for direction in [True, False]:
+            # True = direction inbound (receiver) False = direction outbound (sender)
+            rsp = self._check_response(srr.call(self._lookup_request("org.apache.A.foo", direction)))
+            self.assertEqual(self.QCM_ADDR_LOOKUP_OK, rsp[0])
+            self.assertTrue(rsp[1])
+            self.assertTrue(rsp[2])
+
+        # shutdown fake router
+        fb.join()
+
+        # now fire up a fake broker attached to the remote router
+        fb = FakeBroker(self.INT_B.broker_connector, container_id='FakeBrokerB')
+        self.INT_A.wait_address("org.apache.B", remotes=1)
+
+        for direction in [True, False]:
+            rsp = self._check_response(srr.call(self._lookup_request("org.apache.B.foo", direction)))
+            self.assertEqual(self.QCM_ADDR_LOOKUP_OK, rsp[0])
+            self.assertTrue(rsp[1])
+            self.assertTrue(rsp[2])
+
+        fb.join()
+        bc.close()
+
+    def test_link_route_lookup_not_found(self):
+        """
+        verify correct handling of lookup misses
+        """
+        bc = BlockingConnection(self.INT_A.edge_listener, timeout=TIMEOUT)
+        srr = SyncRequestResponse(bc, self.QD_TERMINUS_ADDRESS_LOOKUP)
+
+        rsp = self._check_response(srr.call(self._lookup_request("not.found.address", True)))
+        self.assertEqual(self.QCM_ADDR_LOOKUP_NOT_FOUND, rsp[0])
+
+    def test_link_route_lookup_not_link_route(self):
+        """
+        verify correct handling of matches to mobile addresses
+        """
+        addr = "not.a.linkroute"
+        client = AsyncTestReceiver(self.INT_A.listener, addr)
+        self.INT_A.wait_address(addr)
+
+        bc = BlockingConnection(self.INT_A.edge_listener, timeout=TIMEOUT)
+        srr = SyncRequestResponse(bc, self.QD_TERMINUS_ADDRESS_LOOKUP)
+
+        rsp = self._check_response(srr.call(self._lookup_request(addr, True)))
+        # self.assertEqual(self.QCM_ADDR_LOOKUP_OK, rsp[0])
+        self.assertEqual(False, rsp[1])
+        bc.close()
+        client.stop()
+
+    def test_link_route_lookup_no_dest(self):
+        """
+        verify correct handling of matches to mobile addresses
+        """
+        bc = BlockingConnection(self.INT_A.edge_listener, timeout=TIMEOUT)
+        srr = SyncRequestResponse(bc, self.QD_TERMINUS_ADDRESS_LOOKUP)
+        rsp = self._check_response(srr.call(self._lookup_request("org.apache.A.nope", True)))
+        self.assertEqual(self.QCM_ADDR_LOOKUP_OK, rsp[0])
+        self.assertEqual(True, rsp[1])
+        self.assertEqual(False, rsp[2])
+        bc.close()
+
+    def _invalid_request_test(self, msg, disposition=Disposition.REJECTED):
+        bc = BlockingConnection(self.INT_A.edge_listener, timeout=TIMEOUT)
+        srr = SyncRequestResponse(bc, self.QD_TERMINUS_ADDRESS_LOOKUP)
+        # @TODO(kgiusti) - self.assertRaises does not work here:
+        try:
+            srr.call(msg)
+            self.assertTrue(False)
+        except SendException as exc:
+            self.assertEqual(disposition, exc.state)
+        bc.close()
+
+    def test_link_route_invalid_request(self):
+        """
+        Test various invalid message content
+        """
+
+        # empty message
+        self._invalid_request_test(Message())
+
+        # missing properties
+        msg = self._lookup_request("ignore", False)
+        msg.properties = None
+        self._invalid_request_test(msg)
+
+        # missing version
+        msg = self._lookup_request("ignore", False)
+        del msg.properties['version']
+        self._invalid_request_test(msg)
+
+        # invalid version
+        msg = self._lookup_request("ignore", False)
+        msg.properties['version'] = "blech"
+        self._invalid_request_test(msg)
+
+        # unsupported version
+        msg = self._lookup_request("ignore", False)
+        msg.properties['version'] = 97387187
+        self._invalid_request_test(msg)
+
+        # missing opcode
+        msg = self._lookup_request("ignore", False)
+        del msg.properties['opcode']
+        self._invalid_request_test(msg)
+
+        # bad opcode
+        msg = self._lookup_request("ignore", False)
+        msg.properties['opcode'] = "snarf"
+        self._invalid_request_test(msg)
+
+        # bad body
+        msg = self._lookup_request("ignore", False)
+        msg.body = [71]
+        self._invalid_request_test(msg)
+
+    def test_lookup_bad_connection(self):
+        """
+        Verify that clients connected via non-edge connections fail
+        """
+        bc = BlockingConnection(self.INT_A.listener, timeout=TIMEOUT)
+        with self.assertRaises(LinkDetached):
+            SyncRequestResponse(bc, self.QD_TERMINUS_ADDRESS_LOOKUP)
+        bc.close()
+
+        bc = BlockingConnection(self.INT_A.inter_router_listener, timeout=TIMEOUT)
+        with self.assertRaises(LinkDetached):
+            SyncRequestResponse(bc, self.QD_TERMINUS_ADDRESS_LOOKUP)
+        bc.close()
+
+        # consuming from the lookup address is forbidden:
+        bc = BlockingConnection(self.INT_A.edge_listener, timeout=TIMEOUT)
+        with self.assertRaises(LinkDetached):
+            rcv = bc.create_receiver(self.QD_TERMINUS_ADDRESS_LOOKUP)
+        bc.close()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/2] qpid-dispatch git commit: DISPATCH-1194 - Refactored the address lookup for attach into a core module with asynchronous capability.

Posted by tr...@apache.org.
DISPATCH-1194 - Refactored the address lookup for attach into a core module with asynchronous capability.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/564c5907
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/564c5907
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/564c5907

Branch: refs/heads/master
Commit: 564c5907cebb12653181f2e266dce8b581dc7c32
Parents: d201dea
Author: Ted Ross <tr...@redhat.com>
Authored: Tue Dec 4 09:38:09 2018 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Tue Dec 4 09:38:09 2018 -0500

----------------------------------------------------------------------
 src/CMakeLists.txt                              |   1 +
 src/router_core/connections.c                   | 345 +----------------
 src/router_core/core_attach_address_lookup.h    |  54 +++
 .../address_lookup_client/lookup_client.c       | 367 +++++++++++++++++++
 src/router_core/router_core_private.h           |   3 +
 5 files changed, 433 insertions(+), 337 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/564c5907/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 352aec7..8fc6485 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -110,6 +110,7 @@ set(qpid_dispatch_SOURCES
   router_core/modules/test_hooks/core_test_hooks.c
   router_core/modules/edge_addr_tracking/edge_addr_tracking.c
   router_core/modules/address_lookup/address_lookup.c
+  router_core/modules/address_lookup_client/lookup_client.c
   router_node.c
   router_pynode.c
   schema_enum.c

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/564c5907/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index db271f4..d06bec9 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -607,33 +607,6 @@ void qdr_link_enqueue_work_CT(qdr_core_t      *core,
 
 
 /**
- * Generate a temporary routable address for a destination connected to this
- * router node.
- */
-static void qdr_generate_temp_addr(qdr_core_t *core, char *buffer, size_t length)
-{
-    char discriminator[QD_DISCRIMINATOR_SIZE];
-    qd_generate_discriminator(discriminator);
-    if (core->router_mode == QD_ROUTER_MODE_EDGE)
-        snprintf(buffer, length, "amqp:/_edge/%s/temp.%s", core->router_id, discriminator);
-    else
-        snprintf(buffer, length, "amqp:/_topo/%s/%s/temp.%s", core->router_area, core->router_id, discriminator);
-}
-
-
-/**
- * Generate a temporary mobile address for a producer connected to this
- * router node.
- */
-static void qdr_generate_mobile_addr(qdr_core_t *core, char *buffer, size_t length)
-{
-    char discriminator[QD_DISCRIMINATOR_SIZE];
-    qd_generate_discriminator(discriminator);
-    snprintf(buffer, length, "amqp:/_$temp.%s", discriminator);
-}
-
-
-/**
  * Generate a link name
  */
 static void qdr_generate_link_name(const char *label, char *buffer, size_t length)
@@ -1087,173 +1060,6 @@ void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr)
 }
 
 
-/**
- * qdr_lookup_terminus_address_CT
- *
- * Lookup a terminus address in the route table and possibly create a new address
- * if no match is found.
- *
- * @param core Pointer to the core object
- * @param dir Direction of the link for the terminus
- * @param conn The connection over which the terminus was attached
- * @param terminus The terminus containing the addressing information to be looked up
- * @param create_if_not_found Iff true, return a pointer to a newly created address record
- * @param accept_dynamic Iff true, honor the dynamic flag by creating a dynamic address
- * @param [out] link_route True iff the lookup indicates that an attach should be routed
- * @param [out] unavailable True iff this address is blocked as unavailable
- * @param [out] core_endpoint True iff this address is bound to a core-internal endpoint
- * @return Pointer to an address record or 0 if none is found
- */
-static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t       *core,
-                                                     qd_direction_t    dir,
-                                                     qdr_connection_t *conn,
-                                                     qdr_terminus_t   *terminus,
-                                                     bool              create_if_not_found,
-                                                     bool              accept_dynamic,
-                                                     bool             *link_route,
-                                                     bool             *unavailable,
-                                                     bool             *core_endpoint)
-{
-    qdr_address_t *addr = 0;
-
-    //
-    // Unless expressly stated, link routing is not indicated for this terminus.
-    //
-    *link_route    = false;
-    *unavailable   = false;
-    *core_endpoint = false;
-
-    if (qdr_terminus_is_dynamic(terminus)) {
-        //
-        // The terminus is dynamic.  Check to see if there is an address provided
-        // in the dynamic node properties.  If so, look that address up as a link-routed
-        // destination.
-        //
-        qd_iterator_t *dnp_address = qdr_terminus_dnp_address(terminus);
-        if (dnp_address) {
-            qd_iterator_reset_view(dnp_address, ITER_VIEW_ADDRESS_WITH_SPACE);
-            if (conn->tenant_space)
-                qd_iterator_annotate_space(dnp_address, conn->tenant_space, conn->tenant_space_len);
-            qd_parse_tree_retrieve_match(core->link_route_tree[dir], dnp_address, (void**) &addr);
-
-            if (addr && conn->tenant_space) {
-                //
-                // If this link is in a tenant space, translate the dnp address to
-                // the fully-qualified view
-                //
-                qdr_terminus_set_dnp_address_iterator(terminus, dnp_address);
-            }
-
-            qd_iterator_free(dnp_address);
-            *link_route = true;
-            return addr;
-        }
-
-        //
-        // The dynamic terminus has no address in the dynamic-node-propteries.  If we are
-        // permitted to generate dynamic addresses, create a new address that is local to
-        // this router and insert it into the address table with a hash index.
-        //
-        if (!accept_dynamic)
-            return 0;
-
-        char temp_addr[200];
-        bool generating = true;
-        while (generating) {
-            //
-            // The address-generation process is performed in a loop in case the generated
-            // address collides with a previously generated address (this should be _highly_
-            // unlikely).
-            //
-            if (dir == QD_OUTGOING)
-                qdr_generate_temp_addr(core, temp_addr, 200);
-            else
-                qdr_generate_mobile_addr(core, temp_addr, 200);
-
-            qd_iterator_t *temp_iter = qd_iterator_string(temp_addr, ITER_VIEW_ADDRESS_HASH);
-            qd_hash_retrieve(core->addr_hash, temp_iter, (void**) &addr);
-            if (!addr) {
-                addr = qdr_address_CT(core, QD_TREATMENT_ANYCAST_BALANCED);
-                qd_hash_insert(core->addr_hash, temp_iter, addr, &addr->hash_handle);
-                DEQ_INSERT_TAIL(core->addrs, addr);
-                qdr_terminus_set_address(terminus, temp_addr);
-                generating = false;
-            }
-            qd_iterator_free(temp_iter);
-        }
-        return addr;
-    }
-
-    //
-    // If the terminus is anonymous, there is no address to look up.
-    //
-    if (qdr_terminus_is_anonymous(terminus))
-        return 0;
-
-    //
-    // The terminus has a non-dynamic address that we need to look up.  First, look for
-    // a link-route destination for the address.
-    //
-    qd_iterator_t *iter = qdr_terminus_get_address(terminus);
-    qd_iterator_reset_view(iter, ITER_VIEW_ADDRESS_WITH_SPACE);
-    if (conn->tenant_space)
-        qd_iterator_annotate_space(iter, conn->tenant_space, conn->tenant_space_len);
-    qd_parse_tree_retrieve_match(core->link_route_tree[dir], iter, (void**) &addr);
-    if (addr) {
-        *link_route = true;
-
-        //
-        // If this link is in a tenant space, translate the terminus address to
-        // the fully-qualified view
-        //
-        if (conn->tenant_space) {
-            qdr_terminus_set_address_iterator(terminus, iter);
-        }
-        return addr;
-    }
-
-    //
-    // There was no match for a link-route destination, look for a message-route address.
-    //
-    int in_phase;
-    int out_phase;
-    int addr_phase;
-    int priority;
-    qd_address_treatment_t treat = qdr_treatment_for_address_CT(core, conn, iter, &in_phase, &out_phase, &priority);
-
-    qd_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
-    qd_iterator_annotate_prefix(iter, '\0'); // Cancel previous override
-    addr_phase = dir == QD_INCOMING ? in_phase : out_phase;
-    qd_iterator_annotate_phase(iter, (char) addr_phase + '0');
-
-    qd_hash_retrieve(core->addr_hash, iter, (void**) &addr);
-
-    if (addr && addr->treatment == QD_TREATMENT_UNAVAILABLE)
-        *unavailable = true;
-
-    if (!addr && create_if_not_found) {
-        addr = qdr_address_CT(core, treat);
-        if (addr) {
-            qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle);
-            DEQ_INSERT_TAIL(core->addrs, addr);
-        }
-
-        if (!addr && treat == QD_TREATMENT_UNAVAILABLE)
-            *unavailable = true;
-    }
-
-    if (qdr_terminus_is_coordinator(terminus))
-        *unavailable = false;
-
-    if (!!addr && addr->core_endpoint != 0)
-        *core_endpoint = true;
-
-    if (addr)
-        addr->priority = priority;
-    return addr;
-}
-
-
 static void qdr_connection_opened_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
 {
 
@@ -1412,19 +1218,6 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, boo
 }
 
 
-static char* disambiguated_link_name(qdr_connection_info_t *conn, char *original)
-{
-    size_t olen = strlen(original);
-    size_t clen = strlen(conn->container);
-    char *name = (char*) malloc(olen + clen + 2);
-    memset(name, 0, olen + clen + 2);
-    strcat(name, original);
-    name[olen] = '@';
-    strcat(name + olen + 1, conn->container);
-    return name;
-}
-
-
 //
 // Handle the attachment and detachment of an inter-router control link
 //
@@ -1508,7 +1301,6 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
     qd_direction_t     dir    = action->args.connection.dir;
     qdr_terminus_t    *source = action->args.connection.source;
     qdr_terminus_t    *target = action->args.connection.target;
-    bool               success;
 
     //
     // Put the link into the proper lists for tracking.
@@ -1561,93 +1353,13 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
                 //
                 // This link has a target address
                 //
-                bool  link_route;
-                bool  unavailable;
-                bool  core_endpoint;
-                qdr_address_t *addr = qdr_lookup_terminus_address_CT(core, dir, conn, target, true, true, &link_route, &unavailable, &core_endpoint);
-
-                if (core_endpoint) {
-                    qdrc_endpoint_do_bound_attach_CT(core, addr, link, source, target);
-                }
-
-                else if (unavailable) {
-                    qdr_link_outbound_detach_CT(core, link, qdr_error(QD_AMQP_COND_NOT_FOUND, "Node not found"), 0, true);
-                    qdr_terminus_free(source);
-                    qdr_terminus_free(target);
-                }
-
-                else if (!addr) {
-                    //
-                    // No route to this destination, reject the link
-                    //
+                if (core->addr_lookup_handler)
+                    core->addr_lookup_handler(core, conn, link, dir, source, target);
+                else {
                     qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
                     qdr_terminus_free(source);
                     qdr_terminus_free(target);
-                }
-
-                else if (link_route) {
-                    //
-                    // This is a link-routed destination, forward the attach to the next hop
-                    //
-                    if (qdr_terminus_survives_disconnect(target) && !core->qd->allow_resumable_link_route) {
-                        qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_INVALID_LINK_EXPIRATION, true);
-                        qdr_terminus_free(source);
-                        qdr_terminus_free(target);
-                    } else {
-                        if (conn->role != QDR_ROLE_INTER_ROUTER && conn->connection_info) {
-                            link->disambiguated_name = disambiguated_link_name(conn->connection_info, link->name);
-                        }
-                        success = qdr_forward_attach_CT(core, addr, link, source, target);
-
-                        if (!success) {
-                            qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
-                            qdr_terminus_free(source);
-                            qdr_terminus_free(target);
-                        }
-                    }
-
-                }
-
-                else if (qdr_terminus_is_coordinator(target)) {
-                    //
-                    // This target terminus is a coordinator.
-                    // If we got here, it means that the coordinator link attach could not be link routed to a broker (or to the next router).
-                    // The router should reject this link because the router cannot coordinate transactions itself.
-                    //
-                    // The attach response should have a null target to indicate refusal and the immediately coming detach.
-                    qdr_link_outbound_second_attach_CT(core, link, source, 0);
-                    // Now, send back a detach with the error amqp:precondition-failed
-                    qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_COORDINATOR_PRECONDITION_FAILED, true);
-                }
-                else {
-                    //
-                    // Associate the link with the address.  With this association, it will be unnecessary
-                    // to do an address lookup for deliveries that arrive on this link.
-                    //
-                    qdr_core_bind_address_link_CT(core, addr, link);
-                    qdr_link_outbound_second_attach_CT(core, link, source, target);
-
-                    //
-                    // Issue the initial credit only if one of the following
-                    // holds:
-                    // - there are destinations for the address
-                    // - if the address treatment is multicast
-                    // - the address is that of an exchange (no subscribers allowed)
-                    //
-                    if (DEQ_SIZE(addr->subscriptions)
-                            || DEQ_SIZE(addr->rlinks)
-                            || qd_bitmask_cardinality(addr->rnodes)
-                            || qdr_is_addr_treatment_multicast(addr)
-                            || !!addr->exchange) {
-                        qdr_link_issue_credit_CT(core, link, link->capacity, false);
-                    }
-
-                    //
-                    // If this link came through an edge connection, raise a link event to
-                    // herald that fact.
-                    //
-                    if (link->conn->role == QDR_ROLE_EDGE_CONNECTION)
-                        qdrc_event_link_raise(core, QDRC_EVENT_LINK_EDGE_DATA_ATTACHED, link);
+                    return;
                 }
             }
             break;
@@ -1668,54 +1380,13 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
         //
         switch (link->link_type) {
         case QD_LINK_ENDPOINT: {
-            bool  link_route;
-            bool  unavailable;
-            bool  core_endpoint;
-            qdr_address_t *addr = qdr_lookup_terminus_address_CT(core, dir, conn, source, true, true, &link_route, &unavailable, &core_endpoint);
-
-            if (core_endpoint) {
-                qdrc_endpoint_do_bound_attach_CT(core, addr, link, source, target);
-            }
-
-            else if (unavailable) {
-                qdr_link_outbound_detach_CT(core, link, qdr_error(QD_AMQP_COND_NOT_FOUND, "Node not found"), 0, true);
-                qdr_terminus_free(source);
-                qdr_terminus_free(target);
-            }
-
-            else if (!addr) {
-                //
-                // No route to this destination, reject the link
-                //
+            if (core->addr_lookup_handler)
+                core->addr_lookup_handler(core, conn, link, dir, source, target);
+            else {
                 qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
                 qdr_terminus_free(source);
                 qdr_terminus_free(target);
-            }
-
-            else if (link_route) {
-                //
-                // This is a link-routed destination, forward the attach to the next hop
-                //
-                if (qdr_terminus_survives_disconnect(source) && !core->qd->allow_resumable_link_route) {
-                    qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_INVALID_LINK_EXPIRATION, true);
-                    qdr_terminus_free(source);
-                    qdr_terminus_free(target);
-                } else {
-                    if (conn->role != QDR_ROLE_INTER_ROUTER && conn->connection_info) {
-                        link->disambiguated_name = disambiguated_link_name(conn->connection_info, link->name);
-                    }
-                    bool success = qdr_forward_attach_CT(core, addr, link, source, target);
-                    if (!success) {
-                        qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
-                        qdr_terminus_free(source);
-                        qdr_terminus_free(target);
-                    }
-                }
-            }
-
-            else {
-                qdr_core_bind_address_link_CT(core, addr, link);
-                qdr_link_outbound_second_attach_CT(core, link, source, target);
+                return;
             }
             break;
         }

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/564c5907/src/router_core/core_attach_address_lookup.h
----------------------------------------------------------------------
diff --git a/src/router_core/core_attach_address_lookup.h b/src/router_core/core_attach_address_lookup.h
new file mode 100644
index 0000000..d05b9cb
--- /dev/null
+++ b/src/router_core/core_attach_address_lookup.h
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef qd_router_core_attach_addr_lookup_types
+#define qd_router_core_attach_addr_lookup_types 1
+
+#include "router_core_private.h"
+#endif
+
+#ifndef qd_router_core_attach_addr_lookup
+#define qd_router_core_attach_addr_lookup 1
+
+
+/**
+ * Handler - Look up the address on a received first-attach
+ *
+ * This handler is invoked upon receipt of a first-attach on a normal endpoint link.
+ * The appropriate address, from source or target will be resolved to and address for
+ * message or link routing.  This operation may be synchronoue (completed before it
+ * returns) or asynchronous (completed later).
+ *
+ * @param core Pointer to the core state.
+ * @param conn Pointer to the connection over which the attach arrived.
+ * @param link Pointer to the attaching link.
+ * @param dir The direction of message flow for the link.
+ * @param source The source terminus for the attach.
+ * @param target The target terminus for the attach.
+ */
+typedef void (*qdrc_attach_addr_lookup_t) (qdr_core_t       *core,
+                                           qdr_connection_t *conn,
+                                           qdr_link_t       *link,
+                                           qd_direction_t    dir,
+                                           qdr_terminus_t   *source,
+                                           qdr_terminus_t   *target);
+
+
+
+#endif

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/564c5907/src/router_core/modules/address_lookup_client/lookup_client.c
----------------------------------------------------------------------
diff --git a/src/router_core/modules/address_lookup_client/lookup_client.c b/src/router_core/modules/address_lookup_client/lookup_client.c
new file mode 100644
index 0000000..924565e
--- /dev/null
+++ b/src/router_core/modules/address_lookup_client/lookup_client.c
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "module.h"
+#include "core_attach_address_lookup.h"
+#include "router_core_private.h"
+#include <qpid/dispatch/discriminator.h>
+#include <stdio.h>
+
+static char* disambiguated_link_name(qdr_connection_info_t *conn, char *original)
+{
+    size_t olen = strlen(original);
+    size_t clen = strlen(conn->container);
+    char *name = (char*) malloc(olen + clen + 2);
+    memset(name, 0, olen + clen + 2);
+    strcat(name, original);
+    name[olen] = '@';
+    strcat(name + olen + 1, conn->container);
+    return name;
+}
+
+
+/**
+ * Generate a temporary routable address for a destination connected to this
+ * router node.
+ */
+static void qdr_generate_temp_addr(qdr_core_t *core, char *buffer, size_t length)
+{
+    char discriminator[QD_DISCRIMINATOR_SIZE];
+    qd_generate_discriminator(discriminator);
+    if (core->router_mode == QD_ROUTER_MODE_EDGE)
+        snprintf(buffer, length, "amqp:/_edge/%s/temp.%s", core->router_id, discriminator);
+    else
+        snprintf(buffer, length, "amqp:/_topo/%s/%s/temp.%s", core->router_area, core->router_id, discriminator);
+}
+
+
+/**
+ * Generate a temporary mobile address for a producer connected to this
+ * router node.
+ */
+static void qdr_generate_mobile_addr(qdr_core_t *core, char *buffer, size_t length)
+{
+    char discriminator[QD_DISCRIMINATOR_SIZE];
+    qd_generate_discriminator(discriminator);
+    snprintf(buffer, length, "amqp:/_$temp.%s", discriminator);
+}
+
+
+/**
+ * qdr_lookup_terminus_address_CT
+ *
+ * Lookup a terminus address in the route table and possibly create a new address
+ * if no match is found.
+ *
+ * @param core Pointer to the core object
+ * @param dir Direction of the link for the terminus
+ * @param conn The connection over which the terminus was attached
+ * @param terminus The terminus containing the addressing information to be looked up
+ * @param create_if_not_found Iff true, return a pointer to a newly created address record
+ * @param accept_dynamic Iff true, honor the dynamic flag by creating a dynamic address
+ * @param [out] link_route True iff the lookup indicates that an attach should be routed
+ * @param [out] unavailable True iff this address is blocked as unavailable
+ * @param [out] core_endpoint True iff this address is bound to a core-internal endpoint
+ * @return Pointer to an address record or 0 if none is found
+ */
+static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t       *core,
+                                                     qd_direction_t    dir,
+                                                     qdr_connection_t *conn,
+                                                     qdr_terminus_t   *terminus,
+                                                     bool              create_if_not_found,
+                                                     bool              accept_dynamic,
+                                                     bool             *link_route,
+                                                     bool             *unavailable,
+                                                     bool             *core_endpoint)
+{
+    qdr_address_t *addr = 0;
+
+    //
+    // Unless expressly stated, link routing is not indicated for this terminus.
+    //
+    *link_route    = false;
+    *unavailable   = false;
+    *core_endpoint = false;
+
+    if (qdr_terminus_is_dynamic(terminus)) {
+        //
+        // The terminus is dynamic.  Check to see if there is an address provided
+        // in the dynamic node properties.  If so, look that address up as a link-routed
+        // destination.
+        //
+        qd_iterator_t *dnp_address = qdr_terminus_dnp_address(terminus);
+        if (dnp_address) {
+            qd_iterator_reset_view(dnp_address, ITER_VIEW_ADDRESS_WITH_SPACE);
+            if (conn->tenant_space)
+                qd_iterator_annotate_space(dnp_address, conn->tenant_space, conn->tenant_space_len);
+            qd_parse_tree_retrieve_match(core->link_route_tree[dir], dnp_address, (void**) &addr);
+
+            if (addr && conn->tenant_space) {
+                //
+                // If this link is in a tenant space, translate the dnp address to
+                // the fully-qualified view
+                //
+                qdr_terminus_set_dnp_address_iterator(terminus, dnp_address);
+            }
+
+            qd_iterator_free(dnp_address);
+            *link_route = true;
+            return addr;
+        }
+
+        //
+        // The dynamic terminus has no address in the dynamic-node-propteries.  If we are
+        // permitted to generate dynamic addresses, create a new address that is local to
+        // this router and insert it into the address table with a hash index.
+        //
+        if (!accept_dynamic)
+            return 0;
+
+        char temp_addr[200];
+        bool generating = true;
+        while (generating) {
+            //
+            // The address-generation process is performed in a loop in case the generated
+            // address collides with a previously generated address (this should be _highly_
+            // unlikely).
+            //
+            if (dir == QD_OUTGOING)
+                qdr_generate_temp_addr(core, temp_addr, 200);
+            else
+                qdr_generate_mobile_addr(core, temp_addr, 200);
+
+            qd_iterator_t *temp_iter = qd_iterator_string(temp_addr, ITER_VIEW_ADDRESS_HASH);
+            qd_hash_retrieve(core->addr_hash, temp_iter, (void**) &addr);
+            if (!addr) {
+                addr = qdr_address_CT(core, QD_TREATMENT_ANYCAST_BALANCED);
+                qd_hash_insert(core->addr_hash, temp_iter, addr, &addr->hash_handle);
+                DEQ_INSERT_TAIL(core->addrs, addr);
+                qdr_terminus_set_address(terminus, temp_addr);
+                generating = false;
+            }
+            qd_iterator_free(temp_iter);
+        }
+        return addr;
+    }
+
+    //
+    // If the terminus is anonymous, there is no address to look up.
+    //
+    if (qdr_terminus_is_anonymous(terminus))
+        return 0;
+
+    //
+    // The terminus has a non-dynamic address that we need to look up.  First, look for
+    // a link-route destination for the address.
+    //
+    qd_iterator_t *iter = qdr_terminus_get_address(terminus);
+    qd_iterator_reset_view(iter, ITER_VIEW_ADDRESS_WITH_SPACE);
+    if (conn->tenant_space)
+        qd_iterator_annotate_space(iter, conn->tenant_space, conn->tenant_space_len);
+    qd_parse_tree_retrieve_match(core->link_route_tree[dir], iter, (void**) &addr);
+    if (addr) {
+        *link_route = true;
+
+        //
+        // If this link is in a tenant space, translate the terminus address to
+        // the fully-qualified view
+        //
+        if (conn->tenant_space) {
+            qdr_terminus_set_address_iterator(terminus, iter);
+        }
+        return addr;
+    }
+
+    //
+    // There was no match for a link-route destination, look for a message-route address.
+    //
+    int in_phase;
+    int out_phase;
+    int addr_phase;
+    int priority;
+    qd_address_treatment_t treat = qdr_treatment_for_address_CT(core, conn, iter, &in_phase, &out_phase, &priority);
+
+    qd_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
+    qd_iterator_annotate_prefix(iter, '\0'); // Cancel previous override
+    addr_phase = dir == QD_INCOMING ? in_phase : out_phase;
+    qd_iterator_annotate_phase(iter, (char) addr_phase + '0');
+
+    qd_hash_retrieve(core->addr_hash, iter, (void**) &addr);
+
+    if (addr && addr->treatment == QD_TREATMENT_UNAVAILABLE)
+        *unavailable = true;
+
+    if (!addr && create_if_not_found) {
+        addr = qdr_address_CT(core, treat);
+        if (addr) {
+            qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle);
+            DEQ_INSERT_TAIL(core->addrs, addr);
+        }
+
+        if (!addr && treat == QD_TREATMENT_UNAVAILABLE)
+            *unavailable = true;
+    }
+
+    if (qdr_terminus_is_coordinator(terminus))
+        *unavailable = false;
+
+    if (!!addr && addr->core_endpoint != 0)
+        *core_endpoint = true;
+
+    if (addr)
+        addr->priority = priority;
+    return addr;
+}
+
+
+static void qdr_link_react_to_first_attach_CT(qdr_core_t       *core,
+                                              qdr_connection_t *conn,
+                                              qdr_address_t    *addr,
+                                              qdr_link_t       *link,
+                                              qd_direction_t    dir,
+                                              qdr_terminus_t   *source,
+                                              qdr_terminus_t   *target,
+                                              bool              link_route,
+                                              bool              unavailable,
+                                              bool              core_endpoint)
+{
+    if (core_endpoint) {
+        qdrc_endpoint_do_bound_attach_CT(core, addr, link, source, target);
+    }
+
+    else if (unavailable) {
+        qdr_link_outbound_detach_CT(core, link, qdr_error(QD_AMQP_COND_NOT_FOUND, "Node not found"), 0, true);
+        qdr_terminus_free(source);
+        qdr_terminus_free(target);
+    }
+
+    else if (!addr) {
+        //
+        // No route to this destination, reject the link
+        //
+        qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
+        qdr_terminus_free(source);
+        qdr_terminus_free(target);
+    }
+
+    else if (link_route) {
+        //
+        // This is a link-routed destination, forward the attach to the next hop
+        //
+        qdr_terminus_t *term = dir == QD_INCOMING ? target : source;
+        if (qdr_terminus_survives_disconnect(term) && !core->qd->allow_resumable_link_route) {
+            qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_INVALID_LINK_EXPIRATION, true);
+            qdr_terminus_free(source);
+            qdr_terminus_free(target);
+        } else {
+            if (conn->role != QDR_ROLE_INTER_ROUTER && conn->connection_info) {
+                link->disambiguated_name = disambiguated_link_name(conn->connection_info, link->name);
+            }
+            bool success = qdr_forward_attach_CT(core, addr, link, source, target);
+            if (!success) {
+                qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
+                qdr_terminus_free(source);
+                qdr_terminus_free(target);
+            }
+        }
+    }
+
+    else if (dir == QD_INCOMING && qdr_terminus_is_coordinator(target)) {
+        //
+        // This target terminus is a coordinator.
+        // If we got here, it means that the coordinator link attach could not be link routed to a broker (or to the next router).
+        // The router should reject this link because the router cannot coordinate transactions itself.
+        //
+        // The attach response should have a null target to indicate refusal and the immediately coming detach.
+        //
+        qdr_link_outbound_second_attach_CT(core, link, source, 0);
+
+        //
+        // Now, send back a detach with the error amqp:precondition-failed
+        //
+        qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_COORDINATOR_PRECONDITION_FAILED, true);
+    } else {
+        //
+        // Associate the link with the address.  With this association, it will be unnecessary
+        // to do an address lookup for deliveries that arrive on this link.
+        //
+        qdr_core_bind_address_link_CT(core, addr, link);
+        qdr_link_outbound_second_attach_CT(core, link, source, target);
+
+        //
+        // Issue the initial credit only if one of the following
+        // holds:
+        // - there are destinations for the address
+        // - if the address treatment is multicast
+        // - the address is that of an exchange (no subscribers allowed)
+        //
+        if (dir == QD_INCOMING
+            && (DEQ_SIZE(addr->subscriptions)
+                || DEQ_SIZE(addr->rlinks)
+                || qd_bitmask_cardinality(addr->rnodes)
+                || qdr_is_addr_treatment_multicast(addr)
+                || !!addr->exchange)) {
+            qdr_link_issue_credit_CT(core, link, link->capacity, false);
+        }
+
+        //
+        // If this link came through an edge connection, raise a link event to
+        // herald that fact.
+        //
+        if (link->conn->role == QDR_ROLE_EDGE_CONNECTION)
+            qdrc_event_link_raise(core, QDRC_EVENT_LINK_EDGE_DATA_ATTACHED, link);
+    }
+}
+
+
+static void qcm_addr_lookup_CT(qdr_core_t       *core,
+                               qdr_connection_t *conn,
+                               qdr_link_t       *link,
+                               qd_direction_t    dir,
+                               qdr_terminus_t   *source,
+                               qdr_terminus_t   *target)
+
+{
+    bool link_route;
+    bool unavailable;
+    bool core_endpoint;
+    qdr_terminus_t *term = dir == QD_INCOMING ? target : source;
+
+    qdr_address_t *addr = qdr_lookup_terminus_address_CT(core, dir, conn, term, true, true, &link_route, &unavailable, &core_endpoint);
+    qdr_link_react_to_first_attach_CT(core, conn, addr, link, dir, source, target, link_route, unavailable, core_endpoint);
+}
+
+
+static void qcm_addr_lookup_client_init_CT(qdr_core_t *core, void **module_context)
+{
+    assert(core->addr_lookup_handler == 0);
+
+    core->addr_lookup_handler = qcm_addr_lookup_CT;
+    *module_context           = core;
+}
+
+
+static void qcm_addr_lookup_client_final_CT(void *module_context)
+{
+    qdr_core_t *core = (qdr_core_t*) module_context;
+    core->addr_lookup_handler = 0;
+}
+
+
+QDR_CORE_MODULE_DECLARE("address_lookup_client", qcm_addr_lookup_client_init_CT, qcm_addr_lookup_client_final_CT)

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/564c5907/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 0b70c97..bb4af57 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -43,6 +43,7 @@ typedef struct qdr_edge_t            qdr_edge_t;
 
 #include "core_link_endpoint.h"
 #include "core_events.h"
+#include "core_attach_address_lookup.h"
 
 qdr_forwarder_t *qdr_forwarder_CT(qdr_core_t *core, qd_address_treatment_t treatment);
 int qdr_forward_message_CT(qdr_core_t *core, qdr_address_t *addr, qd_message_t *msg, qdr_delivery_t *in_delivery,
@@ -727,6 +728,8 @@ struct qdr_core_t {
     qdr_connection_list_t connections_to_activate;
     qdr_link_list_t       open_links;
 
+    qdrc_attach_addr_lookup_t  addr_lookup_handler;
+
     //
     // Agent section
     //


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org