You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2015/10/28 18:29:57 UTC
qpid-dispatch git commit: DISPATCH-179 - Added agent support for the
address table. Adopted a function-name suffix of _CT for core-thread
functions.
Repository: qpid-dispatch
Updated Branches:
refs/heads/tross-DISPATCH-179-1 f733be6e9 -> 61be97fdf
DISPATCH-179 - Added agent support for the address table.
Adopted a function-name suffix of _CT for core-thread functions.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/61be97fd
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/61be97fd
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/61be97fd
Branch: refs/heads/tross-DISPATCH-179-1
Commit: 61be97fdf7c776e5b9ceefc4f3cae94dc9a12333
Parents: f733be6
Author: Ted Ross <tr...@redhat.com>
Authored: Wed Oct 28 13:28:46 2015 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Wed Oct 28 13:28:46 2015 -0400
----------------------------------------------------------------------
include/qpid/dispatch/router_core.h | 9 +-
python/qpid_dispatch/management/qdrouter.json | 3 +-
src/CMakeLists.txt | 1 +
src/router_core/agent.c | 159 +++++++------
src/router_core/agent_address.c | 258 +++++++++++++++++++++
src/router_core/agent_address.h | 28 +++
src/router_core/route_tables.c | 66 +++---
src/router_core/router_core.c | 2 +-
src/router_core/router_core_private.h | 14 +-
src/router_core/router_core_thread.c | 4 +-
10 files changed, 424 insertions(+), 120 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/61be97fd/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index d50eb7c..2e11139 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -22,6 +22,8 @@
#include <qpid/dispatch.h>
#include <qpid/dispatch/amqp.h>
#include <qpid/dispatch/bitmask.h>
+#include <qpid/dispatch/compose.h>
+#include <qpid/dispatch/parse.h>
//
// All callbacks in this module shall be invoked on a connection thread from the server thread pool.
@@ -148,9 +150,10 @@ void qdr_manage_create(qdr_core_t *core, void *context, qd_router_entity_type_t
void qdr_manage_delete(qdr_core_t *core, void *context, qd_router_entity_type_t type, qd_parsed_field_t *attributes);
void qdr_manage_read(qdr_core_t *core, void *context, qd_router_entity_type_t type, qd_parsed_field_t *attributes);
-qdr_query_t *qdr_manage_get_first(qdr_core_t *core, void *context, qd_router_entity_type_t type, int offset, qd_composed_field_t *body);
-void qdr_manage_get_next(qdr_query_t *query);
-void qdr_query_cancel(qdr_query_t *query);
+qdr_query_t *qdr_manage_get_first(qdr_core_t *core, void *context, qd_router_entity_type_t type, int offset,
+ qd_parsed_field_t *attribute_names, qd_composed_field_t *body);
+void qdr_manage_get_next(qdr_core_t *core, qdr_query_t *query);
+void qdr_query_cancel(qdr_core_t *core, qdr_query_t *query);
typedef void (*qdr_manage_response_t) (void *context, const qd_amqp_error_t *status, bool more);
void qdr_manage_handler(qdr_core_t *core, qdr_manage_response_t response_handler);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/61be97fd/python/qpid_dispatch/management/qdrouter.json
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index e447f19..00e247f 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -857,7 +857,8 @@
"key": {
"description": "Internal unique (to this router) key to identify the address",
"type": "string"
- }
+ },
+ "hostRouters": {"type": "list", "description": "List of remote routers on which there is a destination for this address."}
}
},
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/61be97fd/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index eee2b10..42c9fc9 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -66,6 +66,7 @@ set(qpid_dispatch_SOURCES
router_agent.c
router_config.c
router_core/agent.c
+ router_core/agent_address.c
router_core/router_core.c
router_core/router_core_thread.c
router_core/route_tables.c
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/61be97fd/src/router_core/agent.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent.c b/src/router_core/agent.c
index 2787ae5..a42232b 100644
--- a/src/router_core/agent.c
+++ b/src/router_core/agent.c
@@ -18,10 +18,53 @@
*/
#include <qpid/dispatch/amqp.h>
-#include "router_core_private.h"
+#include "agent_address.h"
#include <stdio.h>
-static void qdrh_manage_get_first(qdr_core_t *core, qdr_action_t *action, bool discard);
+//==================================================================================
+// Internal Functions
+//==================================================================================
+
+/**
+ * Drain core->outgoing_query_list, passing each dequeued query to
+ * core->agent_response_handler and releasing queries that have no more rows.
+ *
+ * @param context The qdr_core_t that owns the outgoing query list.
+ */
+static void qdr_agent_response_handler(void *context)
+{
+    qdr_core_t *core = (qdr_core_t*) context;
+    bool        drained;
+
+    do {
+        //
+        // Only the list manipulation happens under query_lock; the response
+        // callback below runs with the lock released.
+        //
+        sys_mutex_lock(core->query_lock);
+        qdr_query_t *pending = DEQ_HEAD(core->outgoing_query_list);
+        if (pending)
+            DEQ_REMOVE_HEAD(core->outgoing_query_list);
+        drained = DEQ_SIZE(core->outgoing_query_list) == 0;
+        sys_mutex_unlock(core->query_lock);
+
+        if (!pending)
+            continue;
+
+        core->agent_response_handler(pending->context, pending->status, pending->more);
+
+        //
+        // A query reporting no further rows is finished: free its saved key
+        // (if any) and then the query object itself.
+        //
+        if (!pending->more) {
+            if (pending->next_key)
+                qdr_field_free(pending->next_key);
+            free_qdr_query_t(pending);
+        }
+    } while (!drained);
+}
+
+
+/**
+ * Append a query to core->outgoing_query_list and, if the list held nothing
+ * before this insertion, schedule core->agent_timer with a zero delay.
+ *
+ * @param core  Router core owning the outgoing list, its lock and the timer.
+ * @param query Query whose response is ready to be delivered.
+ */
+void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query)
+{
+    bool first_entry;
+
+    sys_mutex_lock(core->query_lock);
+    DEQ_INSERT_TAIL(core->outgoing_query_list, query);
+    first_entry = (DEQ_SIZE(core->outgoing_query_list) == 1);
+    sys_mutex_unlock(core->query_lock);
+
+    //
+    // The timer is scheduled outside the lock, and only on the
+    // empty -> non-empty transition.
+    //
+    if (first_entry)
+        qd_timer_schedule(core->agent_timer, 0);
+}
+
+static void qdrh_manage_get_first_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdrh_manage_get_next_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
+
//==================================================================================
// Interface Functions
@@ -43,18 +86,28 @@ void qdr_manage_read(qdr_core_t *core, void *context, qd_router_entity_type_t ty
qdr_query_t *qdr_manage_get_first(qdr_core_t *core, void *context, qd_router_entity_type_t type,
- int offset, qd_composed_field_t *body)
+ int offset, qd_parsed_field_t *attribute_names, qd_composed_field_t *body)
{
- qdr_action_t *action = qdr_action(qdrh_manage_get_first);
+ qdr_action_t *action = qdr_action(qdrh_manage_get_first_CT);
qdr_query_t *query = new_qdr_query_t();
query->entity_type = type;
query->context = context;
query->body = body;
query->next_key = 0;
+ query->next_offset = 0;
query->more = false;
query->status = 0;
+ switch (query->entity_type) {
+ case QD_ROUTER_CONNECTION: break;
+ case QD_ROUTER_LINK: break;
+ case QD_ROUTER_ADDRESS: qdra_address_set_columns(query, attribute_names);
+ case QD_ROUTER_WAYPOINT: break;
+ case QD_ROUTER_EXCHANGE: break;
+ case QD_ROUTER_BINDING: break;
+ }
+
action->args.agent.query = query;
action->args.agent.offset = offset;
@@ -64,12 +117,15 @@ qdr_query_t *qdr_manage_get_first(qdr_core_t *core, void *context, qd_router_ent
}
-void qdr_manage_get_next(qdr_query_t *query)
+void qdr_manage_get_next(qdr_core_t *core, qdr_query_t *query)
{
+ qdr_action_t *action = qdr_action(qdrh_manage_get_next_CT);
+ action->args.agent.query = query;
+ qdr_action_enqueue(core, action);
}
-void qdr_query_cancel(qdr_query_t *query)
+void qdr_query_cancel(qdr_core_t *core, qdr_query_t *query)
{
}
@@ -81,63 +137,10 @@ void qdr_manage_handler(qdr_core_t *core, qdr_manage_response_t response_handler
//==================================================================================
-// Internal Functions
-//==================================================================================
-
-static void qdr_agent_response_handler(void *context)
-{
- qdr_core_t *core = (qdr_core_t*) context;
- qdr_query_t *query;
- bool done = false;
-
- while (!done) {
- sys_mutex_lock(core->query_lock);
- query = DEQ_HEAD(core->outgoing_query_list);
- if (query)
- DEQ_REMOVE_HEAD(core->outgoing_query_list);
- done = DEQ_SIZE(core->outgoing_query_list) == 0;
- sys_mutex_unlock(core->query_lock);
-
- if (query) {
- core->agent_response_handler(query->context, query->status, query->more);
- if (!query->more) {
- if (query->next_key)
- qdr_field_free(query->next_key);
- free_qdr_query_t(query);
- }
- }
- }
-}
-
-
-static void qdr_agent_enqueue_response(qdr_core_t *core, qdr_query_t *query)
-{
- sys_mutex_lock(core->query_lock);
- DEQ_INSERT_TAIL(core->outgoing_query_list, query);
- bool notify = DEQ_SIZE(core->outgoing_query_list) == 1;
- sys_mutex_unlock(core->query_lock);
-
- if (notify)
- qd_timer_schedule(core->agent_timer, 0);
-}
-
-
-static void qdr_manage_get_first_address(qdr_core_t *core, qdr_query_t *query, int offset)
-{
- if (offset >= DEQ_SIZE(core->addrs)) {
- query->more = false;
- query->status = &QD_AMQP_OK;
- qdr_agent_enqueue_response(core, query);
- return;
- }
-}
-
-
-//==================================================================================
// In-Thread Functions
//==================================================================================
-void qdr_agent_setup(qdr_core_t *core)
+void qdr_agent_setup_CT(qdr_core_t *core)
{
DEQ_INIT(core->outgoing_query_list);
core->query_lock = sys_mutex();
@@ -145,31 +148,35 @@ void qdr_agent_setup(qdr_core_t *core)
}
-static void qdrh_manage_get_first(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdrh_manage_get_first_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
qdr_query_t *query = action->args.agent.query;
int offset = action->args.agent.offset;
if (!discard)
switch (query->entity_type) {
- case QD_ROUTER_CONNECTION :
- break;
-
- case QD_ROUTER_LINK :
- break;
-
- case QD_ROUTER_ADDRESS :
- qdr_manage_get_first_address(core, query, offset);
- break;
+ case QD_ROUTER_CONNECTION: break;
+ case QD_ROUTER_LINK: break;
+ case QD_ROUTER_ADDRESS: qdra_address_get_first_CT(core, query, offset); break;
+ case QD_ROUTER_WAYPOINT: break;
+ case QD_ROUTER_EXCHANGE: break;
+ case QD_ROUTER_BINDING: break;
+ }
+}
- case QD_ROUTER_WAYPOINT :
- break;
- case QD_ROUTER_EXCHANGE :
- break;
+static void qdrh_manage_get_next_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+{
+ qdr_query_t *query = action->args.agent.query;
- case QD_ROUTER_BINDING :
- break;
+ if (!discard)
+ switch (query->entity_type) {
+ case QD_ROUTER_CONNECTION: break;
+ case QD_ROUTER_LINK: break;
+ case QD_ROUTER_ADDRESS: qdra_address_get_next_CT(core, query); break;
+ case QD_ROUTER_WAYPOINT: break;
+ case QD_ROUTER_EXCHANGE: break;
+ case QD_ROUTER_BINDING: break;
}
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/61be97fd/src/router_core/agent_address.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent_address.c b/src/router_core/agent_address.c
new file mode 100644
index 0000000..5ede5d1
--- /dev/null
+++ b/src/router_core/agent_address.c
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "agent_address.h"
+
+//
+// Attribute names of the address entity, terminated by a null pointer.
+// The position of each name in this table is its column index, so the
+// order here must stay in step with the QDR_ADDRESS_* constants below.
+//
+static const char *qdr_address_columns[] = {
+    "name",
+    "identity",
+    "type",
+    "key",
+    "inProcess",
+    "subscriberCount",
+    "remoteCount",
+    "hostRouters",
+    "deliveriesIngress",
+    "deliveriesEgress",
+    "deliveriesTransit",
+    "deliveriesToContainer",
+    "deliveriesFromContainer",
+    0
+};
+
+//
+// Column indices into qdr_address_columns.
+//
+enum {
+    QDR_ADDRESS_NAME                      = 0,
+    QDR_ADDRESS_IDENTITY                  = 1,
+    QDR_ADDRESS_TYPE                      = 2,
+    QDR_ADDRESS_KEY                       = 3,
+    QDR_ADDRESS_IN_PROCESS                = 4,
+    QDR_ADDRESS_SUBSCRIBER_COUNT          = 5,
+    QDR_ADDRESS_REMOTE_COUNT              = 6,
+    QDR_ADDRESS_HOST_ROUTERS              = 7,
+    QDR_ADDRESS_DELIVERIES_INGRESS        = 8,
+    QDR_ADDRESS_DELIVERIES_EGRESS         = 9,
+    QDR_ADDRESS_DELIVERIES_TRANSIT        = 10,
+    QDR_ADDRESS_DELIVERIES_TO_CONTAINER   = 11,
+    QDR_ADDRESS_DELIVERIES_FROM_CONTAINER = 12,
+    QDR_ADDRESS_COLUMN_COUNT              = 13
+};
+
+/**
+ * Append one address row to the query's response body as a list holding
+ * exactly one value per selected column, in the order given by query->columns.
+ *
+ * @param query Query whose body receives the row; query->columns selects the
+ *              attributes and is terminated by a negative entry.
+ * @param addr  Address record to serialize.
+ */
+static void qdr_manage_write_address_CT(qdr_query_t *query, qdr_address_t *addr)
+{
+    qd_composed_field_t *body = query->body;
+
+    qd_compose_start_list(body);
+
+    //
+    // Walk the selected columns.  The index must advance on every pass
+    // (without it the loop never terminates), and the scan is bounded by the
+    // size of the columns array in case the negative terminator is missing.
+    //
+    for (int i = 0; i < QDR_AGENT_MAX_COLUMNS && query->columns[i] >= 0; i++) {
+        switch (query->columns[i]) {
+        case QDR_ADDRESS_NAME:
+        case QDR_ADDRESS_IDENTITY:
+            //
+            // No value is produced for these attributes here; emit a null so
+            // the row stays aligned with the requested columns.
+            //
+            qd_compose_insert_null(body);
+            break;
+
+        case QDR_ADDRESS_TYPE:
+            qd_compose_insert_string(body, "org.apache.qpid.dispatch.router.address");
+            break;
+
+        case QDR_ADDRESS_KEY:
+            if (addr->hash_handle)
+                qd_compose_insert_string(body, (const char*) qd_hash_key_by_handle(addr->hash_handle));
+            else
+                qd_compose_insert_null(body);
+            break;
+
+        case QDR_ADDRESS_IN_PROCESS:
+            qd_compose_insert_bool(body, addr->on_message != 0);
+            break;
+
+        case QDR_ADDRESS_SUBSCRIBER_COUNT:
+            qd_compose_insert_uint(body, DEQ_SIZE(addr->rlinks));
+            break;
+
+        case QDR_ADDRESS_REMOTE_COUNT:
+            qd_compose_insert_uint(body, DEQ_SIZE(addr->rnodes));
+            break;
+
+        case QDR_ADDRESS_HOST_ROUTERS:
+            qd_compose_insert_null(body);  // TEMP
+            break;
+
+        case QDR_ADDRESS_DELIVERIES_INGRESS:
+            qd_compose_insert_ulong(body, addr->deliveries_ingress);
+            break;
+
+        case QDR_ADDRESS_DELIVERIES_EGRESS:
+            qd_compose_insert_ulong(body, addr->deliveries_egress);
+            break;
+
+        case QDR_ADDRESS_DELIVERIES_TRANSIT:
+            qd_compose_insert_ulong(body, addr->deliveries_transit);
+            break;
+
+        case QDR_ADDRESS_DELIVERIES_TO_CONTAINER:
+            qd_compose_insert_ulong(body, addr->deliveries_to_container);
+            break;
+
+        case QDR_ADDRESS_DELIVERIES_FROM_CONTAINER:
+            qd_compose_insert_ulong(body, addr->deliveries_from_container);
+            break;
+
+        default:
+            //
+            // Unknown or placeholder column: still emit one value.
+            //
+            qd_compose_insert_null(body);
+            break;
+        }
+    }
+
+    qd_compose_end_list(body);
+}
+
+
+/**
+ * Record where the query should resume: bump the saved offset and, when a
+ * following address exists, remember its hash key for the next step.
+ *
+ * @param query Query to update (next_offset, next_key and more are written).
+ * @param addr  Address that was just written into the response.
+ */
+static void qdr_manage_advance_address_CT(qdr_query_t *query, qdr_address_t *addr)
+{
+    query->next_offset++;
+
+    qdr_address_t *following = DEQ_NEXT(addr);
+    if (!following) {
+        query->more = false;
+        return;
+    }
+
+    query->more = true;
+
+    //
+    // An address may have no hash handle (the row writer checks for this as
+    // well).  In that case leave next_key unset so that the next step falls
+    // back to the saved offset instead of looking up a key.
+    //
+    if (following->hash_handle)
+        query->next_key = qdr_field((const char*) qd_hash_key_by_handle(following->hash_handle));
+    else
+        query->next_key = 0;
+}
+
+
+/**
+ * Fill query->columns with the column indices to report for an address query.
+ *
+ * The array is always terminated with a negative entry.  Requested names that
+ * are not strings, or that do not match any address attribute, are mapped to
+ * QDR_AGENT_COLUMN_NULL.
+ *
+ * @param query           Query whose columns array is written.
+ * @param attribute_names Parsed list of attribute-name strings; when absent,
+ *                        not a list, or empty, every address column is selected.
+ */
+void qdra_address_set_columns(qdr_query_t *query, qd_parsed_field_t *attribute_names)
+{
+    if (!attribute_names ||
+        (qd_parse_tag(attribute_names) != QD_AMQP_LIST8 &&
+         qd_parse_tag(attribute_names) != QD_AMQP_LIST32) ||
+        qd_parse_sub_count(attribute_names) == 0) {
+        //
+        // Either the attribute_names field is absent, it's not a list, or it's an empty list.
+        // In this case, we will include all available attributes.
+        //
+        int i;
+        for (i = 0; i < QDR_ADDRESS_COLUMN_COUNT; i++)
+            query->columns[i] = i;
+        query->columns[i] = -1;
+        return;
+    }
+
+    //
+    // We have a valid, non-empty attribute list.  The list comes from the
+    // request, so clamp it to the capacity of the columns array, keeping one
+    // slot for the terminator.  Names beyond the capacity are dropped.
+    //
+    uint32_t count = qd_parse_sub_count(attribute_names);
+    if (count > QDR_AGENT_MAX_COLUMNS - 1)
+        count = QDR_AGENT_MAX_COLUMNS - 1;
+
+    uint32_t idx;
+    for (idx = 0; idx < count; idx++) {
+        qd_parsed_field_t *name = qd_parse_sub_value(attribute_names, idx);
+
+        //
+        // Start from the null placeholder so that every slot holds a defined
+        // value even when the name is not a string or matches no column.
+        //
+        query->columns[idx] = QDR_AGENT_COLUMN_NULL;
+
+        if (!name || (qd_parse_tag(name) != QD_AMQP_STR8_UTF8 && qd_parse_tag(name) != QD_AMQP_STR32_UTF8))
+            continue;
+
+        //
+        // Compare against each known column name in turn; the table index
+        // must advance on every pass or the search never ends.
+        //
+        for (int j = 0; qdr_address_columns[j]; j++) {
+            qd_field_iterator_t *iter = qd_parse_raw(name);
+            if (qd_field_iterator_equal(iter, (const unsigned char*) qdr_address_columns[j])) {
+                query->columns[idx] = j;
+                break;
+            }
+        }
+    }
+
+    //
+    // Terminate the column list; the row writer stops at the first negative entry.
+    //
+    query->columns[idx] = -1;
+}
+
+
+/**
+ * Begin an address query: write the address found at the given offset into
+ * the response (if the offset is within the table), record the resume point,
+ * and enqueue the response.
+ *
+ * @param core   Router core holding the address list.
+ * @param query  Query being serviced.
+ * @param offset Position in core->addrs of the first address to report.
+ */
+void qdra_address_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset)
+{
+    //
+    // Every query reaching this point reports success.
+    //
+    query->status = &QD_AMQP_OK;
+
+    if (offset < DEQ_SIZE(core->addrs)) {
+        //
+        // Step from the head of the list to the requested position.
+        //
+        qdr_address_t *addr  = DEQ_HEAD(core->addrs);
+        int            steps = 0;
+        while (steps < offset && addr) {
+            addr = DEQ_NEXT(addr);
+            steps++;
+        }
+        assert(addr != 0);
+
+        //
+        // Emit the row, then remember where the next step should resume.
+        //
+        qdr_manage_write_address_CT(query, addr);
+        query->next_offset = offset;
+        qdr_manage_advance_address_CT(query, addr);
+    } else {
+        //
+        // The offset lies beyond the set of addresses: nothing to report.
+        //
+        query->more = false;
+    }
+
+    //
+    // In both cases the response goes back to the requester.
+    //
+    qdr_agent_enqueue_response_CT(core, query);
+}
+
+
+/**
+ * Continue an address query: locate the next address by its saved key, or by
+ * the saved offset when the key lookup yields nothing, write it into the
+ * response, and enqueue the response.
+ *
+ * @param core  Router core holding the address list and address hash.
+ * @param query Query being serviced; its next_key is consumed here.
+ */
+void qdra_address_get_next_CT(qdr_core_t *core, qdr_query_t *query)
+{
+    qdr_address_t *addr      = 0;
+    qdr_field_t   *saved_key = query->next_key;
+
+    //
+    // Preferred path: look the address up by the key stored on the previous step.
+    // The key is single-use and is released whether or not the lookup succeeds.
+    //
+    if (saved_key) {
+        qd_hash_retrieve(core->addr_hash, saved_key->iterator, (void**) &addr);
+        qdr_field_free(saved_key);
+        query->next_key = 0;
+    }
+
+    //
+    // Fallback: the address may have been removed since the previous step,
+    // so walk the list to the saved offset instead (less efficient).
+    //
+    if (!addr && query->next_offset < DEQ_SIZE(core->addrs)) {
+        int remaining = query->next_offset;
+        for (addr = DEQ_HEAD(core->addrs); remaining > 0 && addr; remaining--)
+            addr = DEQ_NEXT(addr);
+    }
+
+    if (!addr) {
+        query->more = false;
+    } else {
+        //
+        // Emit the row and record the resume point for the following step.
+        //
+        qdr_manage_write_address_CT(query, addr);
+        qdr_manage_advance_address_CT(query, addr);
+    }
+
+    //
+    // Hand the response back to the requester.
+    //
+    qdr_agent_enqueue_response_CT(core, query);
+}
+
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/61be97fd/src/router_core/agent_address.h
----------------------------------------------------------------------
diff --git a/src/router_core/agent_address.h b/src/router_core/agent_address.h
new file mode 100644
index 0000000..9d6c078
--- /dev/null
+++ b/src/router_core/agent_address.h
@@ -0,0 +1,28 @@
+#ifndef qdr_agent_address
+#define qdr_agent_address 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "router_core_private.h"
+
+void qdra_address_set_columns(qdr_query_t *query, qd_parsed_field_t *attribute_names);
+void qdra_address_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset);
+void qdra_address_get_next_CT(qdr_core_t *core, qdr_query_t *query);
+
+#endif
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/61be97fd/src/router_core/route_tables.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c
index c4deb7f..b71564e 100644
--- a/src/router_core/route_tables.c
+++ b/src/router_core/route_tables.c
@@ -20,15 +20,15 @@
#include "router_core_private.h"
#include <stdio.h>
-static void qdrh_add_router (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdrh_del_router (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdrh_set_link (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdrh_remove_link (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdrh_set_next_hop (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdrh_remove_next_hop (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdrh_set_valid_origins(qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdrh_map_destination (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdrh_unmap_destination(qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdrh_add_router_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdrh_del_router_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdrh_set_link_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdrh_remove_link_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdrh_set_next_hop_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdrh_remove_next_hop_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdrh_set_valid_origins_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdrh_map_destination_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdrh_unmap_destination_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
static qd_address_semantics_t router_addr_semantics = QD_FANOUT_SINGLE | QD_BIAS_CLOSEST | QD_CONGESTION_DROP | QD_DROP_FOR_SLOW_CONSUMERS | QD_BYPASS_VALID_ORIGINS;
@@ -39,7 +39,7 @@ static qd_address_semantics_t router_addr_semantics = QD_FANOUT_SINGLE | QD_BIAS
void qdr_core_add_router(qdr_core_t *core, const char *address, int router_maskbit)
{
- qdr_action_t *action = qdr_action(qdrh_add_router);
+ qdr_action_t *action = qdr_action(qdrh_add_router_CT);
action->args.route_table.router_maskbit = router_maskbit;
action->args.route_table.address = qdr_field(address);
qdr_action_enqueue(core, action);
@@ -48,7 +48,7 @@ void qdr_core_add_router(qdr_core_t *core, const char *address, int router_maskb
void qdr_core_del_router(qdr_core_t *core, int router_maskbit)
{
- qdr_action_t *action = qdr_action(qdrh_del_router);
+ qdr_action_t *action = qdr_action(qdrh_del_router_CT);
action->args.route_table.router_maskbit = router_maskbit;
qdr_action_enqueue(core, action);
}
@@ -56,7 +56,7 @@ void qdr_core_del_router(qdr_core_t *core, int router_maskbit)
void qdr_core_set_link(qdr_core_t *core, int router_maskbit, int link_maskbit)
{
- qdr_action_t *action = qdr_action(qdrh_set_link);
+ qdr_action_t *action = qdr_action(qdrh_set_link_CT);
action->args.route_table.router_maskbit = router_maskbit;
action->args.route_table.link_maskbit = link_maskbit;
qdr_action_enqueue(core, action);
@@ -65,7 +65,7 @@ void qdr_core_set_link(qdr_core_t *core, int router_maskbit, int link_maskbit)
void qdr_core_remove_link(qdr_core_t *core, int router_maskbit)
{
- qdr_action_t *action = qdr_action(qdrh_remove_link);
+ qdr_action_t *action = qdr_action(qdrh_remove_link_CT);
action->args.route_table.router_maskbit = router_maskbit;
qdr_action_enqueue(core, action);
}
@@ -73,7 +73,7 @@ void qdr_core_remove_link(qdr_core_t *core, int router_maskbit)
void qdr_core_set_next_hop(qdr_core_t *core, int router_maskbit, int nh_router_maskbit)
{
- qdr_action_t *action = qdr_action(qdrh_set_next_hop);
+ qdr_action_t *action = qdr_action(qdrh_set_next_hop_CT);
action->args.route_table.router_maskbit = router_maskbit;
action->args.route_table.nh_router_maskbit = nh_router_maskbit;
qdr_action_enqueue(core, action);
@@ -82,7 +82,7 @@ void qdr_core_set_next_hop(qdr_core_t *core, int router_maskbit, int nh_router_m
void qdr_core_remove_next_hop(qdr_core_t *core, int router_maskbit)
{
- qdr_action_t *action = qdr_action(qdrh_remove_next_hop);
+ qdr_action_t *action = qdr_action(qdrh_remove_next_hop_CT);
action->args.route_table.router_maskbit = router_maskbit;
qdr_action_enqueue(core, action);
}
@@ -90,7 +90,7 @@ void qdr_core_remove_next_hop(qdr_core_t *core, int router_maskbit)
void qdr_core_set_valid_origins(qdr_core_t *core, int router_maskbit, qd_bitmask_t *routers)
{
- qdr_action_t *action = qdr_action(qdrh_set_valid_origins);
+ qdr_action_t *action = qdr_action(qdrh_set_valid_origins_CT);
action->args.route_table.router_maskbit = router_maskbit;
action->args.route_table.router_set = routers;
qdr_action_enqueue(core, action);
@@ -99,7 +99,7 @@ void qdr_core_set_valid_origins(qdr_core_t *core, int router_maskbit, qd_bitmask
void qdr_core_map_destination(qdr_core_t *core, int router_maskbit, const char *address, char aclass, char phase, qd_address_semantics_t sem)
{
- qdr_action_t *action = qdr_action(qdrh_map_destination);
+ qdr_action_t *action = qdr_action(qdrh_map_destination_CT);
action->args.route_table.router_maskbit = router_maskbit;
action->args.route_table.address = qdr_field(address);
action->args.route_table.address_phase = phase;
@@ -111,7 +111,7 @@ void qdr_core_map_destination(qdr_core_t *core, int router_maskbit, const char *
void qdr_core_unmap_destination(qdr_core_t *core, int router_maskbit, const char *address, char aclass, char phase)
{
- qdr_action_t *action = qdr_action(qdrh_unmap_destination);
+ qdr_action_t *action = qdr_action(qdrh_unmap_destination_CT);
action->args.route_table.router_maskbit = router_maskbit;
action->args.route_table.address = qdr_field(address);
action->args.route_table.address_phase = phase;
@@ -136,16 +136,16 @@ void qdr_core_route_table_handlers(qdr_core_t *core,
// In-Thread Functions
//==================================================================================
-void qdr_route_table_setup(qdr_core_t *core)
+void qdr_route_table_setup_CT(qdr_core_t *core)
{
DEQ_INIT(core->addrs);
DEQ_INIT(core->links);
DEQ_INIT(core->routers);
core->addr_hash = qd_hash(10, 32, 0);
- core->router_addr = qdr_add_local_address(core, "qdrouter", QD_SEMANTICS_ROUTER_CONTROL);
- core->routerma_addr = qdr_add_local_address(core, "qdrouter.ma", QD_SEMANTICS_DEFAULT);
- core->hello_addr = qdr_add_local_address(core, "qdhello", QD_SEMANTICS_ROUTER_CONTROL);
+ core->router_addr = qdr_add_local_address_CT(core, "qdrouter", QD_SEMANTICS_ROUTER_CONTROL);
+ core->routerma_addr = qdr_add_local_address_CT(core, "qdrouter.ma", QD_SEMANTICS_DEFAULT);
+ core->hello_addr = qdr_add_local_address_CT(core, "qdhello", QD_SEMANTICS_ROUTER_CONTROL);
core->routers_by_mask_bit = NEW_PTR_ARRAY(qdr_node_t, qd_bitmask_width());
core->out_links_by_mask_bit = NEW_PTR_ARRAY(qdr_link_t, qd_bitmask_width());
@@ -161,7 +161,7 @@ void qdr_route_table_setup(qdr_core_t *core)
* Depending on its policy, the address may be eligible for being closed out
* (i.e. Logging its terminal statistics and freeing its resources).
*/
-static void qdr_check_addr(qdr_core_t *core, qdr_address_t *addr, bool was_local)
+static void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_local)
{
if (addr == 0)
return;
@@ -214,7 +214,7 @@ static void qdr_check_addr(qdr_core_t *core, qdr_address_t *addr, bool was_local
}
-static void qdrh_add_router(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdrh_add_router_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
int router_maskbit = action->args.route_table.router_maskbit;
qdr_field_t *address = action->args.route_table.address;
@@ -294,7 +294,7 @@ static void qdrh_add_router(qdr_core_t *core, qdr_action_t *action, bool discard
}
-static void qdrh_del_router(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdrh_del_router_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
int router_maskbit = action->args.route_table.router_maskbit;
@@ -343,7 +343,7 @@ static void qdrh_del_router(qdr_core_t *core, qdr_action_t *action, bool discard
}
-static void qdrh_set_link(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdrh_set_link_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
int router_maskbit = action->args.route_table.router_maskbit;
int link_maskbit = action->args.route_table.link_maskbit;
@@ -376,7 +376,7 @@ static void qdrh_set_link(qdr_core_t *core, qdr_action_t *action, bool discard)
}
-static void qdrh_remove_link(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdrh_remove_link_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
int router_maskbit = action->args.route_table.router_maskbit;
@@ -395,7 +395,7 @@ static void qdrh_remove_link(qdr_core_t *core, qdr_action_t *action, bool discar
}
-static void qdrh_set_next_hop(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdrh_set_next_hop_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
int router_maskbit = action->args.route_table.router_maskbit;
int nh_router_maskbit = action->args.route_table.nh_router_maskbit;
@@ -427,7 +427,7 @@ static void qdrh_set_next_hop(qdr_core_t *core, qdr_action_t *action, bool disca
}
-static void qdrh_remove_next_hop(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdrh_remove_next_hop_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
int router_maskbit = action->args.route_table.router_maskbit;
@@ -441,7 +441,7 @@ static void qdrh_remove_next_hop(qdr_core_t *core, qdr_action_t *action, bool di
}
-static void qdrh_set_valid_origins(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdrh_set_valid_origins_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
int router_maskbit = action->args.route_table.router_maskbit;
qd_bitmask_t *valid_origins = action->args.route_table.router_set;
@@ -474,7 +474,7 @@ static void qdrh_set_valid_origins(qdr_core_t *core, qdr_action_t *action, bool
}
-static void qdrh_map_destination(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdrh_map_destination_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
//
// TODO - handle the class-prefix and phase explicitly
@@ -522,7 +522,7 @@ static void qdrh_map_destination(qdr_core_t *core, qdr_action_t *action, bool di
}
-static void qdrh_unmap_destination(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdrh_unmap_destination_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
int router_maskbit = action->args.route_table.router_maskbit;
qdr_field_t *address = action->args.route_table.address;
@@ -560,7 +560,7 @@ static void qdrh_unmap_destination(qdr_core_t *core, qdr_action_t *action, bool
// TODO - If this affects a waypoint, create the proper side effects
//
- qdr_check_addr(core, addr, false);
+ qdr_check_addr_CT(core, addr, false);
} while (false);
qdr_field_free(address);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/61be97fd/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index 99edccc..b7f24e3 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -145,7 +145,7 @@ qdr_address_t *qdr_address(qd_address_semantics_t semantics)
}
-qdr_address_t *qdr_add_local_address(qdr_core_t *core, const char *address, qd_address_semantics_t semantics)
+qdr_address_t *qdr_add_local_address_CT(qdr_core_t *core, const char *address, qd_address_semantics_t semantics)
{
char addr_string[1000];
qdr_address_t *addr = 0;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/61be97fd/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index de5566b..5dd1d84 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -60,7 +60,7 @@ struct qdr_action_t {
} route_table;
struct {
qdr_query_t *query;
- int offset;
+ int offset;
} agent;
} args;
};
@@ -68,12 +68,17 @@ struct qdr_action_t {
ALLOC_DECLARE(qdr_action_t);
DEQ_DECLARE(qdr_action_t, qdr_action_list_t);
+#define QDR_AGENT_MAX_COLUMNS 64
+#define QDR_AGENT_COLUMN_NULL (QDR_AGENT_MAX_COLUMNS + 1)
+
struct qdr_query_t {
DEQ_LINKS(qdr_query_t);
qd_router_entity_type_t entity_type;
void *context;
+ int columns[QDR_AGENT_MAX_COLUMNS];
qd_composed_field_t *body;
qdr_field_t *next_key;
+ int next_offset;
bool more;
const qd_amqp_error_t *status;
};
@@ -189,7 +194,7 @@ ALLOC_DECLARE(qdr_address_t);
DEQ_DECLARE(qdr_address_t, qdr_address_list_t);
qdr_address_t *qdr_address(qd_address_semantics_t semantics);
-qdr_address_t *qdr_add_local_address(qdr_core_t *core, const char *addr, qd_address_semantics_t semantics);
+qdr_address_t *qdr_add_local_address_CT(qdr_core_t *core, const char *addr, qd_address_semantics_t semantics);
void qdr_add_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link);
void qdr_del_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link);
@@ -239,9 +244,10 @@ struct qdr_core_t {
};
void *router_core_thread(void *arg);
-void qdr_route_table_setup(qdr_core_t *core);
-void qdr_agent_setup(qdr_core_t *core);
+void qdr_route_table_setup_CT(qdr_core_t *core);
+void qdr_agent_setup_CT(qdr_core_t *core);
qdr_action_t *qdr_action(qdr_action_handler_t action_handler);
void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action);
+void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query);
#endif
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/61be97fd/src/router_core/router_core_thread.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_thread.c b/src/router_core/router_core_thread.c
index 7ef11ba..4212664 100644
--- a/src/router_core/router_core_thread.c
+++ b/src/router_core/router_core_thread.c
@@ -36,8 +36,8 @@ void *router_core_thread(void *arg)
qdr_action_list_t action_list;
qdr_action_t *action;
- qdr_route_table_setup(core);
- qdr_agent_setup(core);
+ qdr_route_table_setup_CT(core);
+ qdr_agent_setup_CT(core);
qd_log(core->log, QD_LOG_INFO, "Router Core thread running");
while (core->running) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org