You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2018/12/04 16:57:16 UTC
qpid-dispatch git commit: DISPATCH-1194: re-arrange the address
lookup server source code
Repository: qpid-dispatch
Updated Branches:
refs/heads/master 564c5907c -> d965d9b1d
DISPATCH-1194: re-arrange the address lookup server source code
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/d965d9b1
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/d965d9b1
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/d965d9b1
Branch: refs/heads/master
Commit: d965d9b1df8c83ba8f59512886b50b203666e33f
Parents: 564c590
Author: Kenneth Giusti <kg...@apache.org>
Authored: Tue Dec 4 11:56:13 2018 -0500
Committer: Kenneth Giusti <kg...@apache.org>
Committed: Tue Dec 4 11:56:13 2018 -0500
----------------------------------------------------------------------
include/qpid/dispatch/address_lookup_server.h | 83 ++++
src/CMakeLists.txt | 3 +-
src/address_lookup_utils.c | 110 +++++
.../modules/address_lookup/address_lookup.c | 454 -------------------
.../modules/address_lookup/address_lookup.h | 75 ---
.../address_lookup_server.c | 360 +++++++++++++++
6 files changed, 555 insertions(+), 530 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d965d9b1/include/qpid/dispatch/address_lookup_server.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/address_lookup_server.h b/include/qpid/dispatch/address_lookup_server.h
new file mode 100644
index 0000000..33a6782
--- /dev/null
+++ b/include/qpid/dispatch/address_lookup_server.h
@@ -0,0 +1,83 @@
+#ifndef router_core_address_lookup_server_h
+#define router_core_address_lookup_server_h 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/iterator.h>
+#include <qpid/dispatch/container.h>
+#include <qpid/dispatch/compose.h>
+//
+// API for building address lookup request messages. The message properties
+// and body fields are handled separately so they can be passed directly to the
+// core client API.
+//
+
+#define PROTOCOL_VERSION 1
+
+typedef enum {
+ // note: keep unit test in sync
+ OPCODE_INVALID,            // 0: what the server's _decode_opcode() reports for a missing or unreadable opcode
+ OPCODE_LINK_ROUTE_LOOKUP,  // 1: value written to the "opcode" property of a link route lookup request
+} address_lookup_opcode_t;
+
+typedef enum {
+ // note: keep unit test in sync
+ QCM_ADDR_LOOKUP_OK,               // 0: the only status for which qcm_link_route_lookup_decode() reads the reply body
+ QCM_ADDR_LOOKUP_BAD_VERSION,
+ QCM_ADDR_LOOKUP_BAD_OPCODE,
+ QCM_ADDR_LOOKUP_NOT_FOUND,
+ QCM_ADDR_LOOKUP_INVALID_REQUEST,  // also returned by qcm_link_route_lookup_decode() when the reply is malformed
+} qcm_address_lookup_status_t;
+
+
+/**
+ * Create the message properties and body for a link route address lookup. The
+ * returned properties and body can be passed directly to
+ * qdrc_client_request_CT().
+ *
+ * @param address - fully qualified link route address to look up.
+ * @param dir - QD_INCOMING or QD_OUTGOING
+ * @param properties - return value for message application properties section
+ * @param body - return value for message body
+ * @return zero on success, non-zero if the fields could not be created
+ *
+ * NOTE(review): verify that src/address_lookup_utils.c defines this exact
+ * name; the definition there has been spelled qcm_link_route_lookup_msg.
+ */
+int qcm_link_route_lookup_request(qd_iterator_t *address,
+ qd_direction_t dir,
+ qd_composed_field_t **properties,
+ qd_composed_field_t **body);
+
+
+/**
+ * Parse out the payload of the link route lookup reply message. The
+ * properties and body fields are provided by the on_reply_cb() callback passed
+ * to the qdrc_client_request_CT() call.
+ *
+ * Both output flags are reset to false before any parsing takes place, so
+ * they are false whenever the return value is not QCM_ADDR_LOOKUP_OK.
+ *
+ * @param properties - application properties as returned in the reply
+ * @param body - body from reply message
+ * @param is_link_route - set True if the address is a link route address that
+ * exists in the route tables of the queried router.
+ * @param has_destinations - if is_link_route this indicates whether or not the
+ * queried router has active destinations for this link route.
+ * @return QCM_ADDR_LOOKUP_OK if query is successful, else an error code
+ */
+qcm_address_lookup_status_t qcm_link_route_lookup_decode(qd_iterator_t *properties,
+ qd_iterator_t *body,
+ bool *is_link_route,
+ bool *has_destinations);
+#endif // router_core_address_lookup_server_h
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d965d9b1/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 8fc6485..1544db9 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -78,6 +78,7 @@ set(qpid_dispatch_SOURCES
python_embedded.c
router_agent.c
router_config.c
+ address_lookup_utils.c
router_core/agent.c
router_core/agent_address.c
router_core/agent_config_address.c
@@ -109,7 +110,7 @@ set(qpid_dispatch_SOURCES
router_core/modules/edge_router/edge_mgmt.c
router_core/modules/test_hooks/core_test_hooks.c
router_core/modules/edge_addr_tracking/edge_addr_tracking.c
- router_core/modules/address_lookup/address_lookup.c
+ router_core/modules/address_lookup_server/address_lookup_server.c
router_core/modules/address_lookup_client/lookup_client.c
router_node.c
router_pynode.c
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d965d9b1/src/address_lookup_utils.c
----------------------------------------------------------------------
diff --git a/src/address_lookup_utils.c b/src/address_lookup_utils.c
new file mode 100644
index 0000000..4c92921
--- /dev/null
+++ b/src/address_lookup_utils.c
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//
+// API for interacting with the core address lookup server
+//
+
+#include <qpid/dispatch/address_lookup_server.h>
+#include <qpid/dispatch/amqp.h>
+#include <qpid/dispatch/parse.h>
+
+/* Build the application properties and body sections of a link route lookup
+ * request message.
+ *
+ * This is the definition behind the qcm_link_route_lookup_request() prototype
+ * in <qpid/dispatch/address_lookup_server.h>.  It used to be defined only
+ * under the name qcm_link_route_lookup_msg(), which left the declared API
+ * without a definition.
+ *
+ *   properties: map  { "version": PROTOCOL_VERSION,
+ *                      "opcode":  OPCODE_LINK_ROUTE_LOOKUP }
+ *   body:       list [ address (string), link role (bool) ]
+ *
+ * address    - link route address, inserted into the body as a string
+ * dir        - QD_INCOMING is encoded as QD_AMQP_LINK_ROLE_RECEIVER, any other
+ *              value as QD_AMQP_LINK_ROLE_SENDER
+ * properties - out: the composed application properties section
+ * body       - out: the composed body section
+ *
+ * Returns 0 on success.  Returns -1 when a field cannot be composed; no field
+ * is left allocated in that case and *properties is null.  *body is only
+ * written once *properties has been composed.
+ */
+int qcm_link_route_lookup_request(qd_iterator_t        *address,
+                                  qd_direction_t        dir,
+                                  qd_composed_field_t **properties,
+                                  qd_composed_field_t **body)
+{
+    *properties = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, 0);
+    if (!*properties)
+        return -1;
+    qd_compose_start_map(*properties);
+    qd_compose_insert_string(*properties, "version");
+    qd_compose_insert_uint(*properties,   PROTOCOL_VERSION);
+    qd_compose_insert_string(*properties, "opcode");
+    qd_compose_insert_uint(*properties,   OPCODE_LINK_ROUTE_LOOKUP);
+    qd_compose_end_map(*properties);
+
+    *body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);
+    if (!*body) {
+        // do not hand back a half-built request
+        qd_compose_free(*properties);
+        *properties = 0;
+        return -1;
+    }
+    qd_compose_start_list(*body);
+    qd_compose_insert_string_iterator(*body, address);
+    qd_compose_insert_bool(*body, (dir == QD_INCOMING
+                                   ? QD_AMQP_LINK_ROLE_RECEIVER
+                                   : QD_AMQP_LINK_ROLE_SENDER));
+    qd_compose_end_list(*body);
+    return 0;
+}
+
+
+/* Former spelling of qcm_link_route_lookup_request(), kept so that code
+ * written against the old name still links.  Forwards all arguments unchanged.
+ */
+int qcm_link_route_lookup_msg(qd_iterator_t        *address,
+                              qd_direction_t        dir,
+                              qd_composed_field_t **properties,
+                              qd_composed_field_t **body)
+{
+    return qcm_link_route_lookup_request(address, dir, properties, body);
+}
+
+
+/* Parse a reply to the link route lookup request.
+ *
+ * properties       - application properties of the reply; must parse as a map
+ *                    that holds a scalar "status" entry
+ * body             - body of the reply; must parse as a list:
+ *                      body[0] == is_link_route (bool)
+ *                      body[1] == has_destinations (bool)
+ * is_link_route    - out, see body[0]
+ * has_destinations - out, see body[1]
+ *
+ * Both outputs are reset to false up front and are only overwritten when the
+ * reply status is QCM_ADDR_LOOKUP_OK and the body holds at least two entries.
+ *
+ * Returns QCM_ADDR_LOOKUP_OK on success, QCM_ADDR_LOOKUP_INVALID_REQUEST when
+ * the reply is malformed, otherwise the non-OK status carried by the reply.
+ *
+ * Every return path goes through the common cleanup below.  qd_parse() can
+ * hand back a non-null field that then fails validation; returning directly
+ * in that case would leak the parsed field.
+ */
+qcm_address_lookup_status_t qcm_link_route_lookup_decode(qd_iterator_t *properties,
+                                                         qd_iterator_t *body,
+                                                         bool          *is_link_route,
+                                                         bool          *has_destinations)
+{
+    // assume a malformed reply until every check has passed
+    qcm_address_lookup_status_t rc = QCM_ADDR_LOOKUP_INVALID_REQUEST;
+    qd_parsed_field_t *props = 0;
+    qd_parsed_field_t *bod   = 0;
+    qd_parsed_field_t *tmp   = 0;
+
+    *is_link_route = false;
+    *has_destinations = false;
+
+    props = qd_parse(properties);
+    if (!props || !qd_parse_ok(props) || !qd_parse_is_map(props)) {
+        goto exit;
+    }
+
+    bod = qd_parse(body);
+    if (!bod || !qd_parse_ok(bod) || !qd_parse_is_list(bod)) {
+        goto exit;
+    }
+
+    tmp = qd_parse_value_by_key(props, "status");
+    if (!tmp || !qd_parse_is_scalar(tmp)) {
+        goto exit;
+    } else {
+        int32_t status = qd_parse_as_int(tmp);
+        if (status != QCM_ADDR_LOOKUP_OK) {
+            // pass the status reported by the peer straight through
+            rc = (qcm_address_lookup_status_t) status;
+            goto exit;
+        }
+    }
+
+    // bod[0] == is_link_route (bool)
+    // bod[1] == has_destinations (bool)
+
+    if (qd_parse_sub_count(bod) < 2) {
+        goto exit;
+    }
+
+    *is_link_route = qd_parse_as_bool(qd_parse_sub_value(bod, 0));
+    *has_destinations = qd_parse_as_bool(qd_parse_sub_value(bod, 1));
+    rc = QCM_ADDR_LOOKUP_OK;
+
+exit:
+    // either field may still be null when an early check failed
+    if (props) {
+        qd_parse_free(props);
+    }
+    if (bod) {
+        qd_parse_free(bod);
+    }
+    return rc;
+}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d965d9b1/src/router_core/modules/address_lookup/address_lookup.c
----------------------------------------------------------------------
diff --git a/src/router_core/modules/address_lookup/address_lookup.c b/src/router_core/modules/address_lookup/address_lookup.c
deleted file mode 100644
index 93297ac..0000000
--- a/src/router_core/modules/address_lookup/address_lookup.c
+++ /dev/null
@@ -1,454 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "address_lookup.h"
-#include <qpid/dispatch/ctools.h>
-#include <qpid/dispatch/amqp.h>
-#include "module.h"
-#include "core_link_endpoint.h"
-
-#include "inttypes.h"
-
-#define PROTOCOL_VERSION 1
-
-typedef enum {
- // note: keep unit test in sync
- OPCODE_INVALID,
- OPCODE_LINK_ROUTE_LOOKUP,
-} address_lookup_opcode_t;
-
-
-/* create the message application properties and body for the link route lookup
- * request message
- */
-int qcm_link_route_lookup_msg(qd_iterator_t *address,
- qd_direction_t dir,
- qd_composed_field_t **properties,
- qd_composed_field_t **body)
-{
- *properties = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, 0);
- if (!*properties)
- return -1;
- qd_compose_start_map(*properties);
- qd_compose_insert_string(*properties, "version");
- qd_compose_insert_uint(*properties, PROTOCOL_VERSION);
- qd_compose_insert_string(*properties, "opcode");
- qd_compose_insert_uint(*properties, OPCODE_LINK_ROUTE_LOOKUP);
- qd_compose_end_map(*properties);
-
- *body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);
- if (!*body) {
- qd_compose_free(*properties);
- *properties = 0;
- return -1;
- }
- qd_compose_start_list(*body);
- qd_compose_insert_string_iterator(*body, address);
- qd_compose_insert_bool(*body, (dir == QD_INCOMING
- ? QD_AMQP_LINK_ROLE_RECEIVER
- : QD_AMQP_LINK_ROLE_SENDER));
- qd_compose_end_list(*body);
- return 0;
-}
-
-
-/* parse a reply to the link route lookup request
- */
-qcm_address_lookup_status_t qcm_link_route_lookup_decode(qd_iterator_t *properties,
- qd_iterator_t *body,
- bool *is_link_route,
- bool *has_destinations)
-{
- qcm_address_lookup_status_t rc = QCM_ADDR_LOOKUP_OK;
- *is_link_route = false;
- *has_destinations = false;
-
- qd_parsed_field_t *props = qd_parse(properties);
- if (!props || !qd_parse_ok(props) || !qd_parse_is_map(props))
- return QCM_ADDR_LOOKUP_INVALID_REQUEST;
-
- qd_parsed_field_t *bod = qd_parse(body);
- if (!bod || !qd_parse_ok(bod) || !qd_parse_is_list(bod)) {
- qd_parse_free(props);
- return QCM_ADDR_LOOKUP_INVALID_REQUEST;
- }
-
- qd_parsed_field_t *tmp = qd_parse_value_by_key(props, "status");
- if (!tmp || !qd_parse_is_scalar(tmp)) {
- rc = QCM_ADDR_LOOKUP_INVALID_REQUEST;
- goto exit;
- } else {
- int32_t status = qd_parse_as_int(tmp);
- if (status != QCM_ADDR_LOOKUP_OK) {
- rc = (qcm_address_lookup_status_t) status;
- goto exit;
- }
- }
-
- // bod[0] == is_link_route (bool)
- // bod[1] == has_destinations (bool)
-
- if (qd_parse_sub_count(bod) < 2) {
- rc = QCM_ADDR_LOOKUP_INVALID_REQUEST;
- goto exit;
- }
-
- *is_link_route = qd_parse_as_bool(qd_parse_sub_value(bod, 0));
- *has_destinations = qd_parse_as_bool(qd_parse_sub_value(bod, 1));
-
-exit:
- qd_parse_free(props);
- qd_parse_free(bod);
- return rc;
-}
-
-
-typedef struct _endpoint_ref {
- DEQ_LINKS(struct _endpoint_ref);
- qdrc_endpoint_t *endpoint;
- const char *container_id;
-} _endpoint_ref_t;
-DEQ_DECLARE(_endpoint_ref_t, _endpoint_ref_list_t);
-ALLOC_DEFINE(_endpoint_ref_t);
-
-
-static struct {
- qdr_core_t *core;
- _endpoint_ref_list_t endpoints;
-} _server_state;
-
-
-/* parse out the opcode from the request
- */
-static address_lookup_opcode_t _decode_opcode(qd_parsed_field_t *properties)
-{
- if (!properties)
- return OPCODE_INVALID;
- qd_parsed_field_t *oc = qd_parse_value_by_key(properties, "opcode");
- if (!oc)
- return OPCODE_INVALID;
- uint32_t opcode = qd_parse_as_uint(oc);
- if (!qd_parse_ok(oc))
- return OPCODE_INVALID;
- return (address_lookup_opcode_t)opcode;
-}
-
-
-/* send a reply to a lookup request
- */
-static uint64_t _send_reply(_endpoint_ref_t *epr,
- address_lookup_opcode_t opcode,
- qcm_address_lookup_status_t status,
- qd_iterator_t *correlation_id,
- qd_iterator_t *reply_to,
- qd_composed_field_t *body)
-{
- if (!correlation_id || !reply_to) {
- qd_log(_server_state.core->log, QD_LOG_ERROR,
- "Link route address reply failed - invalid request message properties"
- " (container=%s, endpoint=%p)",
- epr->container_id, (void *)epr->endpoint);
- return PN_REJECTED;
- }
-
- qd_composed_field_t *fld = qd_compose(QD_PERFORMATIVE_HEADER, 0);
- qd_compose_start_list(fld);
- qd_compose_insert_bool(fld, 0); // durable
- qd_compose_end_list(fld);
-
- fld = qd_compose(QD_PERFORMATIVE_PROPERTIES, fld);
- qd_compose_start_list(fld);
- qd_compose_insert_null(fld); // message-id
- qd_compose_insert_null(fld); // user-id
- qd_compose_insert_typed_iterator(fld, reply_to); // to
- qd_compose_insert_null(fld); // subject
- qd_compose_insert_null(fld); // reply-to
- qd_compose_insert_typed_iterator(fld, correlation_id);
- qd_compose_end_list(fld);
-
- fld = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, fld);
- qd_compose_start_map(fld);
- qd_compose_insert_string(fld, "version");
- qd_compose_insert_uint(fld, PROTOCOL_VERSION);
- qd_compose_insert_string(fld, "opcode");
- qd_compose_insert_uint(fld, opcode);
- qd_compose_insert_string(fld, "status");
- qd_compose_insert_uint(fld, status);
- qd_compose_end_map(fld);
-
- qd_message_t *msg = qd_message();
-
- qd_message_compose_3(msg, fld, body);
- qdr_in_process_send_to_CT(_server_state.core, reply_to, msg, true, false);
- qd_message_free(msg);
- qd_compose_free(fld);
-
- return PN_ACCEPTED;
-}
-
-
-/* perform a link route lookup
- */
-static uint64_t _do_link_route_lookup(_endpoint_ref_t *epr,
- qd_parsed_field_t *body,
- qd_iterator_t *reply_to,
- qd_iterator_t *cid)
-{
- if (!body || !qd_parse_ok(body) || qd_parse_sub_count(body) < 2) {
- qd_log(_server_state.core->log, QD_LOG_ERROR,
- "Link route address lookup failed - invalid request body"
- " (container=%s, endpoint=%p)",
- epr->container_id, (void *)epr->endpoint);
- return PN_REJECTED;
- }
-
- //
- // body[0] == fully qualified address (string)
- // body[1] == direction (bool, true == receiver)
- //
-
- qd_iterator_t *addr_i = qd_parse_raw(qd_parse_sub_value(body, 0));
- qd_direction_t dir = (qd_parse_as_bool(qd_parse_sub_value(body, 1))
- ? QD_INCOMING : QD_OUTGOING);
-
- bool is_link_route = false;
- bool has_destinations = false;
- qdr_address_t *addr = 0;
- qd_iterator_reset_view(addr_i, ITER_VIEW_ALL);
- qd_parse_tree_retrieve_match(_server_state.core->link_route_tree[dir], addr_i, (void**) &addr);
- if (addr) {
- is_link_route = true;
- has_destinations = !!(DEQ_SIZE(addr->conns) || DEQ_SIZE(addr->rlinks) || qd_bitmask_cardinality(addr->rnodes));
- }
-
- // out_body[0] == is_link_route (bool)
- // out_body[1] == has_destinations (bool)
-
- qd_composed_field_t *out_body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);
- qd_compose_start_list(out_body);
- qd_compose_insert_bool(out_body, is_link_route);
- qd_compose_insert_bool(out_body, has_destinations);
- qd_compose_end_list(out_body);
-
- uint64_t rc = _send_reply(epr,
- OPCODE_LINK_ROUTE_LOOKUP,
- addr ? QCM_ADDR_LOOKUP_OK : QCM_ADDR_LOOKUP_NOT_FOUND,
- cid,
- reply_to,
- out_body);
- qd_compose_free(out_body);
-
- if (qd_log_enabled(_server_state.core->log, QD_LOG_TRACE)) {
- char *as = (char *)qd_iterator_copy(addr_i);
- qd_log(_server_state.core->log, QD_LOG_TRACE,
- "Link route address lookup on %s - %sfound is link route=%s has_destinations=%s"
- " (container=%s, endpoint=%p)",
- as,
- (addr) ? "" : "not ",
- is_link_route ? "yes" : "no",
- has_destinations ? "yes" : "no",
- epr->container_id,
- (void *)epr->endpoint);
- free(as);
- }
- return rc;
-}
-
-
-/* handle lookup request from client
- */
-void _on_transfer(void *link_context,
- qdr_delivery_t *delivery,
- qd_message_t *message)
-{
- if (!qd_message_receive_complete(message))
- return;
-
- _endpoint_ref_t *epr = (_endpoint_ref_t *)link_context;
- qd_log(_server_state.core->log, QD_LOG_TRACE,
- "Address lookup request received (container=%s, endpoint=%p)",
- epr->container_id, (void *)epr->endpoint);
-
- uint64_t disposition = PN_ACCEPTED;
- qd_iterator_t *p_iter = qd_message_field_iterator(message, QD_FIELD_APPLICATION_PROPERTIES);
- qd_parsed_field_t *props = qd_parse(p_iter);
- if (!props || !qd_parse_ok(props) || !qd_parse_is_map(props)) {
- qd_log(_server_state.core->log, QD_LOG_ERROR,
- "Invalid address lookup request - no properties (container=%s, endpoint=%p)",
- epr->container_id, (void *)epr->endpoint);
- disposition = PN_REJECTED;
- goto exit;
- }
-
- qd_parsed_field_t *v = qd_parse_value_by_key(props, "version");
- if (!v) {
- qd_log(_server_state.core->log, QD_LOG_ERROR,
- "Invalid address lookup request - no version (container=%s, endpoint=%p)",
- epr->container_id, (void *)epr->endpoint);
- disposition = PN_REJECTED;
- goto exit;
- }
-
- uint32_t version = qd_parse_as_uint(v);
- if (!qd_parse_ok(v)) {
- qd_log(_server_state.core->log, QD_LOG_ERROR,
- "Invalid address lookup request - invalid version (container=%s, endpoint=%p)",
- epr->container_id, (void *)epr->endpoint);
- disposition = PN_REJECTED;
- goto exit;
- }
-
- if (version != PROTOCOL_VERSION) {
- qd_log(_server_state.core->log, QD_LOG_ERROR,
- "Invalid address lookup request - unknown version"
- " (container=%s, endpoint=%p, version=%"PRIu32")",
- epr->container_id, (void *)epr->endpoint, version);
- disposition = PN_REJECTED;
- goto exit;
- // @TODO(kgiusti) send reply with status QCM_ADDR_LOOKUP_BAD_VERSION
- }
-
- address_lookup_opcode_t opcode = _decode_opcode(props);
- switch (opcode) {
- case OPCODE_LINK_ROUTE_LOOKUP: {
- qd_iterator_t *b_iter = qd_message_field_iterator(message, QD_FIELD_BODY);
- qd_parsed_field_t *body = qd_parse(b_iter);
- qd_iterator_t *reply_to = qd_message_field_iterator_typed(message, QD_FIELD_REPLY_TO);
- qd_iterator_t *cid = qd_message_field_iterator_typed(message, QD_FIELD_CORRELATION_ID);
- disposition = _do_link_route_lookup(epr, body, reply_to, cid);
- qd_iterator_free(cid);
- qd_iterator_free(reply_to);
- qd_parse_free(body);
- qd_iterator_free(b_iter);
- break;
- }
- case OPCODE_INVALID:
- default:
- qd_log(_server_state.core->log, QD_LOG_ERROR,
- "Invalid address lookup request - invalid opcode"
- " (container=%s, endpoint=%p, opcode=%d)",
- epr->container_id, (void *)epr->endpoint, opcode);
- disposition = PN_REJECTED;
- }
-
-exit:
- qd_parse_free(props);
- qd_iterator_free(p_iter);
- qdrc_endpoint_settle_CT(_server_state.core, delivery, disposition);
- qdrc_endpoint_flow_CT(_server_state.core, epr->endpoint, 1, false);
- return;
-}
-
-
-/* handle incoming attach to address lookup service
- */
-static void _on_first_attach(void *bind_context,
- qdrc_endpoint_t *endpoint,
- void **link_context,
- qdr_terminus_t *remote_source,
- qdr_terminus_t *remote_target)
-{
- //
- // Only accept incoming links initiated by the edge router. Detach all
- // other links
- //
- qdr_connection_t *conn = qdrc_endpoint_get_connection_CT(endpoint);
- if (qdrc_endpoint_get_direction_CT(endpoint) != QD_INCOMING ||
- conn->role != QDR_ROLE_EDGE_CONNECTION) {
- *link_context = 0;
- qdrc_endpoint_detach_CT(_server_state.core, endpoint, 0);
- qd_log(_server_state.core->log, QD_LOG_ERROR,
- "Attempt to attach to address lookup server rejected (container=%s)",
- (conn->connection_info) ? conn->connection_info->container : "<unknown>");
- return;
- }
-
- _endpoint_ref_t *epr = new__endpoint_ref_t();
- ZERO(epr);
- epr->endpoint = endpoint;
- epr->container_id = (conn->connection_info) ? conn->connection_info->container : "<unknown>";
- DEQ_INSERT_TAIL(_server_state.endpoints, epr);
- *link_context = epr;
- qdrc_endpoint_second_attach_CT(_server_state.core, endpoint, remote_source, remote_target);
- qdrc_endpoint_flow_CT(_server_state.core, endpoint, 1, false);
-
- qd_log(_server_state.core->log, QD_LOG_TRACE,
- "Client attached to address lookup server (container=%s, endpoint=%p)",
- epr->container_id, (void *)endpoint);
-}
-
-
-/* handle incoming detach from client
- */
-static void _on_first_detach(void *link_context,
- qdr_error_t *error)
-{
- _endpoint_ref_t *epr = (_endpoint_ref_t *)link_context;
- qdrc_endpoint_detach_CT(_server_state.core, epr->endpoint, 0);
- DEQ_REMOVE(_server_state.endpoints, epr);
- qd_log(_server_state.core->log, QD_LOG_TRACE,
- "Client detached from address lookup server (container=%s, endpoint=%p)",
- epr->container_id, (void *)epr->endpoint);
- free__endpoint_ref_t(epr);
-}
-
-
-static qdrc_endpoint_desc_t _endpoint_handlers =
-{
- .label = "address lookup",
- .on_first_attach = _on_first_attach,
- .on_transfer = _on_transfer,
- .on_first_detach = _on_first_detach,
-};
-
-
-static void _address_lookup_init_CT(qdr_core_t *core, void **module_context)
-{
- //
- // Address resolution service is provided by interior routers only
- //
- if (core->router_mode != QD_ROUTER_MODE_INTERIOR)
- return;
-
- _server_state.core = core;
-
- //
- // Handle any incoming links to the QD_TERMINUS_ADDRESS_LOOKUP address
- //
- qdrc_endpoint_bind_mobile_address_CT(core,
- QD_TERMINUS_ADDRESS_LOOKUP,
- '0', // phase
- &_endpoint_handlers,
- &_server_state);
- *module_context = &_server_state;
-}
-
-
-static void _address_lookup_final_CT(void *module_context)
-{
- _endpoint_ref_t *epr = DEQ_HEAD(_server_state.endpoints);
- while (epr) {
- DEQ_REMOVE_HEAD(_server_state.endpoints);
- free__endpoint_ref_t(epr);
- epr = DEQ_HEAD(_server_state.endpoints);
- }
-}
-
-
-QDR_CORE_MODULE_DECLARE("address lookup", _address_lookup_init_CT, _address_lookup_final_CT)
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d965d9b1/src/router_core/modules/address_lookup/address_lookup.h
----------------------------------------------------------------------
diff --git a/src/router_core/modules/address_lookup/address_lookup.h b/src/router_core/modules/address_lookup/address_lookup.h
deleted file mode 100644
index 9f8b65e..0000000
--- a/src/router_core/modules/address_lookup/address_lookup.h
+++ /dev/null
@@ -1,75 +0,0 @@
-#ifndef router_core_address_lookup_h
-#define router_core_address_lookup_h 1
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <qpid/dispatch/iterator.h>
-#include <qpid/dispatch/container.h>
-#include <qpid/dispatch/compose.h>
-//
-// API for building address lookup request messages. The message properties
-// and body fields are handled separately so they can be passed directly to the
-// core client API.
-//
-
-typedef enum {
- // note: keep unit test in sync
- QCM_ADDR_LOOKUP_OK,
- QCM_ADDR_LOOKUP_BAD_VERSION,
- QCM_ADDR_LOOKUP_BAD_OPCODE,
- QCM_ADDR_LOOKUP_NOT_FOUND,
- QCM_ADDR_LOOKUP_INVALID_REQUEST,
-} qcm_address_lookup_status_t;
-
-
-/**
- * Create the message properties and body for a link route address lookup. The
- * returned properties and body can be passed directly to
- * qdrc_client_request_CT().
- *
- * @param address - fully qualified link route address to look up.
- * @param dir - QD_INCOMING or QD_OUTGOING
- * @param properties - return value for message application properties section
- * @param body - return value for message body
- * @return zero on success
- */
-int qcm_link_route_lookup_request(qd_iterator_t *address,
- qd_direction_t dir,
- qd_composed_field_t **properties,
- qd_composed_field_t **body);
-
-
-/**
- * Parse out the payload of the link route lookup reply message. The
- * properties and body fields are provided by the on_reply_cb() callback passed
- * to the qdrc_client_request_CT() call.
- *
- * @param properties - application properties as returned in the reply
- * @param body - body from reply message
- * @param is_link_route - set True if the address is a link route address that
- * exists in the route tables of the queried router.
- * @param has_destination - if is_link_route this indicates whether or not the
- * queried router has active destinations for this link route.
- * @return QCM_ADDR_LOOKUP_OK if query is successful, else and error code
- */
-qcm_address_lookup_status_t qcm_link_route_lookup_decode(qd_iterator_t *properties,
- qd_iterator_t *body,
- bool *is_link_route,
- bool *has_destinations);
-#endif // router_core_address_lookup_h
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d965d9b1/src/router_core/modules/address_lookup_server/address_lookup_server.c
----------------------------------------------------------------------
diff --git a/src/router_core/modules/address_lookup_server/address_lookup_server.c b/src/router_core/modules/address_lookup_server/address_lookup_server.c
new file mode 100644
index 0000000..9a05c87
--- /dev/null
+++ b/src/router_core/modules/address_lookup_server/address_lookup_server.c
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/address_lookup_server.h>
+#include <qpid/dispatch/ctools.h>
+#include "module.h"
+#include "core_link_endpoint.h"
+
+#include "inttypes.h"
+
+
+typedef struct _endpoint_ref { // one record per client link accepted by _on_first_attach
+ DEQ_LINKS(struct _endpoint_ref);
+ qdrc_endpoint_t *endpoint; // endpoint handed to _on_first_attach for this link
+ const char *container_id; // points at conn->connection_info->container or the literal "<unknown>"; the string is not copied
+} _endpoint_ref_t;
+DEQ_DECLARE(_endpoint_ref_t, _endpoint_ref_list_t);
+ALLOC_DEFINE(_endpoint_ref_t);
+
+
+static struct { // file-scope state shared by every callback in this module
+ qdr_core_t *core; // assigned in _address_lookup_init_CT (interior routers only)
+ _endpoint_ref_list_t endpoints; // records added by _on_first_attach, removed by _on_first_detach
+} _server_state;
+
+
+/* parse out the opcode from the request
+ *
+ * Looks up the "opcode" key in the parsed application properties and
+ * converts its value with qd_parse_as_uint().
+ *
+ * @param properties - parsed application properties (may be null)
+ * @return OPCODE_INVALID if properties is null, the key is absent, or the
+ * value fails qd_parse_ok(); otherwise the numeric value cast to
+ * address_lookup_opcode_t. The cast is not range checked, so values
+ * outside the enum are left for the caller to reject.
+ */
+static address_lookup_opcode_t _decode_opcode(qd_parsed_field_t *properties)
+{
+ if (!properties)
+ return OPCODE_INVALID;
+ qd_parsed_field_t *oc = qd_parse_value_by_key(properties, "opcode");
+ if (!oc)
+ return OPCODE_INVALID;
+ uint32_t opcode = qd_parse_as_uint(oc);
+ if (!qd_parse_ok(oc))
+ return OPCODE_INVALID;
+ return (address_lookup_opcode_t)opcode;
+}
+
+
+/* send a reply to a lookup request
+ *
+ * Builds a reply message (header, properties, application properties, then
+ * the caller-supplied body) and hands it to qdr_in_process_send_to_CT()
+ * together with reply_to.
+ *
+ * @param epr - requesting endpoint; only read for the error log
+ * @param opcode - value written under the "opcode" application property
+ * @param status - value written under the "status" application property
+ * @param correlation_id - inserted as the reply's correlation-id property
+ * @param reply_to - inserted as the reply's "to" property and passed to
+ * qdr_in_process_send_to_CT()
+ * @param body - already composed body section; it is not freed here
+ * @return PN_REJECTED if correlation_id or reply_to is null (nothing is
+ * composed or sent), otherwise PN_ACCEPTED
+ */
+static uint64_t _send_reply(_endpoint_ref_t *epr,
+ address_lookup_opcode_t opcode,
+ qcm_address_lookup_status_t status,
+ qd_iterator_t *correlation_id,
+ qd_iterator_t *reply_to,
+ qd_composed_field_t *body)
+{
+ if (!correlation_id || !reply_to) {
+ qd_log(_server_state.core->log, QD_LOG_ERROR,
+ "Link route address reply failed - invalid request message properties"
+ " (container=%s, endpoint=%p)",
+ epr->container_id, (void *)epr->endpoint);
+ return PN_REJECTED;
+ }
+
+ qd_composed_field_t *fld = qd_compose(QD_PERFORMATIVE_HEADER, 0);
+ qd_compose_start_list(fld);
+ qd_compose_insert_bool(fld, 0); // durable
+ qd_compose_end_list(fld);
+
+ fld = qd_compose(QD_PERFORMATIVE_PROPERTIES, fld);
+ qd_compose_start_list(fld);
+ qd_compose_insert_null(fld); // message-id
+ qd_compose_insert_null(fld); // user-id
+ qd_compose_insert_typed_iterator(fld, reply_to); // to
+ qd_compose_insert_null(fld); // subject
+ qd_compose_insert_null(fld); // reply-to
+ qd_compose_insert_typed_iterator(fld, correlation_id); // correlation-id
+ qd_compose_end_list(fld);
+
+ fld = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, fld);
+ qd_compose_start_map(fld);
+ qd_compose_insert_string(fld, "version");
+ qd_compose_insert_uint(fld, PROTOCOL_VERSION);
+ qd_compose_insert_string(fld, "opcode");
+ qd_compose_insert_uint(fld, opcode);
+ qd_compose_insert_string(fld, "status");
+ qd_compose_insert_uint(fld, status);
+ qd_compose_end_map(fld);
+
+ qd_message_t *msg = qd_message();
+
+ qd_message_compose_3(msg, fld, body);
+ qdr_in_process_send_to_CT(_server_state.core, reply_to, msg, true, false);
+ qd_message_free(msg); // NOTE(review): assumes the send call keeps its own reference/copy of msg - confirm
+ qd_compose_free(fld); // only fld is released here; body stays owned by the caller
+
+ return PN_ACCEPTED;
+}
+
+
+/* perform a link route lookup
+ *
+ * Looks the requested address up in core->link_route_tree[dir] and replies
+ * with a two element list: [is_link_route, has_destinations].
+ *
+ * @param epr - requesting endpoint (used for logging, passed to _send_reply)
+ * @param body - parsed request body: [0] address, [1] direction flag
+ * @param reply_to - forwarded unchanged to _send_reply
+ * @param cid - correlation id, forwarded unchanged to _send_reply
+ * @return PN_REJECTED if body is null, fails qd_parse_ok() or has fewer
+ * than two sub-values; otherwise whatever _send_reply returns
+ */
+static uint64_t _do_link_route_lookup(_endpoint_ref_t *epr,
+ qd_parsed_field_t *body,
+ qd_iterator_t *reply_to,
+ qd_iterator_t *cid)
+{
+ if (!body || !qd_parse_ok(body) || qd_parse_sub_count(body) < 2) {
+ qd_log(_server_state.core->log, QD_LOG_ERROR,
+ "Link route address lookup failed - invalid request body"
+ " (container=%s, endpoint=%p)",
+ epr->container_id, (void *)epr->endpoint);
+ return PN_REJECTED;
+ }
+
+ //
+ // body[0] == fully qualified address (string)
+ // body[1] == direction (bool, true == receiver)
+ //
+
+ qd_iterator_t *addr_i = qd_parse_raw(qd_parse_sub_value(body, 0));
+ qd_direction_t dir = (qd_parse_as_bool(qd_parse_sub_value(body, 1))
+ ? QD_INCOMING : QD_OUTGOING);
+
+ bool is_link_route = false;
+ bool has_destinations = false;
+ qdr_address_t *addr = 0;
+ qd_iterator_reset_view(addr_i, ITER_VIEW_ALL);
+ qd_parse_tree_retrieve_match(_server_state.core->link_route_tree[dir], addr_i, (void**) &addr);
+ if (addr) {
+ is_link_route = true;
+ has_destinations = !!(DEQ_SIZE(addr->conns) || DEQ_SIZE(addr->rlinks) || qd_bitmask_cardinality(addr->rnodes)); // true if conns or rlinks is non-empty, or rnodes has any bit set
+ }
+
+ // out_body[0] == is_link_route (bool)
+ // out_body[1] == has_destinations (bool)
+
+ qd_composed_field_t *out_body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);
+ qd_compose_start_list(out_body);
+ qd_compose_insert_bool(out_body, is_link_route);
+ qd_compose_insert_bool(out_body, has_destinations);
+ qd_compose_end_list(out_body);
+
+ uint64_t rc = _send_reply(epr,
+ OPCODE_LINK_ROUTE_LOOKUP,
+ addr ? QCM_ADDR_LOOKUP_OK : QCM_ADDR_LOOKUP_NOT_FOUND,
+ cid,
+ reply_to,
+ out_body);
+ qd_compose_free(out_body); // _send_reply does not free the body it is given
+
+ if (qd_log_enabled(_server_state.core->log, QD_LOG_TRACE)) { // the address copy below is only made when TRACE is enabled
+ char *as = (char *)qd_iterator_copy(addr_i);
+ qd_log(_server_state.core->log, QD_LOG_TRACE,
+ "Link route address lookup on %s - %sfound is link route=%s has_destinations=%s"
+ " (container=%s, endpoint=%p)",
+ as,
+ (addr) ? "" : "not ",
+ is_link_route ? "yes" : "no",
+ has_destinations ? "yes" : "no",
+ epr->container_id,
+ (void *)epr->endpoint);
+ free(as);
+ }
+ return rc;
+}
+
+
+/* handle lookup request from client
+ *
+ * on_transfer callback installed through _endpoint_handlers.
+ *
+ * @param link_context - the _endpoint_ref_t stored by _on_first_attach
+ * @param delivery - delivery carrying the request; settled before returning
+ * @param message - the request message
+ *
+ * Nothing is done until the message has been completely received. The
+ * application properties must parse as a map that holds a "version" equal
+ * to PROTOCOL_VERSION and a known "opcode"; any other request is logged and
+ * settled with PN_REJECTED. A valid request is dispatched on its opcode and
+ * settled with the disposition returned by the opcode handler. After the
+ * delivery is settled, qdrc_endpoint_flow_CT() is called with a count of 1.
+ *
+ * The function is static: it is only reached through _endpoint_handlers,
+ * like the other callbacks in this file.
+ */
+static void _on_transfer(void *link_context,
+                         qdr_delivery_t *delivery,
+                         qd_message_t *message)
+{
+    if (!qd_message_receive_complete(message))
+        return;
+
+    _endpoint_ref_t *epr = (_endpoint_ref_t *)link_context;
+    qd_log(_server_state.core->log, QD_LOG_TRACE,
+           "Address lookup request received (container=%s, endpoint=%p)",
+           epr->container_id, (void *)epr->endpoint);
+
+    uint64_t disposition = PN_ACCEPTED;
+    qd_iterator_t *p_iter = qd_message_field_iterator(message, QD_FIELD_APPLICATION_PROPERTIES);
+    qd_parsed_field_t *props = qd_parse(p_iter);
+    if (!props || !qd_parse_ok(props) || !qd_parse_is_map(props)) {
+        qd_log(_server_state.core->log, QD_LOG_ERROR,
+               "Invalid address lookup request - no properties (container=%s, endpoint=%p)",
+               epr->container_id, (void *)epr->endpoint);
+        disposition = PN_REJECTED;
+        goto exit;
+    }
+
+    qd_parsed_field_t *v = qd_parse_value_by_key(props, "version");
+    if (!v) {
+        qd_log(_server_state.core->log, QD_LOG_ERROR,
+               "Invalid address lookup request - no version (container=%s, endpoint=%p)",
+               epr->container_id, (void *)epr->endpoint);
+        disposition = PN_REJECTED;
+        goto exit;
+    }
+
+    uint32_t version = qd_parse_as_uint(v);
+    if (!qd_parse_ok(v)) {
+        qd_log(_server_state.core->log, QD_LOG_ERROR,
+               "Invalid address lookup request - invalid version (container=%s, endpoint=%p)",
+               epr->container_id, (void *)epr->endpoint);
+        disposition = PN_REJECTED;
+        goto exit;
+    }
+
+    if (version != PROTOCOL_VERSION) {
+        // @TODO(kgiusti) send reply with status QCM_ADDR_LOOKUP_BAD_VERSION
+        qd_log(_server_state.core->log, QD_LOG_ERROR,
+               "Invalid address lookup request - unknown version"
+               " (container=%s, endpoint=%p, version=%"PRIu32")",
+               epr->container_id, (void *)epr->endpoint, version);
+        disposition = PN_REJECTED;
+        goto exit;
+    }
+
+    address_lookup_opcode_t opcode = _decode_opcode(props);
+    switch (opcode) {
+    case OPCODE_LINK_ROUTE_LOOKUP: {
+        qd_iterator_t *b_iter = qd_message_field_iterator(message, QD_FIELD_BODY);
+        qd_parsed_field_t *body = qd_parse(b_iter);
+        qd_iterator_t *reply_to = qd_message_field_iterator_typed(message, QD_FIELD_REPLY_TO);
+        qd_iterator_t *cid = qd_message_field_iterator_typed(message, QD_FIELD_CORRELATION_ID);
+        disposition = _do_link_route_lookup(epr, body, reply_to, cid);
+        // release in reverse order of acquisition
+        qd_iterator_free(cid);
+        qd_iterator_free(reply_to);
+        qd_parse_free(body);
+        qd_iterator_free(b_iter);
+        break;
+    }
+    case OPCODE_INVALID:
+    default:
+        // _decode_opcode casts without a range check, so unknown values land here
+        qd_log(_server_state.core->log, QD_LOG_ERROR,
+               "Invalid address lookup request - invalid opcode"
+               " (container=%s, endpoint=%p, opcode=%d)",
+               epr->container_id, (void *)epr->endpoint, (int)opcode);
+        disposition = PN_REJECTED;
+        break;
+    }
+
+exit:
+    // single exit path: every accepted or rejected request is settled here
+    qd_parse_free(props);
+    qd_iterator_free(p_iter);
+    qdrc_endpoint_settle_CT(_server_state.core, delivery, disposition);
+    qdrc_endpoint_flow_CT(_server_state.core, epr->endpoint, 1, false);
+    return;
+}
+
+
+/* handle incoming attach to address lookup service
+ *
+ * @param bind_context - not used here
+ * @param endpoint - endpoint for the attaching link
+ * @param link_context - out: the new _endpoint_ref_t on accept, 0 on reject
+ * @param remote_source - passed through to qdrc_endpoint_second_attach_CT
+ * @param remote_target - passed through to qdrc_endpoint_second_attach_CT
+ *
+ * A rejected link is detached and the attempt is logged. An accepted link
+ * gets a tracking record appended to _server_state.endpoints, the second
+ * attach, and a qdrc_endpoint_flow_CT() call with a count of 1.
+ *
+ * NOTE(review): remote_source/remote_target are not touched on the reject
+ * path - confirm who owns them in that case.
+ */
+static void _on_first_attach(void *bind_context,
+ qdrc_endpoint_t *endpoint,
+ void **link_context,
+ qdr_terminus_t *remote_source,
+ qdr_terminus_t *remote_target)
+{
+ //
+ // Only accept incoming links initiated by the edge router. Detach all
+ // other links
+ //
+ qdr_connection_t *conn = qdrc_endpoint_get_connection_CT(endpoint);
+ if (qdrc_endpoint_get_direction_CT(endpoint) != QD_INCOMING ||
+ conn->role != QDR_ROLE_EDGE_CONNECTION) {
+ *link_context = 0;
+ qdrc_endpoint_detach_CT(_server_state.core, endpoint, 0);
+ qd_log(_server_state.core->log, QD_LOG_ERROR,
+ "Attempt to attach to address lookup server rejected (container=%s)",
+ (conn->connection_info) ? conn->connection_info->container : "<unknown>");
+ return;
+ }
+
+ _endpoint_ref_t *epr = new__endpoint_ref_t();
+ ZERO(epr);
+ epr->endpoint = endpoint;
+ epr->container_id = (conn->connection_info) ? conn->connection_info->container : "<unknown>"; // pointer is stored, the string is not copied
+ DEQ_INSERT_TAIL(_server_state.endpoints, epr);
+ *link_context = epr;
+ qdrc_endpoint_second_attach_CT(_server_state.core, endpoint, remote_source, remote_target);
+ qdrc_endpoint_flow_CT(_server_state.core, endpoint, 1, false);
+
+ qd_log(_server_state.core->log, QD_LOG_TRACE,
+ "Client attached to address lookup server (container=%s, endpoint=%p)",
+ epr->container_id, (void *)endpoint);
+}
+
+
+/* handle incoming detach from client
+ *
+ * @param link_context - the _endpoint_ref_t stored by _on_first_attach
+ * @param error - neither read nor freed here
+ * (NOTE(review): confirm who owns it)
+ *
+ * Detaches the endpoint, unlinks the record from _server_state.endpoints,
+ * logs the event and frees the record.
+ */
+static void _on_first_detach(void *link_context,
+ qdr_error_t *error)
+{
+ _endpoint_ref_t *epr = (_endpoint_ref_t *)link_context;
+ qdrc_endpoint_detach_CT(_server_state.core, epr->endpoint, 0);
+ DEQ_REMOVE(_server_state.endpoints, epr);
+ qd_log(_server_state.core->log, QD_LOG_TRACE,
+ "Client detached from address lookup server (container=%s, endpoint=%p)",
+ epr->container_id, (void *)epr->endpoint); // endpoint is only printed as a pointer value after the detach call
+ free__endpoint_ref_t(epr);
+}
+
+
+static qdrc_endpoint_desc_t _endpoint_handlers = // callback table passed to qdrc_endpoint_bind_mobile_address_CT in _address_lookup_init_CT
+{
+ .label = "address lookup",
+ .on_first_attach = _on_first_attach,
+ .on_transfer = _on_transfer,
+ .on_first_detach = _on_first_detach,
+};
+
+
+static void _address_lookup_init_CT(qdr_core_t *core, void **module_context) // module init: binds the lookup address when core is an interior router
+{
+ //
+ // Address resolution service is provided by interior routers only
+ //
+ if (core->router_mode != QD_ROUTER_MODE_INTERIOR)
+ return; // neither _server_state.core nor *module_context is written on this path
+
+ _server_state.core = core;
+
+ //
+ // Handle any incoming links to the QD_TERMINUS_ADDRESS_LOOKUP address
+ //
+ qdrc_endpoint_bind_mobile_address_CT(core,
+ QD_TERMINUS_ADDRESS_LOOKUP,
+ '0', // phase
+ &_endpoint_handlers,
+ &_server_state); // presumably delivered back as bind_context in on_first_attach - confirm
+ *module_context = &_server_state;
+}
+
+
+static void _address_lookup_final_CT(void *module_context) // module teardown; module_context is not used, the static _server_state is read directly
+{
+ _endpoint_ref_t *epr = DEQ_HEAD(_server_state.endpoints);
+ while (epr) { // pop and free every record still on the list
+ DEQ_REMOVE_HEAD(_server_state.endpoints);
+ free__endpoint_ref_t(epr);
+ epr = DEQ_HEAD(_server_state.endpoints);
+ }
+}
+
+
+QDR_CORE_MODULE_DECLARE("address lookup", _address_lookup_init_CT, _address_lookup_final_CT) // presumably registers the init/final pair with the core module framework under this name - confirm against the macro definition
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org