You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2015/10/07 22:59:59 UTC

qpid-dispatch git commit: DISPATCH-179 - WIP, added core framework and began work on the core API

Repository: qpid-dispatch
Updated Branches:
  refs/heads/tross-DISPATCH-179-1 [created] ea2d90b6f


DISPATCH-179 - WIP, added core framework and began work on the core API


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/ea2d90b6
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/ea2d90b6
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/ea2d90b6

Branch: refs/heads/tross-DISPATCH-179-1
Commit: ea2d90b6f87d2f11d4160babcd8ee0d17e49a0bb
Parents: 63e8a6b
Author: Ted Ross <tr...@redhat.com>
Authored: Wed Oct 7 16:58:50 2015 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Wed Oct 7 16:58:50 2015 -0400

----------------------------------------------------------------------
 include/qpid/dispatch/ctools.h                |   1 +
 include/qpid/dispatch/router_core.h           | 147 ++++++++++++++++++++
 python/qpid_dispatch/management/qdrouter.json |   1 +
 src/CMakeLists.txt                            |   1 +
 src/router_core.c                             | 151 +++++++++++++++++++++
 src/router_node.c                             |  26 ++--
 src/router_private.h                          |   7 +-
 7 files changed, 312 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ea2d90b6/include/qpid/dispatch/ctools.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/ctools.h b/include/qpid/dispatch/ctools.h
index 698c5b9..3ac5998 100644
--- a/include/qpid/dispatch/ctools.h
+++ b/include/qpid/dispatch/ctools.h
@@ -52,6 +52,7 @@
 #define DEQ_SIZE(d) ((d).size)
 #define DEQ_NEXT(i) (i)->next
 #define DEQ_PREV(i) (i)->prev
+#define DEQ_MOVE(d1,d2) do {(d2) = (d1); DEQ_INIT(d1);} while (0) /* d2 takes over d1's contents; d1 is then reset via DEQ_INIT (d1 is evaluated twice) */
 /**
  *@pre ptr points to first element of deq
  *@post ptr points to first element of deq that passes test, or 0. Test should involve ptr.

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ea2d90b6/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
new file mode 100644
index 0000000..b2e4aac
--- /dev/null
+++ b/include/qpid/dispatch/router_core.h
@@ -0,0 +1,147 @@
+#ifndef __router_core_h__
+#define __router_core_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch.h>
+#include <qpid/dispatch/bitmask.h>
+
+typedef struct qdr_core_t qdr_core_t;
+typedef struct qdr_connection_t qdr_connection_t;
+typedef struct qdr_link_t qdr_link_t;
+typedef struct qdr_delivery_t qdr_delivery_t;
+
+/**
+ * Allocate and start an instance of the router core module.
+ */
+qdr_core_t *qdr_core(void);
+
+/**
+ * Stop and deallocate an instance of the router core.
+ */
+void qdr_core_free(qdr_core_t *core);
+
+/**
+ ******************************************************************************
+ * Route table maintenance functions
+ ******************************************************************************
+ */
+typedef enum {  // 'style' argument of qdr_core_add_waypoint(); NOTE(review): per-value semantics are not documented here
+    QD_WAYPOINT_SOURCE,
+    QD_WAYPOINT_SINK,
+    QD_WAYPOINT_THROUGH,
+    QD_WAYPOINT_BYPASS,
+    QD_WAYPOINT_TAP
+} qd_waypoint_style_t;
+
+void qdr_core_add_router(qdr_core_t *core, const char *address, int router_maskbit);
+void qdr_core_del_router(qdr_core_t *core, int router_maskbit);
+void qdr_core_set_link(qdr_core_t *core, int router_maskbit, int link_maskbit);
+void qdr_core_remove_link(qdr_core_t *core, int router_maskbit);
+void qdr_core_set_next_hop(qdr_core_t *core, int router_maskbit, int nh_router_maskbit);
+void qdr_core_remove_next_hop(qdr_core_t *core, int router_maskbit);
+void qdr_core_set_valid_origins(qdr_core_t *core, const qd_bitmask_t *routers);
+void qdr_core_map_destination(qdr_core_t *core, int router_maskbit, const char *address, char phase);
+void qdr_core_unmap_destination(qdr_core_t *core, int router_maskbit, const char *address, char phase);
+
+void qdr_core_add_link_route(qdr_core_t *core, qd_field_iterator_t *conn_label, qd_field_iterator_t *prefix);
+void qdr_core_del_link_route(qdr_core_t *core, qd_field_iterator_t *conn_label, qd_field_iterator_t *prefix);
+
+void qdr_core_add_waypoint(qdr_core_t *core, qd_field_iterator_t *conn_label, qd_field_iterator_t *address, qd_waypoint_style_t style, char in_phase, char out_phase);
+
+//
+// The following callbacks shall be invoked on a connection thread from the server thread pool.
+//
+typedef void (*qdr_mobile_added_t)        (void *context, qd_field_iterator_t *address);
+typedef void (*qdr_mobile_removed_t)      (void *context, qd_field_iterator_t *address);
+typedef void (*qdr_link_lost_t)           (void *context, int link_maskbit);
+typedef void (*qdr_connection_activate_t) (void *context, const qdr_connection_t *connection);
+typedef void (*qdr_receive_t)             (void *context, qd_message_t *msg, int link_maskbit);
+
+void qdr_core_route_table_handlers(void                      *context,
+                                   qdr_mobile_added_t         mobile_added,
+                                   qdr_mobile_removed_t       mobile_removed,
+                                   qdr_link_lost_t            link_lost,
+                                   qdr_connection_activate_t  connection_activate);
+
+void qdr_core_subscribe(qdr_core_t *core, const char *address, bool local, bool mobile, qdr_receive_t on_message, void *context);
+
+
+/**
+ ******************************************************************************
+ * Connection functions
+ ******************************************************************************
+ */
+typedef enum {              // Discriminator stored in qdr_work_t.work_type
+    QDR_WORK_FIRST_ATTACH,  // Core is initiating a first-attach
+    QDR_WORK_SECOND_ATTACH, // Core is sending a second-attach
+    QDR_WORK_DETACH,        // Core is sending a detach
+    QDR_WORK_DELIVERY       // Core is updating a delivery for in-thread processing
+} qdr_work_type_t;
+
+typedef struct {               // Work item type returned by qdr_connection_work()
+    qdr_work_type_t  work_type; // Tells which of the fields below are meaningful
+    pn_terminus_t   *source;   // For FIRST_ATTACH
+    pn_terminus_t   *target;   // For FIRST_ATTACH
+    qdr_link_t      *link;     // For SECOND_ATTACH, DETACH
+    qdr_delivery_t  *delivery; // For DELIVERY
+} qdr_work_t;
+
+qdr_connection_t *qdr_connection_opened(qdr_core_t *core, qd_field_iterator_t *label);
+void qdr_connection_closed(qdr_connection_t *conn);
+void qdr_connection_set_context(qdr_connection_t *conn, void *context);
+void *qdr_connection_get_context(qdr_connection_t *conn);
+qdr_work_t *qdr_connection_work(qdr_connection_t *conn);
+
+/**
+ ******************************************************************************
+ * Link functions
+ ******************************************************************************
+ */
+qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn, qd_direction_t dir, pn_terminus_t *source, pn_terminus_t *target);
+void qdr_link_second_attach(qdr_link_t *link, pn_terminus_t *source, pn_terminus_t *target);
+void qdr_link_detach(qdr_link_t *link, pn_condition_t *condition);
+
+qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, pn_delivery_t *delivery, qd_message_t *msg);
+qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, pn_delivery_t *delivery, qd_message_t *msg, qd_field_iterator_t *addr);
+
+
+/**
+ ******************************************************************************
+ * Delivery functions
+ ******************************************************************************
+ */
+void qdr_delivery_update_disposition(qdr_delivery_t *delivery);
+void qdr_delivery_update_flow(qdr_delivery_t *delivery);
+void qdr_delivery_process(qdr_delivery_t *delivery);
+
+/**
+ ******************************************************************************
+ * Management instrumentation functions
+ ******************************************************************************
+ */
+typedef enum {  // 'type' argument of qdr_core_query(): the kind of entity being queried
+    QD_ROUTER_CONNECTION,
+    QD_ROUTER_LINK,
+    QD_ROUTER_ADDRESS
+} qd_router_entity_type_t;
+
+void qdr_core_query(qdr_core_t *core, qd_router_entity_type_t type, const char *filter, void *context);
+
+#endif

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ea2d90b6/python/qpid_dispatch/management/qdrouter.json
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index 3103224..e447f19 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -690,6 +690,7 @@
                 "module": {
                     "type":[
                         "ROUTER",
+                        "ROUTER_CORE",
                         "ROUTER_HELLO",
                         "ROUTER_LS",
                         "ROUTER_MA",

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ea2d90b6/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 63ccd60..4b3bd07 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -65,6 +65,7 @@ set(qpid_dispatch_SOURCES
   python_embedded.c
   router_agent.c
   router_config.c
+  router_core.c
   router_delivery.c
   router_node.c
   router_forwarders.c

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ea2d90b6/src/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core.c b/src/router_core.c
new file mode 100644
index 0000000..fd5e22d
--- /dev/null
+++ b/src/router_core.c
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "dispatch_private.h"
+#include <qpid/dispatch/router_core.h>
+#include <qpid/dispatch/threading.h>
+#include <qpid/dispatch/log.h>
+#include <memory.h>
+
+/**
+ * Creates a thread that is dedicated to managing and using the routing table.
+ * The purpose of moving this function into one thread is to remove the widespread
+ * lock contention that happens with synchronous multi-threaded routing.
+ *
+ * This module owns, manages, and uses the router-link list and the address hash table
+ */
+
+
+/**
+ * The following data structures are private to the core module.  They are defined
+ * here to ensure their invisibility outside the core.
+ */
+
+typedef struct qdr_core_work_t {  // One item on the core's work list; so far it carries only the list linkage
+    DEQ_LINKS(struct qdr_core_work_t);
+} qdr_core_work_t;
+
+ALLOC_DECLARE(qdr_core_work_t);  // presumably provides free_qdr_core_work_t(), which the core thread calls -- confirm
+ALLOC_DEFINE(qdr_core_work_t);
+DEQ_DECLARE(qdr_core_work_t, qdr_core_work_list_t);  // list type used for qdr_core_t.work_list
+
+struct qdr_core_t {
+    qd_log_source_t      *log;        // Log source obtained with qd_log_source("ROUTER_CORE")
+    sys_cond_t           *cond;       // Condition the core thread waits on while work_list is empty
+    sys_mutex_t          *lock;       // Held around waits on cond and around access to work_list
+    sys_thread_t         *thread;     // Thread started by qdr_core() running router_core_thread()
+    bool                  running;    // Set by qdr_core(); cleared by qdr_core_free() to stop the thread
+    qdr_core_work_list_t  work_list;  // Pending work items, taken in bulk by the core thread
+};
+
+
+static void router_core_do_work(qdr_core_t *core, qdr_core_work_t *work)
+{
+    // Empty placeholder: router_core_thread() calls this for each dequeued work item.
+}
+
+static void *router_core_thread(void *arg)
+{
+    // Body of the core thread: repeatedly drains the shared work list until 'running' is cleared.
+    qdr_core_t           *self = arg;
+    qdr_core_work_list_t  batch;
+    qdr_core_work_t      *item;
+
+    qd_log(self->log, QD_LOG_INFO, "Router Core thread running");
+    while (self->running) {
+        //
+        // The mutex is held only while touching the condition variable and the
+        // shared work list.
+        //
+        sys_mutex_lock(self->lock);
+
+        //
+        // Nothing queued and still running: sleep on the condition variable.
+        //
+        while (self->running && DEQ_IS_EMPTY(self->work_list)) {
+            sys_cond_wait(self->cond, self->lock);
+        }
+
+        //
+        // Take the whole shared list in one step so the items can be handled after
+        // the mutex has been released.
+        //
+        DEQ_MOVE(self->work_list, batch);
+        sys_mutex_unlock(self->lock);
+
+        //
+        // Drain the private batch.  Every item is freed; it is only acted upon while
+        // the core is still running.
+        //
+        for (item = DEQ_HEAD(batch); item; item = DEQ_HEAD(batch)) {
+            DEQ_REMOVE_HEAD(batch);
+            if (self->running)
+                router_core_do_work(self, item);
+            free_qdr_core_work_t(item);
+        }
+    }
+
+    qd_log(self->log, QD_LOG_INFO, "Router Core thread exited");
+    return 0;
+}
+
+
+qdr_core_t *qdr_core(void)
+{
+    // Build a zeroed core object, give it a log source and synchronization
+    // primitives, then launch the core thread on it.
+    qdr_core_t *instance = NEW(qdr_core_t);
+    ZERO(instance);
+
+    instance->log = qd_log_source("ROUTER_CORE");
+
+    //
+    // Everything the thread reads must be in place before it is started, so
+    // sys_thread() is called last.
+    //
+    DEQ_INIT(instance->work_list);
+    instance->running = true;
+    instance->lock    = sys_mutex();
+    instance->cond    = sys_cond();
+    instance->thread  = sys_thread(router_core_thread, instance);
+
+    return instance;
+}
+
+
+void qdr_core_free(qdr_core_t *core)
+{
+    // Tolerate NULL: router_core is 0 until qd_router_setup_late(), yet qd_router_free() always passes it.
+    if (!core)
+        return;
+
+    // Clear 'running' and signal under the lock; done unlocked, the thread could
+    // miss the wakeup between its check and sys_cond_wait(), hanging the join.
+    sys_mutex_lock(core->lock);
+    core->running = false;
+    sys_cond_signal(core->cond);
+    sys_mutex_unlock(core->lock);
+    sys_thread_join(core->thread);
+
+    // The thread has exited; release its handle and the synchronization objects.
+    sys_thread_free(core->thread);
+    sys_cond_free(core->cond);
+    sys_mutex_free(core->lock);
+    free(core);
+}

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ea2d90b6/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index 361cd44..c2eb6d9 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -18,6 +18,7 @@
  */
 
 #include <qpid/dispatch/python_embedded.h>
+#include <qpid/dispatch/router_core.h>
 #include <stdio.h>
 #include <string.h>
 #include <stdbool.h>
@@ -745,6 +746,10 @@ static void router_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd
     //
     pn_link_advance(pn_link);
 
+    //
+    // If there's no router link, free the message and finish.  It's likely that the link
+    // is closing.
+    //
     if (!rlink) {
         qd_message_free(msg);
         return;
@@ -1802,6 +1807,7 @@ qd_router_t *qd_router(qd_dispatch_t *qd, qd_router_mode_t mode, const char *are
 
     qd->router = router;
     router->qd           = qd;
+    router->router_core  = 0;
     router->log_source   = qd_log_source("ROUTER");
     router->router_mode  = mode;
     router->router_area  = area;
@@ -1863,6 +1869,7 @@ qd_router_t *qd_router(qd_dispatch_t *qd, qd_router_mode_t mode, const char *are
 void qd_router_setup_late(qd_dispatch_t *qd)
 {
     qd_router_python_setup(qd->router);
+    qd->router->router_core = qdr_core();
     qd_timer_schedule(qd->router->timer, 1000);
 }
 
@@ -1889,6 +1896,7 @@ void qd_router_free(qd_router_t *router)
         free_qd_address_t(addr);
     }
 
+    qdr_core_free(router->router_core);
     qd_timer_free(router->timer);
     sys_mutex_free(router->lock);
     qd_bitmask_free(router->neighbor_free_mask);
@@ -1960,24 +1968,6 @@ void qd_router_unregister_address(qd_address_t *ad)
 }
 
 
-void qd_address_set_redirect(qd_address_t *address, qd_address_t *redirect)
-{
-    address->redirect = redirect;
-}
-
-
-void qd_address_set_static_cc(qd_address_t *address, qd_address_t *cc)
-{
-    address->static_cc = cc;
-}
-
-
-void qd_address_set_dynamic_cc(qd_address_t *address, qd_address_t *cc)
-{
-    address->dynamic_cc = cc;
-}
-
-
 qd_address_t *qd_router_address_lookup_LH(qd_router_t *router,
                                           qd_field_iterator_t *addr_iter,
                                           bool *is_local, bool *is_direct)

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ea2d90b6/src/router_private.h
----------------------------------------------------------------------
diff --git a/src/router_private.h b/src/router_private.h
index eee402f..a8ae67a 100644
--- a/src/router_private.h
+++ b/src/router_private.h
@@ -26,6 +26,7 @@
 
 #include <qpid/dispatch/enum.h>
 #include <qpid/dispatch/router.h>
+#include <qpid/dispatch/router_core.h>
 #include <qpid/dispatch/message.h>
 #include <qpid/dispatch/bitmask.h>
 #include <qpid/dispatch/hash.h>
@@ -87,7 +88,7 @@ struct qd_router_link_t {
     qd_router_link_ref_t     *ref;             ///< Pointer to a containing reference object
     char                     *target;          ///< Target address for incoming links
     qd_routed_event_list_t    event_fifo;      ///< FIFO of outgoing delivery/link events (no messages)
-    qd_routed_event_list_t    msg_fifo;        ///< FIFO of outgoing message deliveries
+    qd_routed_event_list_t    msg_fifo;        ///< FIFO of incoming or outgoing message deliveries
     qd_router_delivery_list_t deliveries;      ///< [own] outstanding unsettled deliveries
     bool                      strip_inbound_annotations;  ///<should the dispatch specific inbound annotations be stripped at the ingress router
     bool                      strip_outbound_annotations; ///<should the dispatch specific outbound annotations be stripped at the egress router
@@ -177,9 +178,6 @@ struct qd_address_t {
     qd_router_ref_list_t       rnodes;            ///< Remotely-Connected Consumers
     qd_hash_handle_t          *hash_handle;       ///< Linkage back to the hash table entry
     qd_address_semantics_t     semantics;
-    qd_address_t              *redirect;
-    qd_address_t              *static_cc;
-    qd_address_t              *dynamic_cc;
     bool                       toggle;
     bool                       waypoint;
     bool                       block_deletion;
@@ -249,6 +247,7 @@ DEQ_DECLARE(qd_waypoint_t, qd_waypoint_list_t);
 
 struct qd_router_t {
     qd_dispatch_t            *qd;
+    qdr_core_t               *router_core;
     qd_log_source_t          *log_source;
     qd_router_mode_t          router_mode;
     const char               *router_area;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org