You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2015/10/23 23:31:56 UTC
qpid-dispatch git commit: DISPATCH-179 - Added agent submodule in the
router core to handle management requests.
Repository: qpid-dispatch
Updated Branches:
refs/heads/tross-DISPATCH-179-1 995e5ad4c -> f733be6e9
DISPATCH-179 - Added agent submodule in the router core to handle management requests.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/f733be6e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/f733be6e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/f733be6e
Branch: refs/heads/tross-DISPATCH-179-1
Commit: f733be6e9fed5ad793bf9e5162cf2632753b2470
Parents: 995e5ad
Author: Ted Ross <tr...@redhat.com>
Authored: Fri Oct 23 17:31:14 2015 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Fri Oct 23 17:31:14 2015 -0400
----------------------------------------------------------------------
include/qpid/dispatch/router_core.h | 7 +-
src/CMakeLists.txt | 1 +
src/router_core/agent.c | 177 +++++++++++++++++++++++++++++
src/router_core/route_tables.c | 24 ----
src/router_core/router_core.c | 41 +++++--
src/router_core/router_core_private.h | 39 ++++++-
src/router_core/router_core_thread.c | 7 +-
src/router_node.c | 2 +-
8 files changed, 255 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f733be6e/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 11f030f..d50eb7c 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -20,6 +20,7 @@
*/
#include <qpid/dispatch.h>
+#include <qpid/dispatch/amqp.h>
#include <qpid/dispatch/bitmask.h>
//
@@ -34,7 +35,7 @@ typedef struct qdr_delivery_t qdr_delivery_t;
/**
* Allocate and start an instance of the router core module.
*/
-qdr_core_t *qdr_core(void);
+qdr_core_t *qdr_core(qd_dispatch_t *qd);
/**
* Stop and deallocate an instance of the router core.
@@ -147,11 +148,11 @@ void qdr_manage_create(qdr_core_t *core, void *context, qd_router_entity_type_t
void qdr_manage_delete(qdr_core_t *core, void *context, qd_router_entity_type_t type, qd_parsed_field_t *attributes);
void qdr_manage_read(qdr_core_t *core, void *context, qd_router_entity_type_t type, qd_parsed_field_t *attributes);
-qdr_query_t *qdr_manage_get_first(qdr_core_t *core, void *context, qd_router_entity_type_t type, int offset);
+qdr_query_t *qdr_manage_get_first(qdr_core_t *core, void *context, qd_router_entity_type_t type, int offset, qd_composed_field_t *body);
void qdr_manage_get_next(qdr_query_t *query);
void qdr_query_cancel(qdr_query_t *query);
-typedef void (*qdr_manage_response_t) (void *context, int status_code, qd_composed_field_t *body);
+typedef void (*qdr_manage_response_t) (void *context, const qd_amqp_error_t *status, bool more);
void qdr_manage_handler(qdr_core_t *core, qdr_manage_response_t response_handler);
#endif
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f733be6e/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index a9136b1..eee2b10 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -65,6 +65,7 @@ set(qpid_dispatch_SOURCES
python_embedded.c
router_agent.c
router_config.c
+ router_core/agent.c
router_core/router_core.c
router_core/router_core_thread.c
router_core/route_tables.c
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f733be6e/src/router_core/agent.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent.c b/src/router_core/agent.c
new file mode 100644
index 0000000..2787ae5
--- /dev/null
+++ b/src/router_core/agent.c
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/amqp.h>
+#include "router_core_private.h"
+#include <stdio.h>
+
+static void qdrh_manage_get_first(qdr_core_t *core, qdr_action_t *action, bool discard);
+
+//==================================================================================
+// Interface Functions
+//==================================================================================
+
+/**
+ * Management CREATE entry point.
+ *
+ * Placeholder: the body is empty in this revision, so none of the arguments
+ * are used and no response is generated for the request.
+ */
+void qdr_manage_create(qdr_core_t *core, void *context, qd_router_entity_type_t type, qd_parsed_field_t *attributes)
+{
+}
+
+
+/**
+ * Management DELETE entry point.
+ *
+ * Placeholder: the body is empty in this revision, so none of the arguments
+ * are used and no response is generated for the request.
+ */
+void qdr_manage_delete(qdr_core_t *core, void *context, qd_router_entity_type_t type, qd_parsed_field_t *attributes)
+{
+}
+
+
+/**
+ * Management READ entry point.
+ *
+ * Placeholder: the body is empty in this revision, so none of the arguments
+ * are used and no response is generated for the request.
+ */
+void qdr_manage_read(qdr_core_t *core, void *context, qd_router_entity_type_t type, qd_parsed_field_t *attributes)
+{
+}
+
+
+/**
+ * Begin a management query ("get first") for one entity type.
+ *
+ * Allocates a query object, records the request parameters in it and hands it
+ * to the core thread as an action whose handler is qdrh_manage_get_first.
+ *
+ * @param core    Router core that will process the query.
+ * @param context Opaque caller value; passed back to the response handler.
+ * @param type    Entity type to query.
+ * @param offset  Offset of the first entity the caller is interested in.
+ * @param body    Composed field supplied by the caller; stored in the query.
+ * @return The newly allocated query.  The agent response handler frees it
+ *         after delivering a response whose 'more' flag is false, so the
+ *         returned pointer must not be used after that response.
+ */
+qdr_query_t *qdr_manage_get_first(qdr_core_t *core, void *context, qd_router_entity_type_t type,
+                                  int offset, qd_composed_field_t *body)
+{
+    qdr_action_t *action = qdr_action(qdrh_manage_get_first);
+    qdr_query_t  *query  = new_qdr_query_t();
+
+    //
+    // Clear the whole object first, as qdr_action() does for actions.  This
+    // leaves next_key, more and status zeroed and, unlike field-by-field
+    // assignment, also clears the list links embedded by DEQ_LINKS before the
+    // query is ever inserted into the outgoing query list.
+    //
+    ZERO(query);
+    query->entity_type = type;
+    query->context     = context;
+    query->body        = body;
+
+    action->args.agent.query  = query;
+    action->args.agent.offset = offset;
+
+    qdr_action_enqueue(core, action);
+
+    return query;
+}
+
+
+/**
+ * Continue a query previously started with qdr_manage_get_first.
+ *
+ * Placeholder: the body is empty in this revision, so the query is neither
+ * advanced nor handed back to the core thread.
+ */
+void qdr_manage_get_next(qdr_query_t *query)
+{
+}
+
+
+/**
+ * Cancel an in-progress query.
+ *
+ * Placeholder: the body is empty in this revision, so the query is neither
+ * modified nor released by this call.
+ */
+void qdr_query_cancel(qdr_query_t *query)
+{
+}
+
+
+/**
+ * Register the callback used to deliver management query responses.
+ *
+ * The pointer is stored in core->agent_response_handler, replacing whatever
+ * was stored before.  It is the function that qdr_agent_response_handler
+ * invokes for every query it removes from the outgoing query list.
+ */
+void qdr_manage_handler(qdr_core_t *core, qdr_manage_response_t response_handler)
+{
+    core->agent_response_handler = response_handler;
+}
+
+
+//==================================================================================
+// Internal Functions
+//==================================================================================
+
+/**
+ * Timer callback that delivers queued query responses.
+ *
+ * Drains core->outgoing_query_list one query at a time.  The list is only
+ * touched while core->query_lock is held; the registered response handler is
+ * invoked with the lock released.  A query whose 'more' flag is false is
+ * freed (together with its next_key, if any) after the handler returns.
+ *
+ * @param context The qdr_core_t registered with the timer in qdr_agent_setup.
+ */
+static void qdr_agent_response_handler(void *context)
+{
+    qdr_core_t *core = (qdr_core_t*) context;
+    bool        done = false;
+
+    while (!done) {
+        qdr_query_t *query;
+
+        sys_mutex_lock(core->query_lock);
+        query = DEQ_HEAD(core->outgoing_query_list);
+        if (query)
+            DEQ_REMOVE_HEAD(core->outgoing_query_list);
+        done = DEQ_SIZE(core->outgoing_query_list) == 0;
+        sys_mutex_unlock(core->query_lock);
+
+        if (query) {
+            //
+            // Snapshot 'more' before invoking the callback.  Once the handler
+            // has been told that more results are available, the query no
+            // longer belongs to this function and may be continued (and its
+            // fields rewritten) elsewhere, so the decision to free it must
+            // not re-read query->more after the call.
+            //
+            bool more = query->more;
+
+            core->agent_response_handler(query->context, query->status, more);
+            if (!more) {
+                if (query->next_key)
+                    qdr_field_free(query->next_key);
+                free_qdr_query_t(query);
+            }
+        }
+    }
+}
+
+
+/**
+ * Queue a query for delivery to the registered response handler.
+ *
+ * The query is appended to core->outgoing_query_list under core->query_lock.
+ * The agent timer is scheduled with a zero delay only when the list goes from
+ * empty to non-empty.
+ */
+static void qdr_agent_enqueue_response(qdr_core_t *core, qdr_query_t *query)
+{
+    bool was_empty;
+
+    sys_mutex_lock(core->query_lock);
+    was_empty = DEQ_SIZE(core->outgoing_query_list) == 0;
+    DEQ_INSERT_TAIL(core->outgoing_query_list, query);
+    sys_mutex_unlock(core->query_lock);
+
+    if (was_empty)
+        qd_timer_schedule(core->agent_timer, 0);
+}
+
+
+/**
+ * Handle a "get first" query against the address table.
+ *
+ * When 'offset' is at or beyond the number of entries in core->addrs, the
+ * query is completed with status QD_AMQP_OK and more == false and is queued
+ * for delivery to the response handler.
+ *
+ * NOTE(review): for an in-range offset nothing is done in this revision, so
+ * no response is queued for such a query.
+ */
+static void qdr_manage_get_first_address(qdr_core_t *core, qdr_query_t *query, int offset)
+{
+    if (offset < DEQ_SIZE(core->addrs))
+        return;
+
+    query->status = &QD_AMQP_OK;
+    query->more   = false;
+    qdr_agent_enqueue_response(core, query);
+}
+
+
+//==================================================================================
+// In-Thread Functions
+//==================================================================================
+
+/**
+ * Initialize the agent state held in the core: the lock protecting the
+ * outgoing query list, the timer whose callback is
+ * qdr_agent_response_handler (with the core as its context), and the
+ * outgoing query list itself.  Called from router_core_thread at start-up.
+ */
+void qdr_agent_setup(qdr_core_t *core)
+{
+    core->query_lock  = sys_mutex();
+    core->agent_timer = qd_timer(core->qd, qdr_agent_response_handler, core);
+    DEQ_INIT(core->outgoing_query_list);
+}
+
+
+/**
+ * Action handler for the "get first" action created by qdr_manage_get_first.
+ *
+ * Dispatches on the query's entity type.  Only QD_ROUTER_ADDRESS has a
+ * handler in this revision; every other type is a no-op.
+ *
+ * NOTE(review): when 'discard' is true, or for the no-op entity types, the
+ * query is neither answered nor released here.
+ *
+ * @param core    Router core.
+ * @param action  Action carrying the query and offset in args.agent.
+ * @param discard When true the action is skipped without any processing.
+ */
+static void qdrh_manage_get_first(qdr_core_t *core, qdr_action_t *action, bool discard)
+{
+    qdr_query_t *query  = action->args.agent.query;
+    int          offset = action->args.agent.offset;
+
+    if (discard)
+        return;
+
+    switch (query->entity_type) {
+    case QD_ROUTER_ADDRESS :
+        qdr_manage_get_first_address(core, query, offset);
+        break;
+
+    //
+    // No query handling exists for these entity types yet.
+    //
+    case QD_ROUTER_CONNECTION :
+    case QD_ROUTER_LINK :
+    case QD_ROUTER_WAYPOINT :
+    case QD_ROUTER_EXCHANGE :
+    case QD_ROUTER_BINDING :
+        break;
+    }
+}
+
+
+
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f733be6e/src/router_core/route_tables.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c
index 2ea108b..c4deb7f 100644
--- a/src/router_core/route_tables.c
+++ b/src/router_core/route_tables.c
@@ -20,9 +20,6 @@
#include "router_core_private.h"
#include <stdio.h>
-static qdr_action_t *qdr_action(qdr_action_handler_t action_handler);
-static void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action);
-
static void qdrh_add_router (qdr_core_t *core, qdr_action_t *action, bool discard);
static void qdrh_del_router (qdr_core_t *core, qdr_action_t *action, bool discard);
static void qdrh_set_link (qdr_core_t *core, qdr_action_t *action, bool discard);
@@ -136,27 +133,6 @@ void qdr_core_route_table_handlers(qdr_core_t *core,
//==================================================================================
-// Internal Functions
-//==================================================================================
-
-static qdr_action_t *qdr_action(qdr_action_handler_t action_handler)
-{
- qdr_action_t *action = new_qdr_action_t();
- ZERO(action);
- action->action_handler = action_handler;
- return action;
-}
-
-static void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action)
-{
- sys_mutex_lock(core->lock);
- DEQ_INSERT_TAIL(core->action_list, action);
- sys_cond_signal(core->cond);
- sys_mutex_unlock(core->lock);
-}
-
-
-//==================================================================================
// In-Thread Functions
//==================================================================================
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f733be6e/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index 8f31b7a..99edccc 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -20,7 +20,7 @@
#include "router_core_private.h"
#include <stdio.h>
-
+ALLOC_DEFINE(qdr_query_t);
ALLOC_DEFINE(qdr_address_t);
ALLOC_DEFINE(qdr_node_t);
ALLOC_DEFINE(qdr_link_t);
@@ -28,11 +28,13 @@ ALLOC_DEFINE(qdr_router_ref_t);
ALLOC_DEFINE(qdr_link_ref_t);
-qdr_core_t *qdr_core(void)
+qdr_core_t *qdr_core(qd_dispatch_t *qd)
{
qdr_core_t *core = NEW(qdr_core_t);
ZERO(core);
+ core->qd = qd;
+
//
// Set up the logging source for the router core
//
@@ -41,11 +43,15 @@ qdr_core_t *qdr_core(void)
//
// Set up the threading support
//
- core->cond = sys_cond();
- core->lock = sys_mutex();
- core->running = true;
+ core->action_cond = sys_cond();
+ core->action_lock = sys_mutex();
+ core->running = true;
DEQ_INIT(core->action_list);
- core->thread = sys_thread(router_core_thread, core);
+
+ //
+ // Launch the core thread
+ //
+ core->thread = sys_thread(router_core_thread, core);
return core;
}
@@ -57,15 +63,15 @@ void qdr_core_free(qdr_core_t *core)
// Stop and join the thread
//
core->running = false;
- sys_cond_signal(core->cond);
+ sys_cond_signal(core->action_cond);
sys_thread_join(core->thread);
//
// Free the core resources
//
sys_thread_free(core->thread);
- sys_cond_free(core->cond);
- sys_mutex_free(core->lock);
+ sys_cond_free(core->action_cond);
+ sys_mutex_free(core->action_lock);
free(core);
}
@@ -112,6 +118,23 @@ void qdr_field_free(qdr_field_t *field)
}
+/**
+ * Allocate an action, zero all of its fields and record the handler to run
+ * for it.  The caller fills in action->args as needed and then passes the
+ * action to qdr_action_enqueue.
+ */
+qdr_action_t *qdr_action(qdr_action_handler_t action_handler)
+{
+ qdr_action_t *action = new_qdr_action_t();
+ ZERO(action);
+ action->action_handler = action_handler;
+ return action;
+}
+
+/**
+ * Append an action to the core's action list and signal the condition
+ * variable the core thread waits on.  Both the list insertion and the signal
+ * are performed while core->action_lock is held.
+ */
+void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action)
+{
+ sys_mutex_lock(core->action_lock);
+ DEQ_INSERT_TAIL(core->action_list, action);
+ sys_cond_signal(core->action_cond);
+ sys_mutex_unlock(core->action_lock);
+}
+
+
qdr_address_t *qdr_address(qd_address_semantics_t semantics)
{
qdr_address_t *addr = new_qdr_address_t();
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f733be6e/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 697d7c9..de5566b 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -58,12 +58,30 @@ struct qdr_action_t {
char address_phase;
qd_address_semantics_t semantics;
} route_table;
+ struct {
+ qdr_query_t *query;
+ int offset;
+ } agent;
} args;
};
ALLOC_DECLARE(qdr_action_t);
DEQ_DECLARE(qdr_action_t, qdr_action_list_t);
+//
+// State of one management query.  Created by qdr_manage_get_first, carried to
+// the core thread inside an action, and placed on the core's outgoing query
+// list when a response is ready for the registered response handler.
+//
+struct qdr_query_t {
+ DEQ_LINKS(qdr_query_t);                  // List links (outgoing query list)
+ qd_router_entity_type_t entity_type;     // Entity type being queried
+ void *context;                           // Caller's value, passed back to the response handler
+ qd_composed_field_t *body;               // Composed field supplied by the caller of get_first
+ qdr_field_t *next_key;                   // Freed with qdr_field_free, if set, when the query is released
+ bool more;                               // Reported to the handler; when false the query is released after delivery
+ const qd_amqp_error_t *status;           // Status reported to the response handler
+};
+
+ALLOC_DECLARE(qdr_query_t);
+DEQ_DECLARE(qdr_query_t, qdr_query_list_t);
+
+
typedef struct qdr_address_t qdr_address_t;
typedef struct qdr_node_t qdr_node_t;
typedef struct qdr_router_ref_t qdr_router_ref_t;
@@ -181,13 +199,25 @@ void qdr_del_node_ref(qdr_router_ref_list_t *ref_list, qdr_node_t *rnode);
struct qdr_core_t {
+ qd_dispatch_t *qd;
qd_log_source_t *log;
- sys_cond_t *cond;
- sys_mutex_t *lock;
sys_thread_t *thread;
bool running;
qdr_action_list_t action_list;
-
+ sys_cond_t *action_cond;
+ sys_mutex_t *action_lock;
+
+ //
+ // Agent section
+ //
+ qdr_query_list_t outgoing_query_list;
+ sys_mutex_t *query_lock;
+ qd_timer_t *agent_timer;
+ qdr_manage_response_t agent_response_handler;
+
+ //
+ // Route table section
+ //
void *rt_context;
qdr_mobile_added_t rt_mobile_added;
qdr_mobile_removed_t rt_mobile_removed;
@@ -210,5 +240,8 @@ struct qdr_core_t {
void *router_core_thread(void *arg);
void qdr_route_table_setup(qdr_core_t *core);
+void qdr_agent_setup(qdr_core_t *core);
+qdr_action_t *qdr_action(qdr_action_handler_t action_handler);
+void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action);
#endif
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f733be6e/src/router_core/router_core_thread.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_thread.c b/src/router_core/router_core_thread.c
index 093052c..7ef11ba 100644
--- a/src/router_core/router_core_thread.c
+++ b/src/router_core/router_core_thread.c
@@ -37,26 +37,27 @@ void *router_core_thread(void *arg)
qdr_action_t *action;
qdr_route_table_setup(core);
+ qdr_agent_setup(core);
qd_log(core->log, QD_LOG_INFO, "Router Core thread running");
while (core->running) {
//
// Use the lock only to protect the condition variable and the action list
//
- sys_mutex_lock(core->lock);
+ sys_mutex_lock(core->action_lock);
//
// Block on the condition variable when there is no action to do
//
while (core->running && DEQ_IS_EMPTY(core->action_list))
- sys_cond_wait(core->cond, core->lock);
+ sys_cond_wait(core->action_cond, core->action_lock);
//
// Move the entire action list to a private list so we can process it without
// holding the lock
//
DEQ_MOVE(core->action_list, action_list);
- sys_mutex_unlock(core->lock);
+ sys_mutex_unlock(core->action_lock);
//
// Process and free all of the action items in the list
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f733be6e/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index c2eb6d9..a45e9bb 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -1869,7 +1869,7 @@ qd_router_t *qd_router(qd_dispatch_t *qd, qd_router_mode_t mode, const char *are
void qd_router_setup_late(qd_dispatch_t *qd)
{
qd_router_python_setup(qd->router);
- qd->router->router_core = qdr_core();
+ qd->router->router_core = qdr_core(qd);
qd_timer_schedule(qd->router->timer, 1000);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org