You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2015/10/28 18:29:57 UTC

qpid-dispatch git commit: DISPATCH-179 - Added agent support for the address table. Adopted a function-name suffix of _CT for core-thread functions.

Repository: qpid-dispatch
Updated Branches:
  refs/heads/tross-DISPATCH-179-1 f733be6e9 -> 61be97fdf


DISPATCH-179 - Added agent support for the address table.
               Adopted a function-name suffix of _CT for core-thread functions.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/61be97fd
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/61be97fd
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/61be97fd

Branch: refs/heads/tross-DISPATCH-179-1
Commit: 61be97fdf7c776e5b9ceefc4f3cae94dc9a12333
Parents: f733be6
Author: Ted Ross <tr...@redhat.com>
Authored: Wed Oct 28 13:28:46 2015 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Wed Oct 28 13:28:46 2015 -0400

----------------------------------------------------------------------
 include/qpid/dispatch/router_core.h           |   9 +-
 python/qpid_dispatch/management/qdrouter.json |   3 +-
 src/CMakeLists.txt                            |   1 +
 src/router_core/agent.c                       | 159 +++++++------
 src/router_core/agent_address.c               | 258 +++++++++++++++++++++
 src/router_core/agent_address.h               |  28 +++
 src/router_core/route_tables.c                |  66 +++---
 src/router_core/router_core.c                 |   2 +-
 src/router_core/router_core_private.h         |  14 +-
 src/router_core/router_core_thread.c          |   4 +-
 10 files changed, 424 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/61be97fd/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index d50eb7c..2e11139 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -22,6 +22,8 @@
 #include <qpid/dispatch.h>
 #include <qpid/dispatch/amqp.h>
 #include <qpid/dispatch/bitmask.h>
+#include <qpid/dispatch/compose.h>
+#include <qpid/dispatch/parse.h>
 
 //
 // All callbacks in this module shall be invoked on a connection thread from the server thread pool.
@@ -148,9 +150,10 @@ void qdr_manage_create(qdr_core_t *core, void *context, qd_router_entity_type_t
 void qdr_manage_delete(qdr_core_t *core, void *context, qd_router_entity_type_t type, qd_parsed_field_t *attributes);
 void qdr_manage_read(qdr_core_t *core, void *context, qd_router_entity_type_t type, qd_parsed_field_t *attributes);
 
-qdr_query_t *qdr_manage_get_first(qdr_core_t *core, void *context, qd_router_entity_type_t type, int offset, qd_composed_field_t *body);
-void qdr_manage_get_next(qdr_query_t *query);
-void qdr_query_cancel(qdr_query_t *query);
+qdr_query_t *qdr_manage_get_first(qdr_core_t *core, void *context, qd_router_entity_type_t type, int offset,
+                                  qd_parsed_field_t *attribute_names, qd_composed_field_t *body);
+void qdr_manage_get_next(qdr_core_t *core, qdr_query_t *query);
+void qdr_query_cancel(qdr_core_t *core, qdr_query_t *query);
 
 typedef void (*qdr_manage_response_t) (void *context, const qd_amqp_error_t *status, bool more);
 void qdr_manage_handler(qdr_core_t *core, qdr_manage_response_t response_handler);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/61be97fd/python/qpid_dispatch/management/qdrouter.json
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index e447f19..00e247f 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -857,7 +857,8 @@
                 "key": {
                     "description": "Internal unique (to this router) key to identify the address",
                     "type": "string"
-                }
+                },
+                "hostRouters": {"type": "list", "description": "List of remote routers on which there is a destination for this address."}
             }
         },
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/61be97fd/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index eee2b10..42c9fc9 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -66,6 +66,7 @@ set(qpid_dispatch_SOURCES
   router_agent.c
   router_config.c
   router_core/agent.c
+  router_core/agent_address.c
   router_core/router_core.c
   router_core/router_core_thread.c
   router_core/route_tables.c

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/61be97fd/src/router_core/agent.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent.c b/src/router_core/agent.c
index 2787ae5..a42232b 100644
--- a/src/router_core/agent.c
+++ b/src/router_core/agent.c
@@ -18,10 +18,53 @@
  */
 
 #include <qpid/dispatch/amqp.h>
-#include "router_core_private.h"
+#include "agent_address.h"
 #include <stdio.h>
 
-static void qdrh_manage_get_first(qdr_core_t *core, qdr_action_t *action, bool discard);
+//==================================================================================
+// Internal Functions
+//==================================================================================
+
+//
+// Drain core->outgoing_query_list and hand each completed query response to the
+// registered core->agent_response_handler callback.
+//
+// context is the qdr_core_t that owns the list.  Presumably this is the callback
+// bound to core->agent_timer (qdr_agent_enqueue_response_CT schedules that timer);
+// the registration itself is not visible in this hunk.
+//
+// The list is only touched while core->query_lock is held; the callback is invoked
+// with the lock released.  A query whose response is final (more == false) is
+// released here, together with any resume key it still holds.
+//
+static void qdr_agent_response_handler(void *context)
+{
+    qdr_core_t  *core = (qdr_core_t*) context;
+    qdr_query_t *query;
+    bool         done = false;
+
+    while (!done) {
+        sys_mutex_lock(core->query_lock);
+        query = DEQ_HEAD(core->outgoing_query_list);
+        if (query)
+            DEQ_REMOVE_HEAD(core->outgoing_query_list);
+        done = DEQ_SIZE(core->outgoing_query_list) == 0;
+        sys_mutex_unlock(core->query_lock);
+
+        if (query) {
+            //
+            // Snapshot 'more' before invoking the callback.  If the callback re-submits
+            // the query (qdr_manage_get_next), the core thread may rewrite query->more
+            // and re-queue the query while we are still here.  Re-reading the field
+            // afterwards could then free a query that is still in use.
+            //
+            bool more = query->more;
+
+            core->agent_response_handler(query->context, query->status, more);
+            if (!more) {
+                if (query->next_key)
+                    qdr_field_free(query->next_key);
+                free_qdr_query_t(query);
+            }
+        }
+    }
+}
+
+
+//
+// Append a query to core->outgoing_query_list (under core->query_lock) so that its
+// response can be delivered by the agent response handler.
+//
+// The agent timer is scheduled (with a zero delay) only when this insertion made
+// the list go from empty to one entry.
+//
+void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query)
+{
+    bool list_was_empty;
+
+    sys_mutex_lock(core->query_lock);
+    DEQ_INSERT_TAIL(core->outgoing_query_list, query);
+    list_was_empty = (DEQ_SIZE(core->outgoing_query_list) == 1);
+    sys_mutex_unlock(core->query_lock);
+
+    if (!list_was_empty)
+        return;
+
+    qd_timer_schedule(core->agent_timer, 0);
+}
+
+static void qdrh_manage_get_first_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdrh_manage_get_next_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
+
 
 //==================================================================================
 // Interface Functions
@@ -43,18 +86,28 @@ void qdr_manage_read(qdr_core_t *core, void *context, qd_router_entity_type_t ty
 
 
 qdr_query_t *qdr_manage_get_first(qdr_core_t *core, void *context, qd_router_entity_type_t type,
-                                  int offset, qd_composed_field_t *body)
+                                  int offset, qd_parsed_field_t *attribute_names, qd_composed_field_t *body)
 {
-    qdr_action_t *action = qdr_action(qdrh_manage_get_first);
+    qdr_action_t *action = qdr_action(qdrh_manage_get_first_CT);
     qdr_query_t  *query  = new_qdr_query_t();
 
     query->entity_type = type;
     query->context     = context;
     query->body        = body;
     query->next_key    = 0;
+    query->next_offset = 0;
     query->more        = false;
     query->status      = 0;
 
+    switch (query->entity_type) {
+    case QD_ROUTER_CONNECTION: break;
+    case QD_ROUTER_LINK:       break;
+    case QD_ROUTER_ADDRESS:    qdra_address_set_columns(query, attribute_names);
+    case QD_ROUTER_WAYPOINT:   break;
+    case QD_ROUTER_EXCHANGE:   break;
+    case QD_ROUTER_BINDING:    break;
+    }
+
     action->args.agent.query  = query;
     action->args.agent.offset = offset;
 
@@ -64,12 +117,15 @@ qdr_query_t *qdr_manage_get_first(qdr_core_t *core, void *context, qd_router_ent
 }
 
 
-void qdr_manage_get_next(qdr_query_t *query)
+void qdr_manage_get_next(qdr_core_t *core, qdr_query_t *query)
 {
+    qdr_action_t *action = qdr_action(qdrh_manage_get_next_CT);
+    action->args.agent.query = query;
+    qdr_action_enqueue(core, action);
 }
 
 
-void qdr_query_cancel(qdr_query_t *query)
+void qdr_query_cancel(qdr_core_t *core, qdr_query_t *query)
 {
 }
 
@@ -81,63 +137,10 @@ void qdr_manage_handler(qdr_core_t *core, qdr_manage_response_t response_handler
 
 
 //==================================================================================
-// Internal Functions
-//==================================================================================
-
-static void qdr_agent_response_handler(void *context)
-{
-    qdr_core_t  *core = (qdr_core_t*) context;
-    qdr_query_t *query;
-    bool         done = false;
-
-    while (!done) {
-        sys_mutex_lock(core->query_lock);
-        query = DEQ_HEAD(core->outgoing_query_list);
-        if (query)
-            DEQ_REMOVE_HEAD(core->outgoing_query_list);
-        done = DEQ_SIZE(core->outgoing_query_list) == 0;
-        sys_mutex_unlock(core->query_lock);
-
-        if (query) {
-            core->agent_response_handler(query->context, query->status, query->more);
-            if (!query->more) {
-                if (query->next_key)
-                    qdr_field_free(query->next_key);
-                free_qdr_query_t(query);
-            }
-        }
-    }
-}
-
-
-static void qdr_agent_enqueue_response(qdr_core_t *core, qdr_query_t *query)
-{
-    sys_mutex_lock(core->query_lock);
-    DEQ_INSERT_TAIL(core->outgoing_query_list, query);
-    bool notify = DEQ_SIZE(core->outgoing_query_list) == 1;
-    sys_mutex_unlock(core->query_lock);
-
-    if (notify)
-        qd_timer_schedule(core->agent_timer, 0);
-}
-
-
-static void qdr_manage_get_first_address(qdr_core_t *core, qdr_query_t *query, int offset)
-{
-    if (offset >= DEQ_SIZE(core->addrs)) {
-        query->more        = false;
-        query->status      = &QD_AMQP_OK;
-        qdr_agent_enqueue_response(core, query);
-        return;
-    }
-}
-
-
-//==================================================================================
 // In-Thread Functions
 //==================================================================================
 
-void qdr_agent_setup(qdr_core_t *core)
+void qdr_agent_setup_CT(qdr_core_t *core)
 {
     DEQ_INIT(core->outgoing_query_list);
     core->query_lock  = sys_mutex();
@@ -145,31 +148,35 @@ void qdr_agent_setup(qdr_core_t *core)
 }
 
 
-static void qdrh_manage_get_first(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdrh_manage_get_first_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
 {
     qdr_query_t *query  = action->args.agent.query;
     int          offset = action->args.agent.offset;
 
     if (!discard)
         switch (query->entity_type) {
-        case QD_ROUTER_CONNECTION :
-            break;
-
-        case QD_ROUTER_LINK :
-            break;
-
-        case QD_ROUTER_ADDRESS :
-            qdr_manage_get_first_address(core, query, offset);
-            break;
+        case QD_ROUTER_CONNECTION: break;
+        case QD_ROUTER_LINK:       break;
+        case QD_ROUTER_ADDRESS:    qdra_address_get_first_CT(core, query, offset); break;
+        case QD_ROUTER_WAYPOINT:   break;
+        case QD_ROUTER_EXCHANGE:   break;
+        case QD_ROUTER_BINDING:    break;
+        }
+}
 
-        case QD_ROUTER_WAYPOINT :
-            break;
 
-        case QD_ROUTER_EXCHANGE :
-            break;
+static void qdrh_manage_get_next_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+{
+    qdr_query_t *query  = action->args.agent.query;
 
-        case QD_ROUTER_BINDING :
-            break;
+    if (!discard)
+        switch (query->entity_type) {
+        case QD_ROUTER_CONNECTION: break;
+        case QD_ROUTER_LINK:       break;
+        case QD_ROUTER_ADDRESS:    qdra_address_get_next_CT(core, query); break;
+        case QD_ROUTER_WAYPOINT:   break;
+        case QD_ROUTER_EXCHANGE:   break;
+        case QD_ROUTER_BINDING:    break;
         }
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/61be97fd/src/router_core/agent_address.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent_address.c b/src/router_core/agent_address.c
new file mode 100644
index 0000000..5ede5d1
--- /dev/null
+++ b/src/router_core/agent_address.c
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "agent_address.h"
+
+//
+// Attribute names of the address entity, terminated by a null pointer.  The position
+// of each name in this table is its column index (see the enumeration below), so the
+// two must be kept in the same order.
+//
+static const char *qdr_address_columns[] = {
+    "name",
+    "identity",
+    "type",
+    "key",
+    "inProcess",
+    "subscriberCount",
+    "remoteCount",
+    "hostRouters",
+    "deliveriesIngress",
+    "deliveriesEgress",
+    "deliveriesTransit",
+    "deliveriesToContainer",
+    "deliveriesFromContainer",
+    0
+};
+
+//
+// Column indices into qdr_address_columns.  QDR_ADDRESS_COLUMN_COUNT is the number
+// of real columns (it is not itself a column).
+//
+enum {
+    QDR_ADDRESS_NAME                      = 0,
+    QDR_ADDRESS_IDENTITY                  = 1,
+    QDR_ADDRESS_TYPE                      = 2,
+    QDR_ADDRESS_KEY                       = 3,
+    QDR_ADDRESS_IN_PROCESS                = 4,
+    QDR_ADDRESS_SUBSCRIBER_COUNT          = 5,
+    QDR_ADDRESS_REMOTE_COUNT              = 6,
+    QDR_ADDRESS_HOST_ROUTERS              = 7,
+    QDR_ADDRESS_DELIVERIES_INGRESS        = 8,
+    QDR_ADDRESS_DELIVERIES_EGRESS         = 9,
+    QDR_ADDRESS_DELIVERIES_TRANSIT        = 10,
+    QDR_ADDRESS_DELIVERIES_TO_CONTAINER   = 11,
+    QDR_ADDRESS_DELIVERIES_FROM_CONTAINER = 12,
+    QDR_ADDRESS_COLUMN_COUNT              = 13
+};
+
+//
+// Append one address entity to the query's response body as a list holding one value
+// per selected column.
+//
+// query->columns is walked until the first negative entry (the terminator written by
+// qdra_address_set_columns).  Exactly one value is emitted for every column so that
+// the values stay positionally aligned with the requested attribute names; a column
+// that is unknown, or whose value is not available, is emitted as null.
+//
+static void qdr_manage_write_address_CT(qdr_query_t *query, qdr_address_t *addr)
+{
+    qd_composed_field_t *body = query->body;
+
+    qd_compose_start_list(body);
+    for (int i = 0; query->columns[i] >= 0; i++) {
+        switch (query->columns[i]) {
+        case QDR_ADDRESS_NAME:
+        case QDR_ADDRESS_IDENTITY:
+            //
+            // No value is produced for these attributes yet.  Emit a null placeholder
+            // rather than nothing, otherwise every later column would shift by one.
+            //
+            qd_compose_insert_null(body);
+            break;
+
+        case QDR_ADDRESS_TYPE:
+            qd_compose_insert_string(body, "org.apache.qpid.dispatch.router.address");
+            break;
+
+        case QDR_ADDRESS_KEY:
+            if (addr->hash_handle)
+                qd_compose_insert_string(body, (const char*) qd_hash_key_by_handle(addr->hash_handle));
+            else
+                qd_compose_insert_null(body);
+            break;
+
+        case QDR_ADDRESS_IN_PROCESS:
+            qd_compose_insert_bool(body, addr->on_message != 0);
+            break;
+
+        case QDR_ADDRESS_SUBSCRIBER_COUNT:
+            qd_compose_insert_uint(body, DEQ_SIZE(addr->rlinks));
+            break;
+
+        case QDR_ADDRESS_REMOTE_COUNT:
+            qd_compose_insert_uint(body, DEQ_SIZE(addr->rnodes));
+            break;
+
+        case QDR_ADDRESS_HOST_ROUTERS:
+            qd_compose_insert_null(body);  // TEMP
+            break;
+
+        case QDR_ADDRESS_DELIVERIES_INGRESS:
+            qd_compose_insert_ulong(body, addr->deliveries_ingress);
+            break;
+
+        case QDR_ADDRESS_DELIVERIES_EGRESS:
+            qd_compose_insert_ulong(body, addr->deliveries_egress);
+            break;
+
+        case QDR_ADDRESS_DELIVERIES_TRANSIT:
+            qd_compose_insert_ulong(body, addr->deliveries_transit);
+            break;
+
+        case QDR_ADDRESS_DELIVERIES_TO_CONTAINER:
+            qd_compose_insert_ulong(body, addr->deliveries_to_container);
+            break;
+
+        case QDR_ADDRESS_DELIVERIES_FROM_CONTAINER:
+            qd_compose_insert_ulong(body, addr->deliveries_from_container);
+            break;
+
+        default:
+            qd_compose_insert_null(body);
+            break;
+        }
+    }
+    qd_compose_end_list(body);
+}
+
+
+//
+// Record in the query where the next get-next request should resume.
+//
+// addr is the address that has just been written.  query->next_offset is always
+// incremented.  If another address follows in the list, query->more is set and the
+// hash key of that following address is stored in query->next_key; otherwise
+// query->more is cleared.
+//
+// A following address without a hash handle has no key to store.  In that case
+// next_key is left untouched (callers hand it over cleared) and
+// qdra_address_get_next_CT falls back to the saved offset.
+//
+static void qdr_manage_advance_address_CT(qdr_query_t *query, qdr_address_t *addr)
+{
+    query->next_offset++;
+    addr = DEQ_NEXT(addr);
+    if (!addr) {
+        query->more = false;
+        return;
+    }
+
+    query->more = true;
+    if (addr->hash_handle)
+        query->next_key = qdr_field((const char*) qd_hash_key_by_handle(addr->hash_handle));
+}
+
+
+//
+// Translate the requested attribute names into column indices in query->columns,
+// terminated by -1.
+//
+// attribute_names may be null.  If it is absent, is not a list, or is an empty list,
+// every column of the address entity is selected in table order.  Otherwise each
+// list element is looked up in qdr_address_columns; an element that is not a string,
+// or that names no known column, is recorded as QDR_AGENT_COLUMN_NULL.
+//
+void qdra_address_set_columns(qdr_query_t *query, qd_parsed_field_t *attribute_names)
+{
+    if (!attribute_names ||
+        (qd_parse_tag(attribute_names) != QD_AMQP_LIST8 &&
+         qd_parse_tag(attribute_names) != QD_AMQP_LIST32) ||
+        qd_parse_sub_count(attribute_names) == 0) {
+        //
+        // Either the attribute_names field is absent, it's not a list, or it's an empty list.
+        // In this case, we will include all available attributes.
+        //
+        int i;
+        for (i = 0; i < QDR_ADDRESS_COLUMN_COUNT; i++)
+            query->columns[i] = i;
+        query->columns[i] = -1;
+        return;
+    }
+
+    //
+    // We have a valid, non-empty attribute list.  Set the columns appropriately.
+    //
+    // NOTE(review): the capacity of query->columns is not visible from this file.  The
+    // all-columns path above writes QDR_ADDRESS_COLUMN_COUNT entries plus a terminator,
+    // so the requested list is capped at the same size to stay within those bounds.
+    //
+    uint32_t count = qd_parse_sub_count(attribute_names);
+    uint32_t idx;
+
+    if (count > QDR_ADDRESS_COLUMN_COUNT)
+        count = QDR_ADDRESS_COLUMN_COUNT;
+
+    for (idx = 0; idx < count; idx++) {
+        qd_parsed_field_t *name = qd_parse_sub_value(attribute_names, idx);
+
+        //
+        // Default to the null column so the slot is defined even when no name matches.
+        //
+        query->columns[idx] = QDR_AGENT_COLUMN_NULL;
+        if (!name || (qd_parse_tag(name) != QD_AMQP_STR8_UTF8 && qd_parse_tag(name) != QD_AMQP_STR32_UTF8))
+            continue;
+
+        qd_field_iterator_t *iter = qd_parse_raw(name);
+        for (int j = 0; qdr_address_columns[j]; j++) {
+            if (qd_field_iterator_equal(iter, (const unsigned char*) qdr_address_columns[j])) {
+                query->columns[idx] = j;
+                break;
+            }
+        }
+    }
+
+    //
+    // Terminate the list; qdr_manage_write_address_CT stops at the first negative entry.
+    //
+    query->columns[idx] = -1;
+}
+
+
+//
+// Start an address-table query: write the address found at 'offset' into the query
+// body, record where to resume, and enqueue the response.
+//
+// The status is always set to QD_AMQP_OK.  When 'offset' is not below the number of
+// addresses, nothing is written and the response is enqueued with more == false.
+//
+void qdra_address_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset)
+{
+    query->status = &QD_AMQP_OK;
+
+    if (offset < DEQ_SIZE(core->addrs)) {
+        //
+        // Step from the head of the list to the address at the requested offset.
+        //
+        qdr_address_t *cursor  = DEQ_HEAD(core->addrs);
+        int            skipped = 0;
+        while (skipped < offset && cursor) {
+            cursor = DEQ_NEXT(cursor);
+            skipped++;
+        }
+        assert(cursor != 0);
+
+        //
+        // Emit the entity, then note the resume point for a later get-next.
+        //
+        qdr_manage_write_address_CT(query, cursor);
+        query->next_offset = offset;
+        qdr_manage_advance_address_CT(query, cursor);
+    } else {
+        //
+        // The offset lies beyond the table: the query is already complete.
+        //
+        query->more = false;
+    }
+
+    qdr_agent_enqueue_response_CT(core, query);
+}
+
+
+//
+// Continue an address-table query: locate the next address, write it into the query
+// body, record the following resume point, and enqueue the response.
+//
+// The stored key (query->next_key) is tried first and is always released and cleared
+// once consulted.  If it yields no address, the saved offset (query->next_offset) is
+// used instead.  If neither finds an address, the response is enqueued with
+// more == false and nothing written.
+//
+void qdra_address_get_next_CT(qdr_core_t *core, qdr_query_t *query)
+{
+    qdr_address_t *cursor = 0;
+
+    //
+    // Preferred path: look the next entry up directly by its hash key.
+    //
+    if (query->next_key) {
+        qd_hash_retrieve(core->addr_hash, query->next_key->iterator, (void**) &cursor);
+        qdr_field_free(query->next_key);
+        query->next_key = 0;
+    }
+
+    //
+    // Fallback path (e.g. the address was removed since the previous get): walk the
+    // list from its head to the saved offset, which is less efficient.
+    //
+    if (!cursor && query->next_offset < DEQ_SIZE(core->addrs)) {
+        cursor = DEQ_HEAD(core->addrs);
+        for (int step = 0; step < query->next_offset && cursor; step++)
+            cursor = DEQ_NEXT(cursor);
+    }
+
+    if (!cursor)
+        query->more = false;
+    else {
+        //
+        // Emit the entity, then note the resume point for the following get-next.
+        //
+        qdr_manage_write_address_CT(query, cursor);
+        qdr_manage_advance_address_CT(query, cursor);
+    }
+
+    qdr_agent_enqueue_response_CT(core, query);
+}
+

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/61be97fd/src/router_core/agent_address.h
----------------------------------------------------------------------
diff --git a/src/router_core/agent_address.h b/src/router_core/agent_address.h
new file mode 100644
index 0000000..9d6c078
--- /dev/null
+++ b/src/router_core/agent_address.h
@@ -0,0 +1,28 @@
+#ifndef qdr_agent_address
+#define qdr_agent_address 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "router_core_private.h"
+
+void qdra_address_set_columns(qdr_query_t *query, qd_parsed_field_t *attribute_names);
+void qdra_address_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset);
+void qdra_address_get_next_CT(qdr_core_t *core, qdr_query_t *query);
+
+#endif

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/61be97fd/src/router_core/route_tables.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c
index c4deb7f..b71564e 100644
--- a/src/router_core/route_tables.c
+++ b/src/router_core/route_tables.c
@@ -20,15 +20,15 @@
 #include "router_core_private.h"
 #include <stdio.h>
 
-static void qdrh_add_router       (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdrh_del_router       (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdrh_set_link         (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdrh_remove_link      (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdrh_set_next_hop     (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdrh_remove_next_hop  (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdrh_set_valid_origins(qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdrh_map_destination  (qdr_core_t *core, qdr_action_t *action, bool discard);
-static void qdrh_unmap_destination(qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdrh_add_router_CT        (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdrh_del_router_CT        (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdrh_set_link_CT          (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdrh_remove_link_CT       (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdrh_set_next_hop_CT      (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdrh_remove_next_hop_CT   (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdrh_set_valid_origins_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdrh_map_destination_CT   (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdrh_unmap_destination_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
 
 static qd_address_semantics_t router_addr_semantics = QD_FANOUT_SINGLE | QD_BIAS_CLOSEST | QD_CONGESTION_DROP | QD_DROP_FOR_SLOW_CONSUMERS | QD_BYPASS_VALID_ORIGINS;
 
@@ -39,7 +39,7 @@ static qd_address_semantics_t router_addr_semantics = QD_FANOUT_SINGLE | QD_BIAS
 
 void qdr_core_add_router(qdr_core_t *core, const char *address, int router_maskbit)
 {
-    qdr_action_t *action = qdr_action(qdrh_add_router);
+    qdr_action_t *action = qdr_action(qdrh_add_router_CT);
     action->args.route_table.router_maskbit = router_maskbit;
     action->args.route_table.address        = qdr_field(address);
     qdr_action_enqueue(core, action);
@@ -48,7 +48,7 @@ void qdr_core_add_router(qdr_core_t *core, const char *address, int router_maskb
 
 void qdr_core_del_router(qdr_core_t *core, int router_maskbit)
 {
-    qdr_action_t *action = qdr_action(qdrh_del_router);
+    qdr_action_t *action = qdr_action(qdrh_del_router_CT);
     action->args.route_table.router_maskbit = router_maskbit;
     qdr_action_enqueue(core, action);
 }
@@ -56,7 +56,7 @@ void qdr_core_del_router(qdr_core_t *core, int router_maskbit)
 
 void qdr_core_set_link(qdr_core_t *core, int router_maskbit, int link_maskbit)
 {
-    qdr_action_t *action = qdr_action(qdrh_set_link);
+    qdr_action_t *action = qdr_action(qdrh_set_link_CT);
     action->args.route_table.router_maskbit = router_maskbit;
     action->args.route_table.link_maskbit   = link_maskbit;
     qdr_action_enqueue(core, action);
@@ -65,7 +65,7 @@ void qdr_core_set_link(qdr_core_t *core, int router_maskbit, int link_maskbit)
 
 void qdr_core_remove_link(qdr_core_t *core, int router_maskbit)
 {
-    qdr_action_t *action = qdr_action(qdrh_remove_link);
+    qdr_action_t *action = qdr_action(qdrh_remove_link_CT);
     action->args.route_table.router_maskbit = router_maskbit;
     qdr_action_enqueue(core, action);
 }
@@ -73,7 +73,7 @@ void qdr_core_remove_link(qdr_core_t *core, int router_maskbit)
 
 void qdr_core_set_next_hop(qdr_core_t *core, int router_maskbit, int nh_router_maskbit)
 {
-    qdr_action_t *action = qdr_action(qdrh_set_next_hop);
+    qdr_action_t *action = qdr_action(qdrh_set_next_hop_CT);
     action->args.route_table.router_maskbit    = router_maskbit;
     action->args.route_table.nh_router_maskbit = nh_router_maskbit;
     qdr_action_enqueue(core, action);
@@ -82,7 +82,7 @@ void qdr_core_set_next_hop(qdr_core_t *core, int router_maskbit, int nh_router_m
 
 void qdr_core_remove_next_hop(qdr_core_t *core, int router_maskbit)
 {
-    qdr_action_t *action = qdr_action(qdrh_remove_next_hop);
+    qdr_action_t *action = qdr_action(qdrh_remove_next_hop_CT);
     action->args.route_table.router_maskbit = router_maskbit;
     qdr_action_enqueue(core, action);
 }
@@ -90,7 +90,7 @@ void qdr_core_remove_next_hop(qdr_core_t *core, int router_maskbit)
 
 void qdr_core_set_valid_origins(qdr_core_t *core, int router_maskbit, qd_bitmask_t *routers)
 {
-    qdr_action_t *action = qdr_action(qdrh_set_valid_origins);
+    qdr_action_t *action = qdr_action(qdrh_set_valid_origins_CT);
     action->args.route_table.router_maskbit = router_maskbit;
     action->args.route_table.router_set     = routers;
     qdr_action_enqueue(core, action);
@@ -99,7 +99,7 @@ void qdr_core_set_valid_origins(qdr_core_t *core, int router_maskbit, qd_bitmask
 
 void qdr_core_map_destination(qdr_core_t *core, int router_maskbit, const char *address, char aclass, char phase, qd_address_semantics_t sem)
 {
-    qdr_action_t *action = qdr_action(qdrh_map_destination);
+    qdr_action_t *action = qdr_action(qdrh_map_destination_CT);
     action->args.route_table.router_maskbit = router_maskbit;
     action->args.route_table.address        = qdr_field(address);
     action->args.route_table.address_phase  = phase;
@@ -111,7 +111,7 @@ void qdr_core_map_destination(qdr_core_t *core, int router_maskbit, const char *
 
 void qdr_core_unmap_destination(qdr_core_t *core, int router_maskbit, const char *address, char aclass, char phase)
 {
-    qdr_action_t *action = qdr_action(qdrh_unmap_destination);
+    qdr_action_t *action = qdr_action(qdrh_unmap_destination_CT);
     action->args.route_table.router_maskbit = router_maskbit;
     action->args.route_table.address        = qdr_field(address);
     action->args.route_table.address_phase  = phase;
@@ -136,16 +136,16 @@ void qdr_core_route_table_handlers(qdr_core_t           *core,
 // In-Thread Functions
 //==================================================================================
 
-void qdr_route_table_setup(qdr_core_t *core)
+void qdr_route_table_setup_CT(qdr_core_t *core)
 {
     DEQ_INIT(core->addrs);
     DEQ_INIT(core->links);
     DEQ_INIT(core->routers);
     core->addr_hash = qd_hash(10, 32, 0);
 
-    core->router_addr   = qdr_add_local_address(core, "qdrouter",    QD_SEMANTICS_ROUTER_CONTROL);
-    core->routerma_addr = qdr_add_local_address(core, "qdrouter.ma", QD_SEMANTICS_DEFAULT);
-    core->hello_addr    = qdr_add_local_address(core, "qdhello",     QD_SEMANTICS_ROUTER_CONTROL);
+    core->router_addr   = qdr_add_local_address_CT(core, "qdrouter",    QD_SEMANTICS_ROUTER_CONTROL);
+    core->routerma_addr = qdr_add_local_address_CT(core, "qdrouter.ma", QD_SEMANTICS_DEFAULT);
+    core->hello_addr    = qdr_add_local_address_CT(core, "qdhello",     QD_SEMANTICS_ROUTER_CONTROL);
 
     core->routers_by_mask_bit   = NEW_PTR_ARRAY(qdr_node_t, qd_bitmask_width());
     core->out_links_by_mask_bit = NEW_PTR_ARRAY(qdr_link_t, qd_bitmask_width());
@@ -161,7 +161,7 @@ void qdr_route_table_setup(qdr_core_t *core)
  * Depending on its policy, the address may be eligible for being closed out
  * (i.e. Logging its terminal statistics and freeing its resources).
  */
-static void qdr_check_addr(qdr_core_t *core, qdr_address_t *addr, bool was_local)
+static void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_local)
 {
     if (addr == 0)
         return;
@@ -214,7 +214,7 @@ static void qdr_check_addr(qdr_core_t *core, qdr_address_t *addr, bool was_local
 }
 
 
-static void qdrh_add_router(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdrh_add_router_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
 {
     int          router_maskbit = action->args.route_table.router_maskbit;
     qdr_field_t *address        = action->args.route_table.address;
@@ -294,7 +294,7 @@ static void qdrh_add_router(qdr_core_t *core, qdr_action_t *action, bool discard
 }
 
 
-static void qdrh_del_router(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdrh_del_router_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
 {
     int router_maskbit = action->args.route_table.router_maskbit;
 
@@ -343,7 +343,7 @@ static void qdrh_del_router(qdr_core_t *core, qdr_action_t *action, bool discard
 }
 
 
-static void qdrh_set_link(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdrh_set_link_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
 {
     int router_maskbit = action->args.route_table.router_maskbit;
     int link_maskbit   = action->args.route_table.link_maskbit;
@@ -376,7 +376,7 @@ static void qdrh_set_link(qdr_core_t *core, qdr_action_t *action, bool discard)
 }
 
 
-static void qdrh_remove_link(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdrh_remove_link_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
 {
     int router_maskbit = action->args.route_table.router_maskbit;
 
@@ -395,7 +395,7 @@ static void qdrh_remove_link(qdr_core_t *core, qdr_action_t *action, bool discar
 }
 
 
-static void qdrh_set_next_hop(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdrh_set_next_hop_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
 {
     int router_maskbit    = action->args.route_table.router_maskbit;
     int nh_router_maskbit = action->args.route_table.nh_router_maskbit;
@@ -427,7 +427,7 @@ static void qdrh_set_next_hop(qdr_core_t *core, qdr_action_t *action, bool disca
 }
 
 
-static void qdrh_remove_next_hop(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdrh_remove_next_hop_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
 {
     int router_maskbit = action->args.route_table.router_maskbit;
 
@@ -441,7 +441,7 @@ static void qdrh_remove_next_hop(qdr_core_t *core, qdr_action_t *action, bool di
 }
 
 
-static void qdrh_set_valid_origins(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdrh_set_valid_origins_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
 {
     int           router_maskbit = action->args.route_table.router_maskbit;
     qd_bitmask_t *valid_origins  = action->args.route_table.router_set;
@@ -474,7 +474,7 @@ static void qdrh_set_valid_origins(qdr_core_t *core, qdr_action_t *action, bool
 }
 
 
-static void qdrh_map_destination(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdrh_map_destination_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
 {
     //
     // TODO - handle the class-prefix and phase explicitly
@@ -522,7 +522,7 @@ static void qdrh_map_destination(qdr_core_t *core, qdr_action_t *action, bool di
 }
 
 
-static void qdrh_unmap_destination(qdr_core_t *core, qdr_action_t *action, bool discard)
+static void qdrh_unmap_destination_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
 {
     int          router_maskbit = action->args.route_table.router_maskbit;
     qdr_field_t *address        = action->args.route_table.address;
@@ -560,7 +560,7 @@ static void qdrh_unmap_destination(qdr_core_t *core, qdr_action_t *action, bool
         // TODO - If this affects a waypoint, create the proper side effects
         //
 
-        qdr_check_addr(core, addr, false);
+        qdr_check_addr_CT(core, addr, false);
     } while (false);
 
     qdr_field_free(address);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/61be97fd/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index 99edccc..b7f24e3 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -145,7 +145,7 @@ qdr_address_t *qdr_address(qd_address_semantics_t semantics)
 }
 
 
-qdr_address_t *qdr_add_local_address(qdr_core_t *core, const char *address, qd_address_semantics_t semantics)
+qdr_address_t *qdr_add_local_address_CT(qdr_core_t *core, const char *address, qd_address_semantics_t semantics)
 {
     char                 addr_string[1000];
     qdr_address_t       *addr = 0;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/61be97fd/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index de5566b..5dd1d84 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -60,7 +60,7 @@ struct qdr_action_t {
         } route_table;
         struct {
             qdr_query_t *query;
-            int          offset;
+           int          offset;
         } agent;
     } args;
 };
@@ -68,12 +68,17 @@ struct qdr_action_t {
 ALLOC_DECLARE(qdr_action_t);
 DEQ_DECLARE(qdr_action_t, qdr_action_list_t);
 
+#define QDR_AGENT_MAX_COLUMNS 64
+#define QDR_AGENT_COLUMN_NULL (QDR_AGENT_MAX_COLUMNS + 1)
+
 struct qdr_query_t {
     DEQ_LINKS(qdr_query_t);
     qd_router_entity_type_t  entity_type;
     void                    *context;
+    int                      columns[QDR_AGENT_MAX_COLUMNS];
     qd_composed_field_t     *body;
     qdr_field_t             *next_key;
+    int                      next_offset;
     bool                     more;
     const qd_amqp_error_t   *status;
 };
@@ -189,7 +194,7 @@ ALLOC_DECLARE(qdr_address_t);
 DEQ_DECLARE(qdr_address_t, qdr_address_list_t);
 
 qdr_address_t *qdr_address(qd_address_semantics_t semantics);
-qdr_address_t *qdr_add_local_address(qdr_core_t *core, const char *addr, qd_address_semantics_t semantics);
+qdr_address_t *qdr_add_local_address_CT(qdr_core_t *core, const char *addr, qd_address_semantics_t semantics);
 
 void qdr_add_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link);
 void qdr_del_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link);
@@ -239,9 +244,10 @@ struct qdr_core_t {
 };
 
 void *router_core_thread(void *arg);
-void  qdr_route_table_setup(qdr_core_t *core);
-void  qdr_agent_setup(qdr_core_t *core);
+void  qdr_route_table_setup_CT(qdr_core_t *core);
+void  qdr_agent_setup_CT(qdr_core_t *core);
 qdr_action_t *qdr_action(qdr_action_handler_t action_handler);
 void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action);
+void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query);
 
 #endif

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/61be97fd/src/router_core/router_core_thread.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_thread.c b/src/router_core/router_core_thread.c
index 7ef11ba..4212664 100644
--- a/src/router_core/router_core_thread.c
+++ b/src/router_core/router_core_thread.c
@@ -36,8 +36,8 @@ void *router_core_thread(void *arg)
     qdr_action_list_t  action_list;
     qdr_action_t      *action;
 
-    qdr_route_table_setup(core);
-    qdr_agent_setup(core);
+    qdr_route_table_setup_CT(core);
+    qdr_agent_setup_CT(core);
 
     qd_log(core->log, QD_LOG_INFO, "Router Core thread running");
     while (core->running) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org