You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2015/11/05 16:17:03 UTC

qpid-dispatch git commit: DISPATCH-179 - Added core-connection handling

Repository: qpid-dispatch
Updated Branches:
  refs/heads/tross-DISPATCH-179-1 8c48073d2 -> df2bf5de9


DISPATCH-179 - Added core-connection handling


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/df2bf5de
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/df2bf5de
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/df2bf5de

Branch: refs/heads/tross-DISPATCH-179-1
Commit: df2bf5de9fe715c19bba55e48349341f90370610
Parents: 8c48073
Author: Ted Ross <tr...@redhat.com>
Authored: Thu Nov 5 10:16:25 2015 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Thu Nov 5 10:16:25 2015 -0500

----------------------------------------------------------------------
 include/qpid/dispatch/router.h        |   4 +
 include/qpid/dispatch/router_core.h   |  16 ++--
 src/CMakeLists.txt                    |   1 +
 src/router_core/connections.c         | 127 +++++++++++++++++++++++++++++
 src/router_core/router_core_private.h |  20 +++++
 src/router_node.c                     |   7 +-
 6 files changed, 169 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/df2bf5de/include/qpid/dispatch/router.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router.h b/include/qpid/dispatch/router.h
index b1e4ced..30b9022 100644
--- a/include/qpid/dispatch/router.h
+++ b/include/qpid/dispatch/router.h
@@ -37,6 +37,8 @@ typedef struct qd_address_t qd_address_t;
 typedef uint8_t             qd_address_semantics_t;
 typedef struct qd_router_delivery_t qd_router_delivery_t;
 
+#include <qpid/dispatch/router_core.h>
+
 /**
  * @name Address fanout semantics
  * @{
@@ -140,6 +142,8 @@ typedef void (*qd_router_message_cb_t)(void *context, qd_message_t *msg, int lin
 
 const char *qd_router_id(const qd_dispatch_t *qd);
 
+qdr_core_t *qd_router_core(qd_dispatch_t *qd);
+
 /** Register an address in the router's hash table.
  * @param qd Pointer to the dispatch instance.
  * @param address String form of address

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/df2bf5de/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 13a3246..5180996 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -19,15 +19,16 @@
  * under the License.
  */
 
-#include <qpid/dispatch.h>
 #include <qpid/dispatch/amqp.h>
 #include <qpid/dispatch/bitmask.h>
 #include <qpid/dispatch/compose.h>
 #include <qpid/dispatch/parse.h>
+#include <qpid/dispatch/router.h>
 
-//
-// All callbacks in this module shall be invoked on a connection thread from the server thread pool.
-//
+
+/**
+ * All callbacks in this module shall be invoked on a connection thread from the server thread pool.
+ */
 
 typedef struct qdr_core_t qdr_core_t;
 typedef struct qdr_connection_t qdr_connection_t;
@@ -100,7 +101,12 @@ typedef struct {
     qdr_delivery_t  *delivery; // For DELIVERY
 } qdr_work_t;
 
-qdr_connection_t *qdr_connection_opened(qdr_core_t *core, qd_field_iterator_t *label);
+/**
+ * qdr_connection_opened
+ *
+ * This function must be called once for every connection that is opened in the router.
+ */
+qdr_connection_t *qdr_connection_opened(qdr_core_t *core, const char *label);
 void qdr_connection_closed(qdr_connection_t *conn);
 void qdr_connection_set_context(qdr_connection_t *conn, void *context);
 void *qdr_connection_get_context(qdr_connection_t *conn);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/df2bf5de/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 42c9fc9..5168803 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -67,6 +67,7 @@ set(qpid_dispatch_SOURCES
   router_config.c
   router_core/agent.c
   router_core/agent_address.c
+  router_core/connections.c
   router_core/router_core.c
   router_core/router_core_thread.c
   router_core/route_tables.c

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/df2bf5de/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
new file mode 100644
index 0000000..14e95fd
--- /dev/null
+++ b/src/router_core/connections.c
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "router_core_private.h"
+
+static void qdr_connection_opened_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
+
+ALLOC_DEFINE(qdr_connection_t);
+
+//==================================================================================
+// Internal Functions
+//==================================================================================
+
+
+//==================================================================================
+// Interface Functions
+//==================================================================================
+
+qdr_connection_t *qdr_connection_opened(qdr_core_t *core, const char *label)
+{
+    qdr_action_t     *action = qdr_action(qdr_connection_opened_CT);
+    qdr_connection_t *conn   = new_qdr_connection_t();
+
+    conn->core         = core;
+    conn->user_context = 0;
+    conn->label        = label;
+
+    action->args.connection.conn = conn;
+    qdr_action_enqueue(core, action);
+
+    return conn;
+}
+
+
+void qdr_connection_closed(qdr_connection_t *conn)
+{
+    qdr_action_t *action = qdr_action(qdr_connection_closed_CT);
+    action->args.connection.conn = conn;
+    qdr_action_enqueue(conn->core, action);
+}
+
+
+void qdr_connection_set_context(qdr_connection_t *conn, void *context)
+{
+    if (conn)
+        conn->user_context = context;
+}
+
+
+void *qdr_connection_get_context(qdr_connection_t *conn)
+{
+    return conn ? conn->user_context : 0;
+}
+
+
+qdr_work_t *qdr_connection_work(qdr_connection_t *conn)
+{
+    return 0;
+}
+
+
+void qdr_connection_activate_handler(qdr_core_t *core, qdr_connection_activate_t handler, void *context)
+{
+}
+
+
+//==================================================================================
+// In-Thread Functions
+//==================================================================================
+
+static void qdr_connection_opened_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+{
+    if (discard)
+        return;
+
+    qdr_connection_t *conn = action->args.connection.conn;
+    DEQ_ITEM_INIT(conn);
+    DEQ_INSERT_TAIL(core->open_connections, conn);
+
+    //
+    // TODO - Look for waypoints that need to be activated now that their connection
+    //        is open.
+    //
+
+    //
+    // TODO - Look for link-route destinations to be activated now that their connection
+    //        is open.
+    //
+}
+
+
+static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+{
+    if (discard)
+        return;
+
+    qdr_connection_t *conn = action->args.connection.conn;
+
+    //
+    // TODO - Deactivate waypoints and link-route destinations for this connection
+    //
+
+    //
+    // TODO - Clean up links associated with this connection
+    //
+
+    DEQ_REMOVE(core->open_connections, conn);
+    free_qdr_connection_t(conn);
+}
+

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/df2bf5de/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 11393a3..54ff4cf 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -63,6 +63,13 @@ struct qdr_action_t {
         } route_table;
 
         //
+        // Arguments for connection-level actions
+        //
+        struct {
+            qdr_connection_t *conn;
+        } connection;
+
+        //
         // Arguments for in-process subscriptions
         //
         struct {
@@ -223,6 +230,17 @@ void qdr_add_node_ref(qdr_router_ref_list_t *ref_list, qdr_node_t *rnode);
 void qdr_del_node_ref(qdr_router_ref_list_t *ref_list, qdr_node_t *rnode);
 
 
+struct qdr_connection_t {
+    DEQ_LINKS(qdr_connection_t);
+    qdr_core_t *core;
+    void       *user_context;
+    const char *label;
+};
+
+ALLOC_DECLARE(qdr_connection_t);
+DEQ_DECLARE(qdr_connection_t, qdr_connection_list_t);
+
+
 struct qdr_core_t {
     qd_dispatch_t     *qd;
     qd_log_source_t   *log;
@@ -232,6 +250,8 @@ struct qdr_core_t {
     sys_cond_t        *action_cond;
     sys_mutex_t       *action_lock;
 
+    qdr_connection_list_t open_connections;
+
     //
     // Agent section
     //

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/df2bf5de/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index a45e9bb..6779817 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -18,7 +18,6 @@
  */
 
 #include <qpid/dispatch/python_embedded.h>
-#include <qpid/dispatch/router_core.h>
 #include <stdio.h>
 #include <string.h>
 #include <stdbool.h>
@@ -1918,6 +1917,12 @@ const char *qd_router_id(const qd_dispatch_t *qd)
 }
 
 
+qdr_core_t *qd_router_core(qd_dispatch_t *qd)
+{
+    return qd->router->router_core;
+}
+
+
 qd_address_t *qd_router_register_address(qd_dispatch_t          *qd,
                                          const char             *address,
                                          qd_router_message_cb_t  on_message,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org