You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2015/12/16 22:51:05 UTC

[2/3] qpid-dispatch git commit: DISPATCH-179 - WIP checkpoint. Ripped out large sections of the old architecture. Added the general callback functionality for non-connection-specific actions.

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ca24d67f/src/router_pynode.c
----------------------------------------------------------------------
diff --git a/src/router_pynode.c b/src/router_pynode.c
index 77ac601..3cdc98f 100644
--- a/src/router_pynode.c
+++ b/src/router_pynode.c
@@ -28,8 +28,6 @@
 #include "waypoint_private.h"
 #include "entity_cache.h"
 
-static qd_address_semantics_t router_addr_semantics = QD_FANOUT_SINGLE | QD_BIAS_CLOSEST | QD_CONGESTION_DROP | QD_DROP_FOR_SLOW_CONSUMERS | QD_BYPASS_VALID_ORIGINS;
-
 static qd_log_source_t *log_source = 0;
 static PyObject        *pyRouter   = 0;
 static PyObject        *pyTick     = 0;
@@ -49,84 +47,12 @@ static PyObject *qd_add_router(PyObject *self, PyObject *args)
     qd_router_t   *router  = adapter->router;
     const char    *address;
     int            router_maskbit;
-    char          *error = 0;
 
     if (!PyArg_ParseTuple(args, "si", &address, &router_maskbit))
         return 0;
 
     qdr_core_add_router(router->router_core, address, router_maskbit);
 
-    do {
-        if (router_maskbit >= qd_bitmask_width() || router_maskbit < 0) {
-            error = "Router bit mask out of range";
-            break;
-        }
-
-        sys_mutex_lock(router->lock);
-        if (router->routers_by_mask_bit[router_maskbit] != 0) {
-            sys_mutex_unlock(router->lock);
-            error = "Adding router over already existing router";
-            break;
-        }
-
-        //
-        // Hash lookup the address to ensure there isn't an existing router address.
-        //
-        qd_field_iterator_t *iter = qd_address_iterator_string(address, ITER_VIEW_ADDRESS_HASH);
-        qd_address_t        *addr;
-
-        qd_hash_retrieve(router->addr_hash, iter, (void**) &addr);
-        assert(addr == 0);
-
-        //
-        // Create an address record for this router and insert it in the hash table.
-        // This record will be found whenever a "foreign" topological address to this
-        // remote router is looked up.
-        //
-        addr = qd_address(router_addr_semantics);
-        qd_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
-        DEQ_INSERT_TAIL(router->addrs, addr);
-        qd_entity_cache_add(QD_ROUTER_ADDRESS_TYPE, addr);
-        qd_field_iterator_free(iter);
-
-        //
-        // Create a router-node record to represent the remote router.
-        //
-        qd_router_node_t *rnode = new_qd_router_node_t();
-        DEQ_ITEM_INIT(rnode);
-        rnode->owning_addr   = addr;
-        rnode->mask_bit      = router_maskbit;
-        rnode->next_hop      = 0;
-        rnode->peer_link     = 0;
-        rnode->ref_count     = 0;
-        rnode->valid_origins = qd_bitmask(0);
-
-        DEQ_INSERT_TAIL(router->routers, rnode);
-
-        //
-        // Link the router record to the address record.
-        //
-        qd_router_add_node_ref_LH(&addr->rnodes, rnode);
-
-        //
-        // Link the router record to the router address records.
-        //
-        qd_router_add_node_ref_LH(&router->router_addr->rnodes, rnode);
-        qd_router_add_node_ref_LH(&router->routerma_addr->rnodes, rnode);
-
-        //
-        // Add the router record to the mask-bit index.
-        //
-        router->routers_by_mask_bit[router_maskbit] = rnode;
-
-        sys_mutex_unlock(router->lock);
-    } while (0);
-
-    if (error) {
-        PyErr_SetString(PyExc_Exception, error);
-        return 0;
-    }
-
     Py_INCREF(Py_None);
     return Py_None;
 }
@@ -137,69 +63,12 @@ static PyObject* qd_del_router(PyObject *self, PyObject *args)
     RouterAdapter *adapter = (RouterAdapter*) self;
     qd_router_t   *router  = adapter->router;
     int router_maskbit;
-    char *error = 0;
 
     if (!PyArg_ParseTuple(args, "i", &router_maskbit))
         return 0;
 
     qdr_core_del_router(router->router_core, router_maskbit);
 
-    do {
-        if (router_maskbit >= qd_bitmask_width() || router_maskbit < 0) {
-            error = "Router bit mask out of range";
-            break;
-        }
-
-        sys_mutex_lock(router->lock);
-        if (router->routers_by_mask_bit[router_maskbit] == 0) {
-            sys_mutex_unlock(router->lock);
-            error = "Deleting nonexistent router";
-            break;
-        }
-
-        qd_router_node_t *rnode = router->routers_by_mask_bit[router_maskbit];
-        qd_address_t     *oaddr = rnode->owning_addr;
-        assert(oaddr);
-
-        qd_entity_cache_remove(QD_ROUTER_ADDRESS_TYPE, oaddr);
-
-        //
-        // Unlink the router node from the address record
-        //
-        qd_router_del_node_ref_LH(&oaddr->rnodes, rnode);
-
-        //
-        // While the router node has a non-zero reference count, look for addresses
-        // to unlink the node from.
-        //
-        qd_address_t *addr = DEQ_HEAD(router->addrs);
-        while (addr && rnode->ref_count > 0) {
-            qd_router_del_node_ref_LH(&addr->rnodes, rnode);
-            addr = DEQ_NEXT(addr);
-        }
-        assert(rnode->ref_count == 0);
-
-        //
-        // Free the router node and the owning address records.
-        //
-        qd_bitmask_free(rnode->valid_origins);
-        DEQ_REMOVE(router->routers, rnode);
-        free_qd_router_node_t(rnode);
-
-        qd_hash_remove_by_handle(router->addr_hash, oaddr->hash_handle);
-        DEQ_REMOVE(router->addrs, oaddr);
-        qd_hash_handle_free(oaddr->hash_handle);
-        router->routers_by_mask_bit[router_maskbit] = 0;
-        free_qd_address_t(oaddr);
-
-        sys_mutex_unlock(router->lock);
-    } while(0);
-
-    if (error) {
-        PyErr_SetString(PyExc_Exception, error);
-        return 0;
-    }
-
     Py_INCREF(Py_None);
     return Py_None;
 }
@@ -211,40 +80,12 @@ static PyObject* qd_set_link(PyObject *self, PyObject *args)
     qd_router_t   *router  = adapter->router;
     int            router_maskbit;
     int            link_maskbit;
-    char          *error = 0;
 
     if (!PyArg_ParseTuple(args, "ii", &router_maskbit, &link_maskbit))
         return 0;
 
     qdr_core_set_link(router->router_core, router_maskbit, link_maskbit);
 
-    do {
-        if (link_maskbit >= qd_bitmask_width() || link_maskbit < 0) {
-            error = "Link bit mask out of range";
-            break;
-        }
-
-        sys_mutex_lock(router->lock);
-        if (router->out_links_by_mask_bit[link_maskbit] == 0) {
-            sys_mutex_unlock(router->lock);
-            error = "Adding neighbor router with invalid link reference";
-            break;
-        }
-
-        //
-        // Add the peer_link reference to the router record.
-        //
-        qd_router_node_t *rnode = router->routers_by_mask_bit[router_maskbit];
-        rnode->peer_link = router->out_links_by_mask_bit[link_maskbit];
-
-        sys_mutex_unlock(router->lock);
-    } while (0);
-
-    if (error) {
-        PyErr_SetString(PyExc_Exception, error);
-        return 0;
-    }
-
     Py_INCREF(Py_None);
     return Py_None;
 }
@@ -255,25 +96,12 @@ static PyObject* qd_remove_link(PyObject *self, PyObject *args)
     RouterAdapter *adapter = (RouterAdapter*) self;
     qd_router_t   *router  = adapter->router;
     int            router_maskbit;
-    char          *error = 0;
 
     if (!PyArg_ParseTuple(args, "i", &router_maskbit))
         return 0;
 
     qdr_core_remove_link(router->router_core, router_maskbit);
 
-    do {
-        sys_mutex_lock(router->lock);
-        qd_router_node_t *rnode = router->routers_by_mask_bit[router_maskbit];
-        rnode->peer_link = 0;
-        sys_mutex_unlock(router->lock);
-    } while (0);
-
-    if (error) {
-        PyErr_SetString(PyExc_Exception, error);
-        return 0;
-    }
-
     Py_INCREF(Py_None);
     return Py_None;
 }
@@ -285,49 +113,12 @@ static PyObject* qd_set_next_hop(PyObject *self, PyObject *args)
     qd_router_t   *router  = adapter->router;
     int            router_maskbit;
     int            next_hop_maskbit;
-    char          *error = 0;
 
     if (!PyArg_ParseTuple(args, "ii", &router_maskbit, &next_hop_maskbit))
         return 0;
 
     qdr_core_set_next_hop(router->router_core, router_maskbit, next_hop_maskbit);
 
-    do {
-        if (router_maskbit >= qd_bitmask_width() || router_maskbit < 0) {
-            error = "Router bit mask out of range";
-            break;
-        }
-
-        if (next_hop_maskbit >= qd_bitmask_width() || next_hop_maskbit < 0) {
-            error = "Next Hop bit mask out of range";
-            break;
-        }
-
-        sys_mutex_lock(router->lock);
-        if (router->routers_by_mask_bit[router_maskbit] == 0) {
-            sys_mutex_unlock(router->lock);
-            error = "Router Not Found";
-            break;
-        }
-
-        if (router->routers_by_mask_bit[next_hop_maskbit] == 0) {
-            sys_mutex_unlock(router->lock);
-            error = "Next Hop Not Found";
-            break;
-        }
-
-        if (router_maskbit != next_hop_maskbit) {
-            qd_router_node_t *rnode = router->routers_by_mask_bit[router_maskbit];
-            rnode->next_hop = router->routers_by_mask_bit[next_hop_maskbit];
-        }
-        sys_mutex_unlock(router->lock);
-    } while (0);
-
-    if (error) {
-        PyErr_SetString(PyExc_Exception, error);
-        return 0;
-    }
-
     Py_INCREF(Py_None);
     return Py_None;
 }
@@ -338,37 +129,12 @@ static PyObject* qd_remove_next_hop(PyObject *self, PyObject *args)
     RouterAdapter *adapter = (RouterAdapter*) self;
     qd_router_t   *router  = adapter->router;
     int            router_maskbit;
-    char          *error = 0;
 
     if (!PyArg_ParseTuple(args, "i", &router_maskbit))
         return 0;
 
     qdr_core_remove_next_hop(router->router_core, router_maskbit);
 
-    do {
-        if (router_maskbit >= qd_bitmask_width() || router_maskbit < 0) {
-            error = "Router bit mask out of range";
-            break;
-        }
-
-        sys_mutex_lock(router->lock);
-        if (router->routers_by_mask_bit[router_maskbit] == 0) {
-            sys_mutex_unlock(router->lock);
-            error = "Router Not Found";
-            break;
-        }
-
-        qd_router_node_t *rnode = router->routers_by_mask_bit[router_maskbit];
-        rnode->next_hop = 0;
-
-        sys_mutex_unlock(router->lock);
-    } while (0);
-
-    if (error) {
-        PyErr_SetString(PyExc_Exception, error);
-        return 0;
-    }
-
     Py_INCREF(Py_None);
     return Py_None;
 }
@@ -397,43 +163,26 @@ static PyObject* qd_set_valid_origins(PyObject *self, PyObject *args)
             break;
         }
 
-        sys_mutex_lock(router->lock);
-        if (router->routers_by_mask_bit[router_maskbit] == 0) {
-            sys_mutex_unlock(router->lock);
-            error = "Router Not Found";
-            break;
-        }
-
-        Py_ssize_t        origin_count = PyList_Size(origin_list);
-        qd_router_node_t *rnode        = router->routers_by_mask_bit[router_maskbit];
-        qd_bitmask_t     *core_bitmask = qd_bitmask(0);
-        int               maskbit;
+        Py_ssize_t    origin_count = PyList_Size(origin_list);
+        qd_bitmask_t *core_bitmask = qd_bitmask(0);
+        int           maskbit;
 
         for (idx = 0; idx < origin_count; idx++) {
             maskbit = PyInt_AS_LONG(PyList_GetItem(origin_list, idx));
-
             if (maskbit >= qd_bitmask_width() || maskbit < 0) {
                 error = "Origin bit mask out of range";
                 break;
             }
-
-            if (router->routers_by_mask_bit[maskbit] == 0) {
-                error = "Origin router Not Found";
-                break;
-            }
         }
 
         if (error == 0) {
-            qd_bitmask_clear_all(rnode->valid_origins);
-            qd_bitmask_set_bit(rnode->valid_origins, 0);  // This router is a valid origin for all destinations
+            qd_bitmask_set_bit(core_bitmask, 0);  // This router is a valid origin for all destinations
             for (idx = 0; idx < origin_count; idx++) {
                 maskbit = PyInt_AS_LONG(PyList_GetItem(origin_list, idx));
-                qd_bitmask_set_bit(rnode->valid_origins, maskbit);
                 qd_bitmask_set_bit(core_bitmask, maskbit);
             }
-        }
-
-        sys_mutex_unlock(router->lock);
+        } else
+            qd_bitmask_free(core_bitmask);
 
         qdr_core_set_valid_origins(router->router_core, router_maskbit, core_bitmask);
     } while (0);
@@ -450,16 +199,12 @@ static PyObject* qd_set_valid_origins(PyObject *self, PyObject *args)
 
 static PyObject* qd_map_destination(PyObject *self, PyObject *args)
 {
-    RouterAdapter       *adapter = (RouterAdapter*) self;
-    qd_router_t         *router  = adapter->router;
-    char                 phase;
-    char                 unused;
-    const char          *addr_string;
-    int                  maskbit;
-    qd_address_t        *addr;
-    qd_field_iterator_t *iter;
-
-    if (!PyArg_ParseTuple(args, "csi", &phase, &addr_string, &maskbit))
+    RouterAdapter *adapter = (RouterAdapter*) self;
+    qd_router_t   *router  = adapter->router;
+    const char    *addr_string;
+    int            maskbit;
+
+    if (!PyArg_ParseTuple(args, "si", &addr_string, &maskbit))
         return 0;
 
     if (maskbit >= qd_bitmask_width() || maskbit < 0) {
@@ -467,34 +212,8 @@ static PyObject* qd_map_destination(PyObject *self, PyObject *args)
         return 0;
     }
 
-    if (router->routers_by_mask_bit[maskbit] == 0) {
-        PyErr_SetString(PyExc_Exception, "Router Not Found");
-        return 0;
-    }
-
-    iter = qd_address_iterator_string(addr_string, ITER_VIEW_ALL);
-
-    sys_mutex_lock(router->lock);
-    qd_hash_retrieve(router->addr_hash, iter, (void**) &addr);
-    if (!addr) {
-        addr = qd_address(router_semantics_for_addr(router, iter, phase, &unused));
-        qd_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
-        DEQ_ITEM_INIT(addr);
-        DEQ_INSERT_TAIL(router->addrs, addr);
-        qd_entity_cache_add(QD_ROUTER_ADDRESS_TYPE, addr);
-    }
-    qd_field_iterator_free(iter);
-
-    qd_router_node_t *rnode = router->routers_by_mask_bit[maskbit];
-    qd_router_add_node_ref_LH(&addr->rnodes, rnode);
+    qdr_core_map_destination(router->router_core, maskbit, addr_string);
 
-    //
-    // If the address has an associated waypoint, notify the waypoint module of the changes.
-    //
-    if (addr->waypoint)
-        qd_waypoint_address_updated_LH(router->qd, addr);
-
-    sys_mutex_unlock(router->lock);
     Py_INCREF(Py_None);
     return Py_None;
 }
@@ -506,7 +225,6 @@ static PyObject* qd_unmap_destination(PyObject *self, PyObject *args)
     qd_router_t   *router  = adapter->router;
     const char    *addr_string;
     int            maskbit;
-    qd_address_t  *addr;
 
     if (!PyArg_ParseTuple(args, "si", &addr_string, &maskbit))
         return 0;
@@ -516,35 +234,7 @@ static PyObject* qd_unmap_destination(PyObject *self, PyObject *args)
         return 0;
     }
 
-    if (router->routers_by_mask_bit[maskbit] == 0) {
-        PyErr_SetString(PyExc_Exception, "Router Not Found");
-        return 0;
-    }
-
-    qd_router_node_t    *rnode = router->routers_by_mask_bit[maskbit];
-    qd_field_iterator_t *iter  = qd_address_iterator_string(addr_string, ITER_VIEW_ALL);
-
-    sys_mutex_lock(router->lock);
-    qd_hash_retrieve(router->addr_hash, iter, (void**) &addr);
-    qd_field_iterator_free(iter);
-
-    if (!addr) {
-        PyErr_SetString(PyExc_Exception, "Address Not Found");
-        sys_mutex_unlock(router->lock);
-        return 0;
-    }
-
-    qd_router_del_node_ref_LH(&addr->rnodes, rnode);
-
-    //
-    // If the address has an associated waypoint, notify the waypoint module of the changes.
-    //
-    if (addr->waypoint)
-        qd_waypoint_address_updated_LH(router->qd, addr);
-
-    sys_mutex_unlock(router->lock);
-
-    qd_router_check_addr(router, addr, 0);
+    qdr_core_unmap_destination(router->router_core, maskbit, addr_string);
 
     Py_INCREF(Py_None);
     return Py_None;
@@ -625,11 +315,75 @@ static PyTypeObject RouterAdapterType = {
     0                          /* tp_version_tag */
 };
 
+
+static void qd_router_mobile_added(void *context, const char *address_hash)
+{
+    qd_router_t *router = (qd_router_t*) context;
+    PyObject    *pArgs;
+    PyObject    *pValue;
+
+    if (pyAdded && router->router_mode == QD_ROUTER_MODE_INTERIOR) {
+        qd_python_lock_state_t lock_state = qd_python_lock();
+        pArgs = PyTuple_New(1);
+        PyTuple_SetItem(pArgs, 0, PyString_FromString(address_hash));
+        pValue = PyObject_CallObject(pyAdded, pArgs);
+        qd_error_py();
+        Py_DECREF(pArgs);
+        Py_XDECREF(pValue);
+        qd_python_unlock(lock_state);
+    }
+}
+
+
+static void qd_router_mobile_removed(void *context, const char *address_hash)
+{
+    qd_router_t *router = (qd_router_t*) context;
+    PyObject    *pArgs;
+    PyObject    *pValue;
+
+    if (pyRemoved && router->router_mode == QD_ROUTER_MODE_INTERIOR) {
+        qd_python_lock_state_t lock_state = qd_python_lock();
+        pArgs = PyTuple_New(1);
+        PyTuple_SetItem(pArgs, 0, PyString_FromString(address_hash));
+        pValue = PyObject_CallObject(pyRemoved, pArgs);
+        qd_error_py();
+        Py_DECREF(pArgs);
+        Py_XDECREF(pValue);
+        qd_python_unlock(lock_state);
+    }
+}
+
+
+static void qd_router_link_lost(void *context, int link_mask_bit)
+{
+    qd_router_t *router = (qd_router_t*) context;
+    PyObject    *pArgs;
+    PyObject    *pValue;
+
+    if (pyRemoved && router->router_mode == QD_ROUTER_MODE_INTERIOR) {
+        qd_python_lock_state_t lock_state = qd_python_lock();
+        pArgs = PyTuple_New(1);
+        PyTuple_SetItem(pArgs, 0, PyInt_FromLong((long) link_mask_bit));
+        pValue = PyObject_CallObject(pyLinkLost, pArgs);
+        qd_error_py();
+        Py_DECREF(pArgs);
+        Py_XDECREF(pValue);
+        qd_python_unlock(lock_state);
+    }
+}
+
+
 qd_error_t qd_router_python_setup(qd_router_t *router)
 {
     qd_error_clear();
     log_source = qd_log_source("ROUTER");
 
+    qdr_core_route_table_handlers(router->router_core,
+                                  router,
+                                  qd_router_mobile_added,
+                                  qd_router_mobile_removed,
+                                  qd_router_link_lost);
+
     //
     // If we are not operating as an interior router, don't start the
     // router module.
@@ -726,62 +480,3 @@ qd_error_t qd_pyrouter_tick(qd_router_t *router)
     return err;
 }
 
-
-void qd_router_mobile_added(qd_router_t *router, qd_field_iterator_t *iter)
-{
-    PyObject *pArgs;
-    PyObject *pValue;
-
-    if (pyAdded && router->router_mode == QD_ROUTER_MODE_INTERIOR) {
-        qd_address_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
-        char *address = (char*) qd_field_iterator_copy(iter);
-
-        qd_python_lock_state_t lock_state = qd_python_lock();
-        pArgs = PyTuple_New(1);
-        PyTuple_SetItem(pArgs, 0, PyString_FromString(address));
-        pValue = PyObject_CallObject(pyAdded, pArgs);
-        qd_error_py();
-        Py_DECREF(pArgs);
-        Py_XDECREF(pValue);
-        qd_python_unlock(lock_state);
-
-        free(address);
-    }
-}
-
-
-void qd_router_mobile_removed(qd_router_t *router, const char *address)
-{
-    PyObject *pArgs;
-    PyObject *pValue;
-
-    if (pyRemoved && router->router_mode == QD_ROUTER_MODE_INTERIOR) {
-        qd_python_lock_state_t lock_state = qd_python_lock();
-        pArgs = PyTuple_New(1);
-        PyTuple_SetItem(pArgs, 0, PyString_FromString(address));
-        pValue = PyObject_CallObject(pyRemoved, pArgs);
-        qd_error_py();
-        Py_DECREF(pArgs);
-        Py_XDECREF(pValue);
-        qd_python_unlock(lock_state);
-    }
-}
-
-
-void qd_router_link_lost(qd_router_t *router, int link_mask_bit)
-{
-    PyObject *pArgs;
-    PyObject *pValue;
-
-    if (pyRemoved && router->router_mode == QD_ROUTER_MODE_INTERIOR) {
-        qd_python_lock_state_t lock_state = qd_python_lock();
-        pArgs = PyTuple_New(1);
-        PyTuple_SetItem(pArgs, 0, PyInt_FromLong((long) link_mask_bit));
-        pValue = PyObject_CallObject(pyLinkLost, pArgs);
-        qd_error_py();
-        Py_DECREF(pArgs);
-        Py_XDECREF(pValue);
-        qd_python_unlock(lock_state);
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ca24d67f/src/waypoint.c
----------------------------------------------------------------------
diff --git a/src/waypoint.c b/src/waypoint.c
index dcf00b3..fb97311 100644
--- a/src/waypoint.c
+++ b/src/waypoint.c
@@ -95,7 +95,7 @@ static void qd_waypoint_visit_sink_LH(qd_dispatch_t *qd, qd_waypoint_t *wp)
         if (DEQ_SIZE(addr->rlinks) == 1) {
             qd_field_iterator_t *iter = qd_address_iterator_string(wp->address, ITER_VIEW_ADDRESS_HASH);
             qd_address_iterator_set_phase(iter, wp->in_phase);
-            qd_router_mobile_added(router, iter);
+            //qd_router_mobile_added(router, iter);
             qd_field_iterator_free(iter);
         }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org