You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2015/10/23 23:31:56 UTC

qpid-dispatch git commit: DISPATCH-179 - Added agent submodule in the router core to handle management requests.

Repository: qpid-dispatch
Updated Branches:
  refs/heads/tross-DISPATCH-179-1 995e5ad4c -> f733be6e9


DISPATCH-179 - Added agent submodule in the router core to handle management requests.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/f733be6e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/f733be6e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/f733be6e

Branch: refs/heads/tross-DISPATCH-179-1
Commit: f733be6e9fed5ad793bf9e5162cf2632753b2470
Parents: 995e5ad
Author: Ted Ross <tr...@redhat.com>
Authored: Fri Oct 23 17:31:14 2015 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Fri Oct 23 17:31:14 2015 -0400

----------------------------------------------------------------------
 include/qpid/dispatch/router_core.h   |   7 +-
 src/CMakeLists.txt                    |   1 +
 src/router_core/agent.c               | 177 +++++++++++++++++++++++++++++
 src/router_core/route_tables.c        |  24 ----
 src/router_core/router_core.c         |  41 +++++--
 src/router_core/router_core_private.h |  39 ++++++-
 src/router_core/router_core_thread.c  |   7 +-
 src/router_node.c                     |   2 +-
 8 files changed, 255 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f733be6e/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 11f030f..d50eb7c 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -20,6 +20,7 @@
  */
 
 #include <qpid/dispatch.h>
+#include <qpid/dispatch/amqp.h>
 #include <qpid/dispatch/bitmask.h>
 
 //
@@ -34,7 +35,7 @@ typedef struct qdr_delivery_t qdr_delivery_t;
 /**
  * Allocate and start an instance of the router core module.
  */
-qdr_core_t *qdr_core(void);
+qdr_core_t *qdr_core(qd_dispatch_t *qd);
 
 /**
  * Stop and deallocate an instance of the router core.
@@ -147,11 +148,11 @@ void qdr_manage_create(qdr_core_t *core, void *context, qd_router_entity_type_t
 void qdr_manage_delete(qdr_core_t *core, void *context, qd_router_entity_type_t type, qd_parsed_field_t *attributes);
 void qdr_manage_read(qdr_core_t *core, void *context, qd_router_entity_type_t type, qd_parsed_field_t *attributes);
 
-qdr_query_t *qdr_manage_get_first(qdr_core_t *core, void *context, qd_router_entity_type_t type, int offset);
+qdr_query_t *qdr_manage_get_first(qdr_core_t *core, void *context, qd_router_entity_type_t type, int offset, qd_composed_field_t *body);
 void qdr_manage_get_next(qdr_query_t *query);
 void qdr_query_cancel(qdr_query_t *query);
 
-typedef void (*qdr_manage_response_t) (void *context, int status_code, qd_composed_field_t *body);
+typedef void (*qdr_manage_response_t) (void *context, const qd_amqp_error_t *status, bool more);
 void qdr_manage_handler(qdr_core_t *core, qdr_manage_response_t response_handler);
 
 #endif

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f733be6e/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index a9136b1..eee2b10 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -65,6 +65,7 @@ set(qpid_dispatch_SOURCES
   python_embedded.c
   router_agent.c
   router_config.c
+  router_core/agent.c
   router_core/router_core.c
   router_core/router_core_thread.c
   router_core/route_tables.c

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f733be6e/src/router_core/agent.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent.c b/src/router_core/agent.c
new file mode 100644
index 0000000..2787ae5
--- /dev/null
+++ b/src/router_core/agent.c
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/amqp.h>
+#include "router_core_private.h"
+#include <stdio.h>
+
+static void qdrh_manage_get_first(qdr_core_t *core, qdr_action_t *action, bool discard);
+
+//==================================================================================
+// Interface Functions
+//==================================================================================
+
+/**
+ * Management CREATE request for an entity of the given type.
+ *
+ * Placeholder: the body is empty, so the call currently has no effect and
+ * nothing is reported to the registered response handler.
+ */
+void qdr_manage_create(qdr_core_t *core, void *context, qd_router_entity_type_t type, qd_parsed_field_t *attributes)
+{
+}
+
+
+/**
+ * Management DELETE request for an entity of the given type.
+ *
+ * Placeholder: the body is empty, so the call currently has no effect and
+ * nothing is reported to the registered response handler.
+ */
+void qdr_manage_delete(qdr_core_t *core, void *context, qd_router_entity_type_t type, qd_parsed_field_t *attributes)
+{
+}
+
+
+/**
+ * Management READ request for an entity of the given type.
+ *
+ * Placeholder: the body is empty, so the call currently has no effect and
+ * nothing is reported to the registered response handler.
+ */
+void qdr_manage_read(qdr_core_t *core, void *context, qd_router_entity_type_t type, qd_parsed_field_t *attributes)
+{
+}
+
+
+/**
+ * Start a query for entities of one type.
+ *
+ * A query record is allocated and handed to the router core as an action whose
+ * handler is qdrh_manage_get_first.  The function returns immediately; any result
+ * is delivered later through the handler installed with qdr_manage_handler.
+ *
+ * @param core    Router core that will process the query.
+ * @param context Opaque caller value, passed back to the response handler.
+ * @param type    Entity type to be queried.
+ * @param offset  Position of the first entity of interest.
+ * @param body    Composed field stored in the query record.
+ * @return The newly allocated query record.
+ */
+qdr_query_t *qdr_manage_get_first(qdr_core_t *core, void *context, qd_router_entity_type_t type,
+                                  int offset, qd_composed_field_t *body)
+{
+    qdr_action_t *action = qdr_action(qdrh_manage_get_first);
+    qdr_query_t  *query  = new_qdr_query_t();
+
+    //
+    // Clear the whole record before filling it in, the same way qdr_action() clears
+    // a new action.  The record embeds DEQ_LINKS and is later placed on
+    // core->outgoing_query_list, so its list links must not be left holding
+    // whatever the allocator returned.  This also leaves next_key and status
+    // null and 'more' false.
+    //
+    ZERO(query);
+    query->entity_type = type;
+    query->context     = context;
+    query->body        = body;
+
+    action->args.agent.query  = query;
+    action->args.agent.offset = offset;
+
+    qdr_action_enqueue(core, action);
+
+    return query;
+}
+
+
+/**
+ * Continue a query previously started with qdr_manage_get_first.
+ *
+ * Placeholder: the body is empty, so the call currently has no effect.
+ */
+void qdr_manage_get_next(qdr_query_t *query)
+{
+}
+
+
+/**
+ * Cancel an outstanding query.
+ *
+ * Placeholder: the body is empty, so the call currently has no effect and the
+ * query record is not released here.
+ */
+void qdr_query_cancel(qdr_query_t *query)
+{
+}
+
+
+/**
+ * Register the callback that receives management query results.
+ *
+ * The pointer is stored in the core and invoked by qdr_agent_response_handler
+ * for every query taken off the outgoing list.
+ *
+ * @param core             Router core on which the handler is installed.
+ * @param response_handler Callback to invoke with (context, status, more).
+ */
+void qdr_manage_handler(qdr_core_t *core, qdr_manage_response_t response_handler)
+{
+    core->agent_response_handler = response_handler;
+}
+
+
+//==================================================================================
+// Internal Functions
+//==================================================================================
+
+/**
+ * Timer callback that delivers queued query results to the management handler.
+ *
+ * Queries are taken off core->outgoing_query_list one at a time while holding
+ * core->query_lock; the lock is released before the registered response handler
+ * is invoked.  A query whose 'more' flag is false is finished: its next_key (if
+ * any) and the query record itself are released here.
+ *
+ * @param context The qdr_core_t supplied when the timer was created.
+ */
+static void qdr_agent_response_handler(void *context)
+{
+    qdr_core_t *core = (qdr_core_t*) context;
+
+    for (;;) {
+        //
+        // Detach the head of the list (if any) and note whether anything is left,
+        // all under the lock.
+        //
+        sys_mutex_lock(core->query_lock);
+        qdr_query_t *query = DEQ_HEAD(core->outgoing_query_list);
+        if (query)
+            DEQ_REMOVE_HEAD(core->outgoing_query_list);
+        bool drained = DEQ_SIZE(core->outgoing_query_list) == 0;
+        sys_mutex_unlock(core->query_lock);
+
+        if (query) {
+            core->agent_response_handler(query->context, query->status, query->more);
+
+            //
+            // No further results are expected for this query: release it.
+            //
+            if (!query->more) {
+                if (query->next_key)
+                    qdr_field_free(query->next_key);
+                free_qdr_query_t(query);
+            }
+        }
+
+        if (drained)
+            break;
+    }
+}
+
+
+/**
+ * Queue a query result for delivery to the management response handler.
+ *
+ * The query is appended to core->outgoing_query_list under core->query_lock.
+ * The agent timer is scheduled (with a zero delay) only when this insertion made
+ * the list size exactly one, i.e. when the list was empty beforehand.
+ *
+ * @param core  Router core owning the outgoing list and timer.
+ * @param query Query whose status/more fields are ready to be reported.
+ */
+static void qdr_agent_enqueue_response(qdr_core_t *core, qdr_query_t *query)
+{
+    bool first_pending;
+
+    sys_mutex_lock(core->query_lock);
+    DEQ_INSERT_TAIL(core->outgoing_query_list, query);
+    first_pending = (DEQ_SIZE(core->outgoing_query_list) == 1);
+    sys_mutex_unlock(core->query_lock);
+
+    if (!first_pending)
+        return;
+
+    qd_timer_schedule(core->agent_timer, 0);
+}
+
+
+/**
+ * Handle the first step of an ADDRESS query.
+ *
+ * When the requested offset is at or beyond the number of entries in
+ * core->addrs, the query is completed with QD_AMQP_OK and more == false, and
+ * queued for delivery to the response handler.
+ *
+ * NOTE(review): when the offset is within range nothing is done yet -- the
+ * query is neither populated nor queued, so no response is generated for it.
+ *
+ * @param core   Router core holding the address list.
+ * @param query  Query being processed.
+ * @param offset Position of the first address requested.
+ */
+static void qdr_manage_get_first_address(qdr_core_t *core, qdr_query_t *query, int offset)
+{
+    // Offset past the end of the address list: report an empty, final result.
+    if (offset >= DEQ_SIZE(core->addrs)) {
+        query->more        = false;
+        query->status      = &QD_AMQP_OK;
+        qdr_agent_enqueue_response(core, query);
+        return;
+    }
+}
+
+
+//==================================================================================
+// In-Thread Functions
+//==================================================================================
+
+/**
+ * Initialise the agent section of the core: the outgoing query list, the lock
+ * protecting it, and the timer whose callback (qdr_agent_response_handler)
+ * delivers queued results.  Called from router_core_thread at start-up.
+ *
+ * @param core Router core being initialised.
+ */
+void qdr_agent_setup(qdr_core_t *core)
+{
+    DEQ_INIT(core->outgoing_query_list);
+    core->query_lock  = sys_mutex();
+    // The core itself is the timer context handed back to the callback.
+    core->agent_timer = qd_timer(core->qd, qdr_agent_response_handler, core);
+}
+
+
+/**
+ * Action handler for the first step of a management query.
+ *
+ * Dispatches on the query's entity type.  Only QD_ROUTER_ADDRESS is acted on;
+ * the remaining entity types are accepted but nothing is done for them.
+ *
+ * @param core    Router core processing the action.
+ * @param action  Action carrying the query and the requested offset.
+ * @param discard When true the action is dropped without any processing.
+ */
+static void qdrh_manage_get_first(qdr_core_t *core, qdr_action_t *action, bool discard)
+{
+    if (discard)
+        return;
+
+    qdr_query_t *query = action->args.agent.query;
+
+    switch (query->entity_type) {
+    case QD_ROUTER_ADDRESS :
+        qdr_manage_get_first_address(core, query, action->args.agent.offset);
+        break;
+
+    //
+    // Entity types with no processing yet
+    //
+    case QD_ROUTER_CONNECTION :
+    case QD_ROUTER_LINK :
+    case QD_ROUTER_WAYPOINT :
+    case QD_ROUTER_EXCHANGE :
+    case QD_ROUTER_BINDING :
+        break;
+    }
+}
+
+
+

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f733be6e/src/router_core/route_tables.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c
index 2ea108b..c4deb7f 100644
--- a/src/router_core/route_tables.c
+++ b/src/router_core/route_tables.c
@@ -20,9 +20,6 @@
 #include "router_core_private.h"
 #include <stdio.h>
 
-static qdr_action_t *qdr_action(qdr_action_handler_t action_handler);
-static void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action);
-
 static void qdrh_add_router       (qdr_core_t *core, qdr_action_t *action, bool discard);
 static void qdrh_del_router       (qdr_core_t *core, qdr_action_t *action, bool discard);
 static void qdrh_set_link         (qdr_core_t *core, qdr_action_t *action, bool discard);
@@ -136,27 +133,6 @@ void qdr_core_route_table_handlers(qdr_core_t           *core,
 
 
 //==================================================================================
-// Internal Functions
-//==================================================================================
-
-static qdr_action_t *qdr_action(qdr_action_handler_t action_handler)
-{
-    qdr_action_t *action = new_qdr_action_t();
-    ZERO(action);
-    action->action_handler = action_handler;
-    return action;
-}
-
-static void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action)
-{
-    sys_mutex_lock(core->lock);
-    DEQ_INSERT_TAIL(core->action_list, action);
-    sys_cond_signal(core->cond);
-    sys_mutex_unlock(core->lock);
-}
-
-
-//==================================================================================
 // In-Thread Functions
 //==================================================================================
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f733be6e/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index 8f31b7a..99edccc 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -20,7 +20,7 @@
 #include "router_core_private.h"
 #include <stdio.h>
 
-
+ALLOC_DEFINE(qdr_query_t);
 ALLOC_DEFINE(qdr_address_t);
 ALLOC_DEFINE(qdr_node_t);
 ALLOC_DEFINE(qdr_link_t);
@@ -28,11 +28,13 @@ ALLOC_DEFINE(qdr_router_ref_t);
 ALLOC_DEFINE(qdr_link_ref_t);
 
 
-qdr_core_t *qdr_core(void)
+qdr_core_t *qdr_core(qd_dispatch_t *qd)
 {
     qdr_core_t *core = NEW(qdr_core_t);
     ZERO(core);
 
+    core->qd = qd;
+
     //
     // Set up the logging source for the router core
     //
@@ -41,11 +43,15 @@ qdr_core_t *qdr_core(void)
     //
     // Set up the threading support
     //
-    core->cond    = sys_cond();
-    core->lock    = sys_mutex();
-    core->running = true;
+    core->action_cond = sys_cond();
+    core->action_lock = sys_mutex();
+    core->running     = true;
     DEQ_INIT(core->action_list);
-    core->thread  = sys_thread(router_core_thread, core);
+
+    //
+    // Launch the core thread
+    //
+    core->thread = sys_thread(router_core_thread, core);
 
     return core;
 }
@@ -57,15 +63,15 @@ void qdr_core_free(qdr_core_t *core)
     // Stop and join the thread
     //
     core->running = false;
-    sys_cond_signal(core->cond);
+    sys_cond_signal(core->action_cond);
     sys_thread_join(core->thread);
 
     //
     // Free the core resources
     //
     sys_thread_free(core->thread);
-    sys_cond_free(core->cond);
-    sys_mutex_free(core->lock);
+    sys_cond_free(core->action_cond);
+    sys_mutex_free(core->action_lock);
     free(core);
 }
 
@@ -112,6 +118,23 @@ void qdr_field_free(qdr_field_t *field)
 }
 
 
+/**
+ * Allocate a new action record, clear it, and bind it to the given handler.
+ *
+ * @param action_handler Function to be stored as the action's handler.
+ * @return The newly allocated action, with every other field zeroed.
+ */
+qdr_action_t *qdr_action(qdr_action_handler_t action_handler)
+{
+    qdr_action_t *action = new_qdr_action_t();
+    ZERO(action);
+    action->action_handler = action_handler;
+    return action;
+}
+
+/**
+ * Append an action to the core's action list and signal the action condition
+ * variable, both while holding core->action_lock.
+ *
+ * @param core   Router core whose action list receives the action.
+ * @param action Action to be queued.
+ */
+void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action)
+{
+    sys_mutex_lock(core->action_lock);
+    DEQ_INSERT_TAIL(core->action_list, action);
+    sys_cond_signal(core->action_cond);
+    sys_mutex_unlock(core->action_lock);
+}
+
+
 qdr_address_t *qdr_address(qd_address_semantics_t semantics)
 {
     qdr_address_t *addr = new_qdr_address_t();

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f733be6e/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 697d7c9..de5566b 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -58,12 +58,30 @@ struct qdr_action_t {
             char                    address_phase;
             qd_address_semantics_t  semantics;
         } route_table;
+        struct {
+            qdr_query_t *query;
+            int          offset;
+        } agent;
     } args;
 };
 
 ALLOC_DECLARE(qdr_action_t);
 DEQ_DECLARE(qdr_action_t, qdr_action_list_t);
 
+//
+// State of one management query.  Records are created by qdr_manage_get_first
+// and travel on the core's outgoing_query_list on their way to the registered
+// response handler.
+//
+struct qdr_query_t {
+    DEQ_LINKS(qdr_query_t);                        // List linkage (outgoing_query_list)
+    qd_router_entity_type_t  entity_type;          // Entity type being queried
+    void                    *context;              // Caller's opaque value, handed back to the response handler
+    qd_composed_field_t     *body;                 // Composed field supplied by the caller of qdr_manage_get_first
+    qdr_field_t             *next_key;             // Released with qdr_field_free when the query completes, if set
+    bool                     more;                 // Reported to the handler; the query is freed once this is false
+    const qd_amqp_error_t   *status;               // Status reported to the response handler
+};
+
+ALLOC_DECLARE(qdr_query_t);
+DEQ_DECLARE(qdr_query_t, qdr_query_list_t); 
+
+
 typedef struct qdr_address_t     qdr_address_t;
 typedef struct qdr_node_t        qdr_node_t;
 typedef struct qdr_router_ref_t  qdr_router_ref_t;
@@ -181,13 +199,25 @@ void qdr_del_node_ref(qdr_router_ref_list_t *ref_list, qdr_node_t *rnode);
 
 
 struct qdr_core_t {
+    qd_dispatch_t     *qd;
     qd_log_source_t   *log;
-    sys_cond_t        *cond;
-    sys_mutex_t       *lock;
     sys_thread_t      *thread;
     bool               running;
     qdr_action_list_t  action_list;
-
+    sys_cond_t        *action_cond;
+    sys_mutex_t       *action_lock;
+
+    //
+    // Agent section
+    //
+    qdr_query_list_t       outgoing_query_list;
+    sys_mutex_t           *query_lock;
+    qd_timer_t            *agent_timer;
+    qdr_manage_response_t  agent_response_handler;
+
+    //
+    // Route table section
+    //
     void                 *rt_context;
     qdr_mobile_added_t    rt_mobile_added;
     qdr_mobile_removed_t  rt_mobile_removed;
@@ -210,5 +240,8 @@ struct qdr_core_t {
 
 void *router_core_thread(void *arg);
 void  qdr_route_table_setup(qdr_core_t *core);
+void  qdr_agent_setup(qdr_core_t *core);
+qdr_action_t *qdr_action(qdr_action_handler_t action_handler);
+void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action);
 
 #endif

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f733be6e/src/router_core/router_core_thread.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_thread.c b/src/router_core/router_core_thread.c
index 093052c..7ef11ba 100644
--- a/src/router_core/router_core_thread.c
+++ b/src/router_core/router_core_thread.c
@@ -37,26 +37,27 @@ void *router_core_thread(void *arg)
     qdr_action_t      *action;
 
     qdr_route_table_setup(core);
+    qdr_agent_setup(core);
 
     qd_log(core->log, QD_LOG_INFO, "Router Core thread running");
     while (core->running) {
         //
         // Use the lock only to protect the condition variable and the action list
         //
-        sys_mutex_lock(core->lock);
+        sys_mutex_lock(core->action_lock);
 
         //
         // Block on the condition variable when there is no action to do
         //
         while (core->running && DEQ_IS_EMPTY(core->action_list))
-            sys_cond_wait(core->cond, core->lock);
+            sys_cond_wait(core->action_cond, core->action_lock);
 
         //
         // Move the entire action list to a private list so we can process it without
         // holding the lock
         //
         DEQ_MOVE(core->action_list, action_list);
-        sys_mutex_unlock(core->lock);
+        sys_mutex_unlock(core->action_lock);
 
         //
         // Process and free all of the action items in the list

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f733be6e/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index c2eb6d9..a45e9bb 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -1869,7 +1869,7 @@ qd_router_t *qd_router(qd_dispatch_t *qd, qd_router_mode_t mode, const char *are
 void qd_router_setup_late(qd_dispatch_t *qd)
 {
     qd_router_python_setup(qd->router);
-    qd->router->router_core = qdr_core();
+    qd->router->router_core = qdr_core(qd);
     qd_timer_schedule(qd->router->timer, 1000);
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org