You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2015/10/07 22:59:59 UTC
qpid-dispatch git commit: DISPATCH-179 - WIP,
added core framework and began work on the core API
Repository: qpid-dispatch
Updated Branches:
refs/heads/tross-DISPATCH-179-1 [created] ea2d90b6f
DISPATCH-179 - WIP, added core framework and began work on the core API
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/ea2d90b6
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/ea2d90b6
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/ea2d90b6
Branch: refs/heads/tross-DISPATCH-179-1
Commit: ea2d90b6f87d2f11d4160babcd8ee0d17e49a0bb
Parents: 63e8a6b
Author: Ted Ross <tr...@redhat.com>
Authored: Wed Oct 7 16:58:50 2015 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Wed Oct 7 16:58:50 2015 -0400
----------------------------------------------------------------------
include/qpid/dispatch/ctools.h | 1 +
include/qpid/dispatch/router_core.h | 147 ++++++++++++++++++++
python/qpid_dispatch/management/qdrouter.json | 1 +
src/CMakeLists.txt | 1 +
src/router_core.c | 151 +++++++++++++++++++++
src/router_node.c | 26 ++--
src/router_private.h | 7 +-
7 files changed, 312 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ea2d90b6/include/qpid/dispatch/ctools.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/ctools.h b/include/qpid/dispatch/ctools.h
index 698c5b9..3ac5998 100644
--- a/include/qpid/dispatch/ctools.h
+++ b/include/qpid/dispatch/ctools.h
@@ -52,6 +52,7 @@
#define DEQ_SIZE(d) ((d).size)
#define DEQ_NEXT(i) (i)->next
#define DEQ_PREV(i) (i)->prev
+#define DEQ_MOVE(d1,d2) do {d2 = d1; DEQ_INIT(d1);} while (0)
/**
*@pre ptr points to first element of deq
*@post ptr points to first element of deq that passes test, or 0. Test should involve ptr.
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ea2d90b6/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
new file mode 100644
index 0000000..b2e4aac
--- /dev/null
+++ b/include/qpid/dispatch/router_core.h
@@ -0,0 +1,147 @@
+#ifndef __router_core_h__
+#define __router_core_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch.h>
+#include <qpid/dispatch/bitmask.h>
+
+typedef struct qdr_core_t qdr_core_t;
+typedef struct qdr_connection_t qdr_connection_t;
+typedef struct qdr_link_t qdr_link_t;
+typedef struct qdr_delivery_t qdr_delivery_t;
+
+/**
+ * Allocate and start an instance of the router core module.
+ */
+qdr_core_t *qdr_core(void);
+
+/**
+ * Stop and deallocate an instance of the router core.
+ */
+void qdr_core_free(qdr_core_t *core);
+
+/**
+ ******************************************************************************
+ * Route table maintenance functions
+ ******************************************************************************
+ */
+typedef enum {
+ QD_WAYPOINT_SOURCE,
+ QD_WAYPOINT_SINK,
+ QD_WAYPOINT_THROUGH,
+ QD_WAYPOINT_BYPASS,
+ QD_WAYPOINT_TAP
+} qd_waypoint_style_t;
+
+void qdr_core_add_router(qdr_core_t *core, const char *address, int router_maskbit);
+void qdr_core_del_router(qdr_core_t *core, int router_maskbit);
+void qdr_core_set_link(qdr_core_t *core, int router_maskbit, int link_maskbit);
+void qdr_core_remove_link(qdr_core_t *core, int router_maskbit);
+void qdr_core_set_next_hop(qdr_core_t *core, int router_maskbit, int nh_router_maskbit);
+void qdr_core_remove_next_hop(qdr_core_t *core, int router_maskbit);
+void qdr_core_set_valid_origins(qdr_core_t *core, const qd_bitmask_t *routers);
+void qdr_core_map_destination(qdr_core_t *core, int router_maskbit, const char *address, char phase);
+void qdr_core_unmap_destination(qdr_core_t *core, int router_maskbit, const char *address, char phase);
+
+void qdr_core_add_link_route(qdr_core_t *core, qd_field_iterator_t *conn_label, qd_field_iterator_t *prefix);
+void qdr_core_del_link_route(qdr_core_t *core, qd_field_iterator_t *conn_label, qd_field_iterator_t *prefix);
+
+void qdr_core_add_waypoint(qdr_core_t *core, qd_field_iterator_t *conn_label, qd_field_iterator_t *address, qd_waypoint_style_t style, char in_phase, char out_phase);
+
+//
+// The following callbacks shall be invoked on a connection thread from the server thread pool.
+//
+typedef void (*qdr_mobile_added_t) (void *context, qd_field_iterator_t *address);
+typedef void (*qdr_mobile_removed_t) (void *context, qd_field_iterator_t *address);
+typedef void (*qdr_link_lost_t) (void *context, int link_maskbit);
+typedef void (*qdr_connection_activate_t) (void *context, const qdr_connection_t *connection);
+typedef void (*qdr_receive_t) (void *context, qd_message_t *msg, int link_maskbit);
+
+void qdr_core_route_table_handlers(void *context,
+ qdr_mobile_added_t mobile_added,
+ qdr_mobile_removed_t mobile_removed,
+ qdr_link_lost_t link_lost,
+ qdr_connection_activate_t connection_activate);
+
+void qdr_core_subscribe(qdr_core_t *core, const char *address, bool local, bool mobile, qdr_receive_t on_message, void *context);
+
+
+/**
+ ******************************************************************************
+ * Connection functions
+ ******************************************************************************
+ */
+typedef enum {
+ QDR_WORK_FIRST_ATTACH, // Core is initiating a first-attach
+ QDR_WORK_SECOND_ATTACH, // Core is sending a second-attach
+ QDR_WORK_DETACH, // Core is sending a detach
+ QDR_WORK_DELIVERY // Core is updating a delivery for in-thread processing
+} qdr_work_type_t;
+
+typedef struct {
+ qdr_work_type_t work_type;
+ pn_terminus_t *source; // For FIRST_ATTACH
+ pn_terminus_t *target; // For FIRST_ATTACH
+ qdr_link_t *link; // For SECOND_ATTACH, DETACH
+ qdr_delivery_t *delivery; // For DELIVERY
+} qdr_work_t;
+
+qdr_connection_t *qdr_connection_opened(qdr_core_t *core, qd_field_iterator_t *label);
+void qdr_connection_closed(qdr_connection_t *conn);
+void qdr_connection_set_context(qdr_connection_t *conn, void *context);
+void *qdr_connection_get_context(qdr_connection_t *conn);
+qdr_work_t *qdr_connection_work(qdr_connection_t *conn);
+
+/**
+ ******************************************************************************
+ * Link functions
+ ******************************************************************************
+ */
+qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn, qd_direction_t dir, pn_terminus_t *source, pn_terminus_t *target);
+void qdr_link_second_attach(qdr_link_t *link, pn_terminus_t *source, pn_terminus_t *target);
+void qdr_link_detach(qdr_link_t *link, pn_condition_t *condition);
+
+qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, pn_delivery_t *delivery, qd_message_t *msg);
+qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, pn_delivery_t *delivery, qd_message_t *msg, qd_field_iterator_t *addr);
+
+
+/**
+ ******************************************************************************
+ * Delivery functions
+ ******************************************************************************
+ */
+void qdr_delivery_update_disposition(qdr_delivery_t *delivery);
+void qdr_delivery_update_flow(qdr_delivery_t *delivery);
+void qdr_delivery_process(qdr_delivery_t *delivery);
+
+/**
+ ******************************************************************************
+ * Management instrumentation functions
+ ******************************************************************************
+ */
+typedef enum {
+ QD_ROUTER_CONNECTION,
+ QD_ROUTER_LINK,
+ QD_ROUTER_ADDRESS
+} qd_router_entity_type_t;
+
+void qdr_core_query(qdr_core_t *core, qd_router_entity_type_t type, const char *filter, void *context);
+
+#endif
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ea2d90b6/python/qpid_dispatch/management/qdrouter.json
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index 3103224..e447f19 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -690,6 +690,7 @@
"module": {
"type":[
"ROUTER",
+ "ROUTER_CORE",
"ROUTER_HELLO",
"ROUTER_LS",
"ROUTER_MA",
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ea2d90b6/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 63ccd60..4b3bd07 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -65,6 +65,7 @@ set(qpid_dispatch_SOURCES
python_embedded.c
router_agent.c
router_config.c
+ router_core.c
router_delivery.c
router_node.c
router_forwarders.c
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ea2d90b6/src/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core.c b/src/router_core.c
new file mode 100644
index 0000000..fd5e22d
--- /dev/null
+++ b/src/router_core.c
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "dispatch_private.h"
+#include <qpid/dispatch/router_core.h>
+#include <qpid/dispatch/threading.h>
+#include <qpid/dispatch/log.h>
+#include <memory.h>
+
+/**
+ * Creates a thread that is dedicated to managing and using the routing table.
+ * The purpose of moving this function into one thread is to remove the widespread
+ * lock contention that happens with synchrounous multi-threaded routing.
+ *
+ * This module owns, manages, and uses the router-link list and the address hash table
+ */
+
+
+/**
+ * The following data structures are private to the core module. They are defined
+ * here to ensure their invisibiliy outside the core.
+ */
+
+typedef struct qdr_core_work_t {
+ DEQ_LINKS(struct qdr_core_work_t);
+} qdr_core_work_t;
+
+ALLOC_DECLARE(qdr_core_work_t);
+ALLOC_DEFINE(qdr_core_work_t);
+DEQ_DECLARE(qdr_core_work_t, qdr_core_work_list_t);
+
+struct qdr_core_t {
+ qd_log_source_t *log;
+ sys_cond_t *cond;
+ sys_mutex_t *lock;
+ sys_thread_t *thread;
+ bool running;
+ qdr_core_work_list_t work_list;
+};
+
+
+static void router_core_do_work(qdr_core_t *core, qdr_core_work_t *work)
+{
+}
+
+
+static void *router_core_thread(void *arg)
+{
+ qdr_core_t *core = (qdr_core_t*) arg;
+ qdr_core_work_list_t work_list;
+ qdr_core_work_t *work;
+
+ qd_log(core->log, QD_LOG_INFO, "Router Core thread running");
+ while (core->running) {
+ //
+ // Use the lock only to protect the condition variable and the work list
+ //
+ sys_mutex_lock(core->lock);
+
+ //
+ // Block on the condition variable when there is no work to do
+ //
+ while (core->running && DEQ_IS_EMPTY(core->work_list))
+ sys_cond_wait(core->cond, core->lock);
+
+ //
+ // Move the entire work list to a private list so we can process it without
+ // holding the lock
+ //
+ DEQ_MOVE(core->work_list, work_list);
+ sys_mutex_unlock(core->lock);
+
+ //
+ // Process and free all of the work items in the list
+ //
+ work = DEQ_HEAD(work_list);
+ while (work) {
+ DEQ_REMOVE_HEAD(work_list);
+
+ if (core->running)
+ router_core_do_work(core, work);
+
+ free_qdr_core_work_t(work);
+ work = DEQ_HEAD(work_list);
+ }
+ }
+
+ qd_log(core->log, QD_LOG_INFO, "Router Core thread exited");
+ return 0;
+}
+
+
+qdr_core_t *qdr_core(void)
+{
+ qdr_core_t *core = NEW(qdr_core_t);
+ ZERO(core);
+
+ //
+ // Set up the logging source for the router core
+ //
+ core->log = qd_log_source("ROUTER_CORE");
+
+ //
+ // Set up the threading support
+ //
+ core->cond = sys_cond();
+ core->lock = sys_mutex();
+ core->running = true;
+ DEQ_INIT(core->work_list);
+ core->thread = sys_thread(router_core_thread, core);
+
+ return core;
+}
+
+
+void qdr_core_free(qdr_core_t *core)
+{
+ //
+ // Stop and join the thread
+ //
+ core->running = false;
+ sys_cond_signal(core->cond);
+ sys_thread_join(core->thread);
+
+ //
+ // Free the core resources
+ //
+ sys_thread_free(core->thread);
+ sys_cond_free(core->cond);
+ sys_mutex_free(core->lock);
+ free(core);
+}
+
+
+
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ea2d90b6/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index 361cd44..c2eb6d9 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -18,6 +18,7 @@
*/
#include <qpid/dispatch/python_embedded.h>
+#include <qpid/dispatch/router_core.h>
#include <stdio.h>
#include <string.h>
#include <stdbool.h>
@@ -745,6 +746,10 @@ static void router_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd
//
pn_link_advance(pn_link);
+ //
+ // If there's no router link, free the message and finish. It's likely that the link
+ // is closing.
+ //
if (!rlink) {
qd_message_free(msg);
return;
@@ -1802,6 +1807,7 @@ qd_router_t *qd_router(qd_dispatch_t *qd, qd_router_mode_t mode, const char *are
qd->router = router;
router->qd = qd;
+ router->router_core = 0;
router->log_source = qd_log_source("ROUTER");
router->router_mode = mode;
router->router_area = area;
@@ -1863,6 +1869,7 @@ qd_router_t *qd_router(qd_dispatch_t *qd, qd_router_mode_t mode, const char *are
void qd_router_setup_late(qd_dispatch_t *qd)
{
qd_router_python_setup(qd->router);
+ qd->router->router_core = qdr_core();
qd_timer_schedule(qd->router->timer, 1000);
}
@@ -1889,6 +1896,7 @@ void qd_router_free(qd_router_t *router)
free_qd_address_t(addr);
}
+ qdr_core_free(router->router_core);
qd_timer_free(router->timer);
sys_mutex_free(router->lock);
qd_bitmask_free(router->neighbor_free_mask);
@@ -1960,24 +1968,6 @@ void qd_router_unregister_address(qd_address_t *ad)
}
-void qd_address_set_redirect(qd_address_t *address, qd_address_t *redirect)
-{
- address->redirect = redirect;
-}
-
-
-void qd_address_set_static_cc(qd_address_t *address, qd_address_t *cc)
-{
- address->static_cc = cc;
-}
-
-
-void qd_address_set_dynamic_cc(qd_address_t *address, qd_address_t *cc)
-{
- address->dynamic_cc = cc;
-}
-
-
qd_address_t *qd_router_address_lookup_LH(qd_router_t *router,
qd_field_iterator_t *addr_iter,
bool *is_local, bool *is_direct)
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ea2d90b6/src/router_private.h
----------------------------------------------------------------------
diff --git a/src/router_private.h b/src/router_private.h
index eee402f..a8ae67a 100644
--- a/src/router_private.h
+++ b/src/router_private.h
@@ -26,6 +26,7 @@
#include <qpid/dispatch/enum.h>
#include <qpid/dispatch/router.h>
+#include <qpid/dispatch/router_core.h>
#include <qpid/dispatch/message.h>
#include <qpid/dispatch/bitmask.h>
#include <qpid/dispatch/hash.h>
@@ -87,7 +88,7 @@ struct qd_router_link_t {
qd_router_link_ref_t *ref; ///< Pointer to a containing reference object
char *target; ///< Target address for incoming links
qd_routed_event_list_t event_fifo; ///< FIFO of outgoing delivery/link events (no messages)
- qd_routed_event_list_t msg_fifo; ///< FIFO of outgoing message deliveries
+ qd_routed_event_list_t msg_fifo; ///< FIFO of incoming or outgoing message deliveries
qd_router_delivery_list_t deliveries; ///< [own] outstanding unsettled deliveries
bool strip_inbound_annotations; ///<should the dispatch specific inbound annotations be stripped at the ingress router
bool strip_outbound_annotations; ///<should the dispatch specific outbound annotations be stripped at the egress router
@@ -177,9 +178,6 @@ struct qd_address_t {
qd_router_ref_list_t rnodes; ///< Remotely-Connected Consumers
qd_hash_handle_t *hash_handle; ///< Linkage back to the hash table entry
qd_address_semantics_t semantics;
- qd_address_t *redirect;
- qd_address_t *static_cc;
- qd_address_t *dynamic_cc;
bool toggle;
bool waypoint;
bool block_deletion;
@@ -249,6 +247,7 @@ DEQ_DECLARE(qd_waypoint_t, qd_waypoint_list_t);
struct qd_router_t {
qd_dispatch_t *qd;
+ qdr_core_t *router_core;
qd_log_source_t *log_source;
qd_router_mode_t router_mode;
const char *router_area;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org