You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2013/10/07 20:11:58 UTC
svn commit: r1530019 - in /qpid/trunk/qpid/extras/dispatch: CMakeLists.txt
TODO src/dispatch.c src/router_agent.c src/router_node.c src/router_private.h
Author: tross
Date: Mon Oct 7 18:11:57 2013
New Revision: 1530019
URL: http://svn.apache.org/r1530019
Log:
QPID-5212 - Added management-agent access to router state.
Added:
qpid/trunk/qpid/extras/dispatch/src/router_agent.c (with props)
Modified:
qpid/trunk/qpid/extras/dispatch/CMakeLists.txt
qpid/trunk/qpid/extras/dispatch/TODO
qpid/trunk/qpid/extras/dispatch/src/dispatch.c
qpid/trunk/qpid/extras/dispatch/src/router_node.c
qpid/trunk/qpid/extras/dispatch/src/router_private.h
Modified: qpid/trunk/qpid/extras/dispatch/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/CMakeLists.txt?rev=1530019&r1=1530018&r2=1530019&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/CMakeLists.txt (original)
+++ qpid/trunk/qpid/extras/dispatch/CMakeLists.txt Mon Oct 7 18:11:57 2013
@@ -101,6 +101,7 @@ set(server_SOURCES
src/parse.c
src/posix/threading.c
src/python_embedded.c
+ src/router_agent.c
src/router_node.c
src/router_pynode.c
src/server.c
Modified: qpid/trunk/qpid/extras/dispatch/TODO
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/TODO?rev=1530019&r1=1530018&r2=1530019&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/TODO (original)
+++ qpid/trunk/qpid/extras/dispatch/TODO Mon Oct 7 18:11:57 2013
@@ -39,9 +39,9 @@ enhancements to be fixed by going to the
detection of topology change.
o All PyRouter stimulus through a work queue.
o Router Code Updates
- . Remove all vestiges of "binding"
- . Calculate the valid-origin mask for each path
. Report address mappings to routers
+ . Generate RA immediately after updating routing tables
+ . Generate unsolicited updates for mobile addresses?
o Expose idle-timeout/keepalive on connectors and listeners
- Major Roadmap Features
Modified: qpid/trunk/qpid/extras/dispatch/src/dispatch.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/dispatch.c?rev=1530019&r1=1530018&r2=1530019&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/dispatch.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/dispatch.c Mon Oct 7 18:11:57 2013
@@ -35,7 +35,7 @@ dx_container_t *dx_container(dx_dispatch
void dx_container_setup_agent(dx_dispatch_t *dx);
void dx_container_free(dx_container_t *container);
dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id);
-void dx_router_setup_agent(dx_dispatch_t *dx);
+void dx_router_setup_late(dx_dispatch_t *dx);
void dx_router_free(dx_router_t *router);
dx_agent_t *dx_agent(dx_dispatch_t *dx);
void dx_agent_free(dx_agent_t *agent);
@@ -103,7 +103,7 @@ dx_dispatch_t *dx_dispatch(const char *c
dx_alloc_setup_agent(dx);
dx_server_setup_agent(dx);
dx_container_setup_agent(dx);
- dx_router_setup_agent(dx);
+ dx_router_setup_late(dx);
return dx;
}
Added: qpid/trunk/qpid/extras/dispatch/src/router_agent.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/router_agent.c?rev=1530019&view=auto
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/router_agent.c (added)
+++ qpid/trunk/qpid/extras/dispatch/src/router_agent.c Mon Oct 7 18:11:57 2013
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/python_embedded.h>
+#include <stdio.h>
+#include <string.h>
+#include <stdbool.h>
+#include <stdlib.h>
+#include <qpid/dispatch.h>
+#include <qpid/dispatch/agent.h>
+#include "dispatch_private.h"
+#include "router_private.h"
+
+//static char *module = "router.agent";
+
+// Identifiers for the management classes registered by this file. The value
+// is stored in dx_router_class_t.class_id and selects the query routine in
+// dx_router_query_handler.
+#define DX_ROUTER_CLASS_ROUTER 1
+#define DX_ROUTER_CLASS_LINK 2
+#define DX_ROUTER_CLASS_NODE 3
+#define DX_ROUTER_CLASS_ADDRESS 4
+
+// Per-class context passed to dx_agent_register_class and cast back from the
+// `context` argument in dx_router_query_handler.
+typedef struct dx_router_class_t {
+ dx_router_t *router; // Router whose state is reported
+ int class_id; // One of the DX_ROUTER_CLASS_* values
+} dx_router_class_t;
+
+
+/**
+ * Schema callback handed to dx_agent_register_class for every router class.
+ * It is currently a no-op: neither argument is used.
+ */
+static void dx_router_schema_handler(void *context, void *correlator)
+{
+}
+
+
+/**
+ * Fetch the text of an address via its hash-table handle.
+ *
+ * @param addr Address record; may be null.
+ * @return The key that hash_key_by_handle reports for addr->hash_handle,
+ *         or 0 when addr is null or no key is reported.
+ */
+static const char *dx_router_addr_text(dx_address_t *addr)
+{
+    if (!addr)
+        return 0;
+
+    // A null key converts to a null char pointer, so no separate check is needed.
+    return (const char*) hash_key_by_handle(addr->hash_handle);
+}
+
+
+/**
+ * Answer a query on the router class with a single record: the router's
+ * area and id, followed by the sizes of its address, link and node lists.
+ *
+ * @param router Router being reported.
+ * @param cor Correlator forwarded unchanged to every dx_agent_value_* call.
+ */
+static void dx_router_query_router(dx_router_t *router, void *cor)
+{
+ dx_agent_value_string(cor, "area", router->router_area);
+ dx_agent_value_string(cor, "router_id", router->router_id);
+
+ // The three list sizes are read while holding the router lock.
+ sys_mutex_lock(router->lock);
+ dx_agent_value_uint(cor, "addr_count", DEQ_SIZE(router->addrs));
+ dx_agent_value_uint(cor, "link_count", DEQ_SIZE(router->links));
+ dx_agent_value_uint(cor, "node_count", DEQ_SIZE(router->routers));
+ sys_mutex_unlock(router->lock);
+
+ // NOTE(review): the second argument looks like a "more records follow"
+ // flag (the list queries pass `next != 0`) - confirm against agent.h.
+ dx_agent_value_complete(cor, 0);
+}
+
+
+/**
+ * Answer a query on the link class with one record per entry in
+ * router->links: mask-bit index, link type, direction and owning address.
+ * The whole walk is performed while holding router->lock.
+ *
+ * @param router Router whose links are reported.
+ * @param cor Correlator forwarded unchanged to every dx_agent_value_* call.
+ */
+static void dx_router_query_link(dx_router_t *router, void *cor)
+{
+    // NOTE(review): when the list is empty dx_agent_value_complete is never
+    // called - confirm the agent tolerates a query without a completion.
+    sys_mutex_lock(router->lock);
+    dx_router_link_t *link = DEQ_HEAD(router->links);
+
+    while (link) {
+        // Reset for every record so an unrecognized link type is reported as
+        // "?" instead of inheriting the label of the previous link.
+        const char *link_type = "?";
+        const char *link_dir;
+
+        dx_agent_value_uint(cor, "index", link->mask_bit);
+        switch (link->link_type) {
+        case DX_LINK_ENDPOINT: link_type = "endpoint";     break;
+        case DX_LINK_ROUTER:   link_type = "inter-router"; break;
+        case DX_LINK_AREA:     link_type = "inter-area";   break;
+        default:                                           break;
+        }
+        dx_agent_value_string(cor, "link-type", link_type);
+
+        if (link->link_direction == DX_INCOMING)
+            link_dir = "in";
+        else
+            link_dir = "out";
+        dx_agent_value_string(cor, "link-dir", link_dir);
+
+        // Links without a resolvable owning address report an explicit null.
+        const char *text = dx_router_addr_text(link->owning_addr);
+        if (text)
+            dx_agent_value_string(cor, "owning-addr", text);
+        else
+            dx_agent_value_null(cor, "owning-addr");
+
+        // Advance first so the completion call can tell whether another
+        // record follows.
+        link = DEQ_NEXT(link);
+        dx_agent_value_complete(cor, link != 0);
+    }
+    sys_mutex_unlock(router->lock);
+}
+
+
+/**
+ * Answer a query on the node class with one record per entry in
+ * router->routers: mask-bit index, address text, next-hop index and
+ * peer-link index. The whole walk is performed while holding router->lock.
+ *
+ * @param router Router whose remote-router nodes are reported.
+ * @param cor Correlator forwarded unchanged to every dx_agent_value_* call.
+ */
+static void dx_router_query_node(dx_router_t *router, void *cor)
+{
+    // NOTE(review): when the list is empty dx_agent_value_complete is never
+    // called - confirm the agent tolerates a query without a completion.
+    sys_mutex_lock(router->lock);
+    dx_router_node_t *node = DEQ_HEAD(router->routers);
+    while (node) {
+        dx_agent_value_uint(cor, "index", node->mask_bit);
+
+        // dx_router_addr_text yields 0 when there is no owning address or no
+        // key; report that as an explicit null, as the link query does,
+        // rather than handing a null pointer to dx_agent_value_string.
+        const char *text = dx_router_addr_text(node->owning_addr);
+        if (text)
+            dx_agent_value_string(cor, "addr", text);
+        else
+            dx_agent_value_null(cor, "addr");
+
+        if (node->next_hop)
+            dx_agent_value_uint(cor, "next-hop", node->next_hop->mask_bit);
+        else
+            dx_agent_value_null(cor, "next-hop");
+
+        if (node->peer_link)
+            dx_agent_value_uint(cor, "router-link", node->peer_link->mask_bit);
+        else
+            dx_agent_value_null(cor, "router-link");
+
+        // Advance first so the completion call can tell whether another
+        // record follows.
+        node = DEQ_NEXT(node);
+        dx_agent_value_complete(cor, node != 0);
+    }
+    sys_mutex_unlock(router->lock);
+}
+
+
+/**
+ * Answer a query on the address class with one record per entry in
+ * router->addrs: address text, whether an in-process handler is set, the
+ * sizes of the rlinks and rnodes lists, and the three delivery counters.
+ * The whole walk is performed while holding router->lock.
+ *
+ * @param router Router whose addresses are reported.
+ * @param cor Correlator forwarded unchanged to every dx_agent_value_* call.
+ */
+static void dx_router_query_address(dx_router_t *router, void *cor)
+{
+    // NOTE(review): when the list is empty dx_agent_value_complete is never
+    // called - confirm the agent tolerates a query without a completion.
+    sys_mutex_lock(router->lock);
+    dx_address_t *addr = DEQ_HEAD(router->addrs);
+    while (addr) {
+        // dx_router_addr_text yields 0 when no key is reported for the
+        // handle; emit an explicit null in that case, as the link query does.
+        const char *text = dx_router_addr_text(addr);
+        if (text)
+            dx_agent_value_string(cor, "addr", text);
+        else
+            dx_agent_value_null(cor, "addr");
+
+        dx_agent_value_boolean(cor, "in-process", addr->handler != 0);
+        dx_agent_value_uint(cor, "subscriber-count", DEQ_SIZE(addr->rlinks));
+        dx_agent_value_uint(cor, "remote-count", DEQ_SIZE(addr->rnodes));
+        dx_agent_value_uint(cor, "deliveries-ingress", addr->deliveries_ingress);
+        dx_agent_value_uint(cor, "deliveries-egress", addr->deliveries_egress);
+        dx_agent_value_uint(cor, "deliveries-transit", addr->deliveries_transit);
+
+        // Advance first so the completion call can tell whether another
+        // record follows.
+        addr = DEQ_NEXT(addr);
+        dx_agent_value_complete(cor, addr != 0);
+    }
+    sys_mutex_unlock(router->lock);
+}
+
+
+/**
+ * Query callback shared by all router classes. The context is the
+ * dx_router_class_t created in dx_router_setup_class; its class_id picks
+ * the routine that produces the records.
+ *
+ * @param context Pointer to the dx_router_class_t of the queried class.
+ * @param id Not used by this handler.
+ * @param correlator Passed through to the selected query routine.
+ */
+static void dx_router_query_handler(void* context, const char *id, void *correlator)
+{
+    typedef void (*query_fn_t)(dx_router_t *router, void *cor);
+
+    // Indexed by DX_ROUTER_CLASS_*; slots without an entry stay null.
+    static const query_fn_t query_fns[] = {
+        [DX_ROUTER_CLASS_ROUTER]  = dx_router_query_router,
+        [DX_ROUTER_CLASS_LINK]    = dx_router_query_link,
+        [DX_ROUTER_CLASS_NODE]    = dx_router_query_node,
+        [DX_ROUTER_CLASS_ADDRESS] = dx_router_query_address,
+    };
+    const int fn_count = (int) (sizeof(query_fns) / sizeof(query_fns[0]));
+
+    dx_router_class_t *cls = (dx_router_class_t*) context;
+    const int which = cls->class_id;
+
+    // Unknown class ids are ignored without touching the correlator.
+    if (which < 0 || which >= fn_count || !query_fns[which])
+        return;
+
+    query_fns[which](cls->router, correlator);
+}
+
+
+/**
+ * Register one management class with the agent.
+ *
+ * A dx_router_class_t holding the router and class id is allocated and
+ * passed to dx_agent_register_class as the class context, together with the
+ * schema and query handlers defined in this file. Nothing in this file
+ * releases that context afterwards.
+ *
+ * @param router Router the class reports on; router->dx is passed to the agent.
+ * @param fqname Fully-qualified class name to register.
+ * @param id One of the DX_ROUTER_CLASS_* values.
+ * @return Whatever dx_agent_register_class returns, or 0 if the context
+ *         could not be allocated.
+ */
+static dx_agent_class_t *dx_router_setup_class(dx_router_t *router, const char *fqname, int id)
+{
+    dx_router_class_t *cls = NEW(dx_router_class_t);
+
+    // NOTE(review): assumes NEW can yield a null pointer on allocation
+    // failure - confirm against its definition. Guard before dereferencing.
+    if (!cls)
+        return 0;
+
+    cls->router   = router;
+    cls->class_id = id;
+
+    return dx_agent_register_class(router->dx, fqname, cls,
+                                   dx_router_schema_handler,
+                                   dx_router_query_handler);
+}
+
+
+/**
+ * Register the router, link, node and address management classes with the
+ * agent and store each returned class handle in the router.
+ *
+ * @param router Router to expose; its class_router, class_link, class_node
+ * and class_address fields are overwritten.
+ */
+void dx_router_agent_setup(dx_router_t *router)
+{
+ router->class_router =
+ dx_router_setup_class(router, "org.apache.qpid.dispatch.router", DX_ROUTER_CLASS_ROUTER);
+ router->class_link =
+ dx_router_setup_class(router, "org.apache.qpid.dispatch.router.link", DX_ROUTER_CLASS_LINK);
+ router->class_node =
+ dx_router_setup_class(router, "org.apache.qpid.dispatch.router.node", DX_ROUTER_CLASS_NODE);
+ router->class_address =
+ dx_router_setup_class(router, "org.apache.qpid.dispatch.router.address", DX_ROUTER_CLASS_ADDRESS);
+}
+
Propchange: qpid/trunk/qpid/extras/dispatch/src/router_agent.c
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/trunk/qpid/extras/dispatch/src/router_node.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/router_node.c?rev=1530019&r1=1530018&r2=1530019&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/router_node.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/router_node.c Mon Oct 7 18:11:57 2013
@@ -448,6 +448,7 @@ static void router_rx_handler(void* cont
in_process_copy = dx_message_copy(msg);
handler = addr->handler;
handler_context = addr->handler_context;
+ addr->deliveries_egress++;
}
//
@@ -472,6 +473,7 @@ static void router_rx_handler(void* cont
if (fanout == 1 && !dx_delivery_settled(delivery))
re->delivery = delivery;
+ addr->deliveries_egress++;
dx_link_activate(dest_link_ref->link->link);
dest_link_ref = DEQ_NEXT(dest_link_ref);
}
@@ -542,6 +544,7 @@ static void router_rx_handler(void* cont
if (fanout == 1)
re->delivery = delivery;
+ addr->deliveries_transit++;
dx_link_activate(dest_link->link);
}
}
@@ -739,9 +742,8 @@ static int router_outgoing_link_handler(
hash_retrieve(router->addr_hash, iter, (void**) &addr);
if (!addr) {
addr = new_dx_address_t();
+ memset(addr, 0, sizeof(dx_address_t));
DEQ_ITEM_INIT(addr);
- addr->handler = 0;
- addr->handler_context = 0;
DEQ_INIT(addr->rlinks);
DEQ_INIT(addr->rnodes);
hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
@@ -1004,12 +1006,11 @@ dx_router_t *dx_router(dx_dispatch_t *dx
}
-void dx_router_setup_agent(dx_dispatch_t *dx)
+void dx_router_setup_late(dx_dispatch_t *dx)
{
+ dx_router_agent_setup(dx->router);
dx_router_python_setup(dx->router);
dx_timer_schedule(dx->router->timer, 1000);
-
- // TODO
}
@@ -1046,9 +1047,8 @@ dx_address_t *dx_router_register_address
hash_retrieve(router->addr_hash, iter, (void**) &addr);
if (!addr) {
addr = new_dx_address_t();
+ memset(addr, 0, sizeof(dx_address_t));
DEQ_ITEM_INIT(addr);
- addr->handler = 0;
- addr->handler_context = 0;
DEQ_INIT(addr->rlinks);
DEQ_INIT(addr->rnodes);
hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
@@ -1088,6 +1088,7 @@ void dx_router_send(dx_dispatch_t
//
// Forward to all of the local links receiving this address.
//
+ addr->deliveries_ingress++;
dx_router_link_ref_t *dest_link_ref = DEQ_HEAD(addr->rlinks);
while (dest_link_ref) {
dx_routed_event_t *re = new_dx_routed_event_t();
@@ -1099,11 +1100,14 @@ void dx_router_send(dx_dispatch_t
DEQ_INSERT_TAIL(dest_link_ref->link->msg_fifo, re);
dx_link_activate(dest_link_ref->link->link);
+ addr->deliveries_egress++;
+
dest_link_ref = DEQ_NEXT(dest_link_ref);
}
//
// Forward to the next-hops for remote destinations.
+ // FIXME - use link-mask to avoid dups.
//
dx_router_ref_t *dest_node_ref = DEQ_HEAD(addr->rnodes);
dx_router_link_t *dest_link;
@@ -1121,6 +1125,7 @@ void dx_router_send(dx_dispatch_t
re->disposition = 0;
DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
dx_link_activate(dest_link->link);
+ addr->deliveries_transit++;
}
dest_node_ref = DEQ_NEXT(dest_node_ref);
}
Modified: qpid/trunk/qpid/extras/dispatch/src/router_private.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/router_private.h?rev=1530019&r1=1530018&r2=1530019&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/router_private.h (original)
+++ qpid/trunk/qpid/extras/dispatch/src/router_private.h Mon Oct 7 18:11:57 2013
@@ -27,6 +27,7 @@ typedef struct dx_router_conn_t dx_r
void dx_router_python_setup(dx_router_t *router);
void dx_pyrouter_tick(dx_router_t *router);
+void dx_router_agent_setup(dx_router_t *router);
typedef enum {
DX_LINK_ENDPOINT, // A link to a connected endpoint
@@ -109,6 +110,10 @@ struct dx_address_t {
dx_router_link_ref_list_t rlinks; // Locally-Connected Consumers
dx_router_ref_list_t rnodes; // Remotely-Connected Consumers
hash_handle_t *hash_handle; // Linkage back to the hash table entry
+
+ uint64_t deliveries_ingress;
+ uint64_t deliveries_egress;
+ uint64_t deliveries_transit;
};
ALLOC_DECLARE(dx_address_t);
@@ -138,6 +143,11 @@ struct dx_router_t {
PyObject *pyRouter;
PyObject *pyTick;
+
+ dx_agent_class_t *class_router;
+ dx_agent_class_t *class_link;
+ dx_agent_class_t *class_node;
+ dx_agent_class_t *class_address;
};
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org