You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2013/10/07 20:11:58 UTC

svn commit: r1530019 - in /qpid/trunk/qpid/extras/dispatch: CMakeLists.txt TODO src/dispatch.c src/router_agent.c src/router_node.c src/router_private.h

Author: tross
Date: Mon Oct  7 18:11:57 2013
New Revision: 1530019

URL: http://svn.apache.org/r1530019
Log:
QPID-5212 - Added management-agent access to router state.

Added:
    qpid/trunk/qpid/extras/dispatch/src/router_agent.c   (with props)
Modified:
    qpid/trunk/qpid/extras/dispatch/CMakeLists.txt
    qpid/trunk/qpid/extras/dispatch/TODO
    qpid/trunk/qpid/extras/dispatch/src/dispatch.c
    qpid/trunk/qpid/extras/dispatch/src/router_node.c
    qpid/trunk/qpid/extras/dispatch/src/router_private.h

Modified: qpid/trunk/qpid/extras/dispatch/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/CMakeLists.txt?rev=1530019&r1=1530018&r2=1530019&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/CMakeLists.txt (original)
+++ qpid/trunk/qpid/extras/dispatch/CMakeLists.txt Mon Oct  7 18:11:57 2013
@@ -101,6 +101,7 @@ set(server_SOURCES
     src/parse.c
     src/posix/threading.c
     src/python_embedded.c
+    src/router_agent.c
     src/router_node.c
     src/router_pynode.c
     src/server.c

Modified: qpid/trunk/qpid/extras/dispatch/TODO
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/TODO?rev=1530019&r1=1530018&r2=1530019&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/TODO (original)
+++ qpid/trunk/qpid/extras/dispatch/TODO Mon Oct  7 18:11:57 2013
@@ -39,9 +39,9 @@ enhancements to be fixed by going to the
                          detection of topology change.
   o All PyRouter stimulus through a work queue.
   o Router Code Updates
-    . Remove all vestiges of "binding"
-    . Calculate the valid-origin mask for each path
     . Report address mappings to routers
+    . Generate RA immediately after updating routing tables
+    . Generate unsolicited updates for mobile addresses?
   o Expose idle-timeout/keepalive on connectors and listeners
 
 - Major Roadmap Features

Modified: qpid/trunk/qpid/extras/dispatch/src/dispatch.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/dispatch.c?rev=1530019&r1=1530018&r2=1530019&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/dispatch.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/dispatch.c Mon Oct  7 18:11:57 2013
@@ -35,7 +35,7 @@ dx_container_t *dx_container(dx_dispatch
 void            dx_container_setup_agent(dx_dispatch_t *dx);
 void            dx_container_free(dx_container_t *container);
 dx_router_t    *dx_router(dx_dispatch_t *dx, const char *area, const char *id);
-void            dx_router_setup_agent(dx_dispatch_t *dx);
+void            dx_router_setup_late(dx_dispatch_t *dx);
 void            dx_router_free(dx_router_t *router);
 dx_agent_t     *dx_agent(dx_dispatch_t *dx);
 void            dx_agent_free(dx_agent_t *agent);
@@ -103,7 +103,7 @@ dx_dispatch_t *dx_dispatch(const char *c
     dx_alloc_setup_agent(dx);
     dx_server_setup_agent(dx);
     dx_container_setup_agent(dx);
-    dx_router_setup_agent(dx);
+    dx_router_setup_late(dx);
 
     return dx;
 }

Added: qpid/trunk/qpid/extras/dispatch/src/router_agent.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/router_agent.c?rev=1530019&view=auto
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/router_agent.c (added)
+++ qpid/trunk/qpid/extras/dispatch/src/router_agent.c Mon Oct  7 18:11:57 2013
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/python_embedded.h>
+#include <stdio.h>
+#include <string.h>
+#include <stdbool.h>
+#include <stdlib.h>
+#include <qpid/dispatch.h>
+#include <qpid/dispatch/agent.h>
+#include "dispatch_private.h"
+#include "router_private.h"
+
+//static char *module = "router.agent";
+
+#define DX_ROUTER_CLASS_ROUTER  1
+#define DX_ROUTER_CLASS_LINK    2
+#define DX_ROUTER_CLASS_NODE    3
+#define DX_ROUTER_CLASS_ADDRESS 4
+
+typedef struct dx_router_class_t {
+    dx_router_t *router;
+    int          class_id;
+} dx_router_class_t;
+
+
+static void dx_router_schema_handler(void *context, void *correlator)
+{
+}
+
+
+static const char *dx_router_addr_text(dx_address_t *addr)
+{
+    if (addr) {
+        const unsigned char *text = hash_key_by_handle(addr->hash_handle);
+        if (text)
+            return (const char*) text;
+    }
+    return 0;
+}
+
+
+static void dx_router_query_router(dx_router_t *router, void *cor)
+{
+    dx_agent_value_string(cor, "area",      router->router_area);
+    dx_agent_value_string(cor, "router_id", router->router_id);
+
+    sys_mutex_lock(router->lock);
+    dx_agent_value_uint(cor, "addr_count", DEQ_SIZE(router->addrs));
+    dx_agent_value_uint(cor, "link_count", DEQ_SIZE(router->links));
+    dx_agent_value_uint(cor, "node_count", DEQ_SIZE(router->routers));
+    sys_mutex_unlock(router->lock);
+
+    dx_agent_value_complete(cor, 0);
+}
+
+
+static void dx_router_query_link(dx_router_t *router, void *cor)
+{
+    sys_mutex_lock(router->lock);
+    dx_router_link_t *link = DEQ_HEAD(router->links);
+    const char       *link_type = "?";
+    const char       *link_dir;
+
+    while (link) {
+        dx_agent_value_uint(cor, "index", link->mask_bit);
+        switch (link->link_type) {
+        case DX_LINK_ENDPOINT: link_type = "endpoint";     break;
+        case DX_LINK_ROUTER:   link_type = "inter-router"; break;
+        case DX_LINK_AREA:     link_type = "inter-area";   break;
+        }
+        dx_agent_value_string(cor, "link-type", link_type);
+
+        if (link->link_direction == DX_INCOMING)
+            link_dir = "in";
+        else
+            link_dir = "out";
+        dx_agent_value_string(cor, "link-dir", link_dir);
+
+        const char *text = dx_router_addr_text(link->owning_addr);
+        if (text)
+            dx_agent_value_string(cor, "owning-addr", text);
+        else
+            dx_agent_value_null(cor, "owning-addr");
+
+        link = DEQ_NEXT(link);
+        dx_agent_value_complete(cor, link != 0);
+    }
+    sys_mutex_unlock(router->lock);
+}
+
+
+static void dx_router_query_node(dx_router_t *router, void *cor)
+{
+    sys_mutex_lock(router->lock);
+    dx_router_node_t *node = DEQ_HEAD(router->routers);
+    while (node) {
+        dx_agent_value_uint(cor, "index", node->mask_bit);
+        dx_agent_value_string(cor, "addr", dx_router_addr_text(node->owning_addr));
+        if (node->next_hop)
+            dx_agent_value_uint(cor, "next-hop", node->next_hop->mask_bit);
+        else
+            dx_agent_value_null(cor, "next-hop");
+        if (node->peer_link)
+            dx_agent_value_uint(cor, "router-link", node->peer_link->mask_bit);
+        else
+            dx_agent_value_null(cor, "router-link");
+        node = DEQ_NEXT(node);
+        dx_agent_value_complete(cor, node != 0);
+    }
+    sys_mutex_unlock(router->lock);
+}
+
+
+static void dx_router_query_address(dx_router_t *router, void *cor)
+{
+    sys_mutex_lock(router->lock);
+    dx_address_t *addr = DEQ_HEAD(router->addrs);
+    while (addr) {
+        dx_agent_value_string(cor, "addr", dx_router_addr_text(addr));
+        dx_agent_value_boolean(cor, "in-process", addr->handler != 0);
+        dx_agent_value_uint(cor, "subscriber-count", DEQ_SIZE(addr->rlinks));
+        dx_agent_value_uint(cor, "remote-count", DEQ_SIZE(addr->rnodes));
+        dx_agent_value_uint(cor, "deliveries-ingress", addr->deliveries_ingress);
+        dx_agent_value_uint(cor, "deliveries-egress", addr->deliveries_egress);
+        dx_agent_value_uint(cor, "deliveries-transit", addr->deliveries_transit);
+        addr = DEQ_NEXT(addr);
+        dx_agent_value_complete(cor, addr != 0);
+    }
+    sys_mutex_unlock(router->lock);
+}
+
+
+static void dx_router_query_handler(void* context, const char *id, void *correlator)
+{
+    dx_router_class_t *cls = (dx_router_class_t*) context;
+    switch (cls->class_id) {
+    case DX_ROUTER_CLASS_ROUTER:  dx_router_query_router(cls->router, correlator); break;
+    case DX_ROUTER_CLASS_LINK:    dx_router_query_link(cls->router, correlator); break;
+    case DX_ROUTER_CLASS_NODE:    dx_router_query_node(cls->router, correlator); break;
+    case DX_ROUTER_CLASS_ADDRESS: dx_router_query_address(cls->router, correlator); break;
+    }
+}
+
+
+static dx_agent_class_t *dx_router_setup_class(dx_router_t *router, const char *fqname, int id)
+{
+    dx_router_class_t *cls = NEW(dx_router_class_t);
+    cls->router   = router;
+    cls->class_id = id;
+
+    return dx_agent_register_class(router->dx, fqname, cls,
+                                   dx_router_schema_handler,
+                                   dx_router_query_handler);
+}
+
+
+void dx_router_agent_setup(dx_router_t *router)
+{
+    router->class_router =
+        dx_router_setup_class(router, "org.apache.qpid.dispatch.router", DX_ROUTER_CLASS_ROUTER);
+    router->class_link =
+        dx_router_setup_class(router, "org.apache.qpid.dispatch.router.link", DX_ROUTER_CLASS_LINK);
+    router->class_node =
+        dx_router_setup_class(router, "org.apache.qpid.dispatch.router.node", DX_ROUTER_CLASS_NODE);
+    router->class_address =
+        dx_router_setup_class(router, "org.apache.qpid.dispatch.router.address", DX_ROUTER_CLASS_ADDRESS);
+}
+

Propchange: qpid/trunk/qpid/extras/dispatch/src/router_agent.c
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/trunk/qpid/extras/dispatch/src/router_node.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/router_node.c?rev=1530019&r1=1530018&r2=1530019&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/router_node.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/router_node.c Mon Oct  7 18:11:57 2013
@@ -448,6 +448,7 @@ static void router_rx_handler(void* cont
                     in_process_copy = dx_message_copy(msg);
                     handler         = addr->handler;
                     handler_context = addr->handler_context;
+                    addr->deliveries_egress++;
                 }
 
                 //
@@ -472,6 +473,7 @@ static void router_rx_handler(void* cont
                         if (fanout == 1 && !dx_delivery_settled(delivery))
                             re->delivery = delivery;
 
+                        addr->deliveries_egress++;
                         dx_link_activate(dest_link_ref->link->link);
                         dest_link_ref = DEQ_NEXT(dest_link_ref);
                     }
@@ -542,6 +544,7 @@ static void router_rx_handler(void* cont
                                     if (fanout == 1)
                                         re->delivery = delivery;
                                 
+                                    addr->deliveries_transit++;
                                     dx_link_activate(dest_link->link);
                                 }
                             }
@@ -739,9 +742,8 @@ static int router_outgoing_link_handler(
         hash_retrieve(router->addr_hash, iter, (void**) &addr);
         if (!addr) {
             addr = new_dx_address_t();
+            memset(addr, 0, sizeof(dx_address_t));
             DEQ_ITEM_INIT(addr);
-            addr->handler         = 0;
-            addr->handler_context = 0;
             DEQ_INIT(addr->rlinks);
             DEQ_INIT(addr->rnodes);
             hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
@@ -1004,12 +1006,11 @@ dx_router_t *dx_router(dx_dispatch_t *dx
 }
 
 
-void dx_router_setup_agent(dx_dispatch_t *dx)
+void dx_router_setup_late(dx_dispatch_t *dx)
 {
+    dx_router_agent_setup(dx->router);
     dx_router_python_setup(dx->router);
     dx_timer_schedule(dx->router->timer, 1000);
-
-    // TODO
 }
 
 
@@ -1046,9 +1047,8 @@ dx_address_t *dx_router_register_address
     hash_retrieve(router->addr_hash, iter, (void**) &addr);
     if (!addr) {
         addr = new_dx_address_t();
+        memset(addr, 0, sizeof(dx_address_t));
         DEQ_ITEM_INIT(addr);
-        addr->handler         = 0;
-        addr->handler_context = 0;
         DEQ_INIT(addr->rlinks);
         DEQ_INIT(addr->rnodes);
         hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
@@ -1088,6 +1088,7 @@ void dx_router_send(dx_dispatch_t       
         //
         // Forward to all of the local links receiving this address.
         //
+        addr->deliveries_ingress++;
         dx_router_link_ref_t *dest_link_ref = DEQ_HEAD(addr->rlinks);
         while (dest_link_ref) {
             dx_routed_event_t *re = new_dx_routed_event_t();
@@ -1099,11 +1100,14 @@ void dx_router_send(dx_dispatch_t       
             DEQ_INSERT_TAIL(dest_link_ref->link->msg_fifo, re);
 
             dx_link_activate(dest_link_ref->link->link);
+            addr->deliveries_egress++;
+
             dest_link_ref = DEQ_NEXT(dest_link_ref);
         }
 
         //
         // Forward to the next-hops for remote destinations.
+        // FIXME - use link-mask to avoid dups.
         //
         dx_router_ref_t  *dest_node_ref = DEQ_HEAD(addr->rnodes);
         dx_router_link_t *dest_link;
@@ -1121,6 +1125,7 @@ void dx_router_send(dx_dispatch_t       
                 re->disposition = 0;
                 DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
                 dx_link_activate(dest_link->link);
+                addr->deliveries_transit++;
             }
             dest_node_ref = DEQ_NEXT(dest_node_ref);
         }

Modified: qpid/trunk/qpid/extras/dispatch/src/router_private.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/router_private.h?rev=1530019&r1=1530018&r2=1530019&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/router_private.h (original)
+++ qpid/trunk/qpid/extras/dispatch/src/router_private.h Mon Oct  7 18:11:57 2013
@@ -27,6 +27,7 @@ typedef struct dx_router_conn_t     dx_r
 
 void dx_router_python_setup(dx_router_t *router);
 void dx_pyrouter_tick(dx_router_t *router);
+void dx_router_agent_setup(dx_router_t *router);
 
 typedef enum {
     DX_LINK_ENDPOINT,   // A link to a connected endpoint
@@ -109,6 +110,10 @@ struct dx_address_t {
     dx_router_link_ref_list_t  rlinks;           // Locally-Connected Consumers
     dx_router_ref_list_t       rnodes;           // Remotely-Connected Consumers
     hash_handle_t             *hash_handle;      // Linkage back to the hash table entry
+
+    uint64_t deliveries_ingress;
+    uint64_t deliveries_egress;
+    uint64_t deliveries_transit;
 };
 
 ALLOC_DECLARE(dx_address_t);
@@ -138,6 +143,11 @@ struct dx_router_t {
 
     PyObject               *pyRouter;
     PyObject               *pyTick;
+
+    dx_agent_class_t       *class_router;
+    dx_agent_class_t       *class_link;
+    dx_agent_class_t       *class_node;
+    dx_agent_class_t       *class_address;
 };
 
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org