You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2015/11/05 16:17:03 UTC
qpid-dispatch git commit: DISPATCH-179 - Added core-connection
handling
Repository: qpid-dispatch
Updated Branches:
refs/heads/tross-DISPATCH-179-1 8c48073d2 -> df2bf5de9
DISPATCH-179 - Added core-connection handling
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/df2bf5de
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/df2bf5de
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/df2bf5de
Branch: refs/heads/tross-DISPATCH-179-1
Commit: df2bf5de9fe715c19bba55e48349341f90370610
Parents: 8c48073
Author: Ted Ross <tr...@redhat.com>
Authored: Thu Nov 5 10:16:25 2015 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Thu Nov 5 10:16:25 2015 -0500
----------------------------------------------------------------------
include/qpid/dispatch/router.h | 4 +
include/qpid/dispatch/router_core.h | 16 ++--
src/CMakeLists.txt | 1 +
src/router_core/connections.c | 127 +++++++++++++++++++++++++++++
src/router_core/router_core_private.h | 20 +++++
src/router_node.c | 7 +-
6 files changed, 169 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/df2bf5de/include/qpid/dispatch/router.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router.h b/include/qpid/dispatch/router.h
index b1e4ced..30b9022 100644
--- a/include/qpid/dispatch/router.h
+++ b/include/qpid/dispatch/router.h
@@ -37,6 +37,8 @@ typedef struct qd_address_t qd_address_t;
typedef uint8_t qd_address_semantics_t;
typedef struct qd_router_delivery_t qd_router_delivery_t;
+#include <qpid/dispatch/router_core.h>
+
/**
* @name Address fanout semantics
* @{
@@ -140,6 +142,8 @@ typedef void (*qd_router_message_cb_t)(void *context, qd_message_t *msg, int lin
const char *qd_router_id(const qd_dispatch_t *qd);
+qdr_core_t *qd_router_core(qd_dispatch_t *qd);
+
/** Register an address in the router's hash table.
* @param qd Pointer to the dispatch instance.
* @param address String form of address
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/df2bf5de/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 13a3246..5180996 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -19,15 +19,16 @@
* under the License.
*/
-#include <qpid/dispatch.h>
#include <qpid/dispatch/amqp.h>
#include <qpid/dispatch/bitmask.h>
#include <qpid/dispatch/compose.h>
#include <qpid/dispatch/parse.h>
+#include <qpid/dispatch/router.h>
-//
-// All callbacks in this module shall be invoked on a connection thread from the server thread pool.
-//
+
+/**
+ * All callbacks in this module shall be invoked on a connection thread from the server thread pool.
+ */
typedef struct qdr_core_t qdr_core_t;
typedef struct qdr_connection_t qdr_connection_t;
@@ -100,7 +101,12 @@ typedef struct {
qdr_delivery_t *delivery; // For DELIVERY
} qdr_work_t;
-qdr_connection_t *qdr_connection_opened(qdr_core_t *core, qd_field_iterator_t *label);
+/**
+ * qdr_connection_opened
+ *
+ * This function must be called once for every
+ */
+qdr_connection_t *qdr_connection_opened(qdr_core_t *core, const char *label);
void qdr_connection_closed(qdr_connection_t *conn);
void qdr_connection_set_context(qdr_connection_t *conn, void *context);
void *qdr_connection_get_context(qdr_connection_t *conn);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/df2bf5de/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 42c9fc9..5168803 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -67,6 +67,7 @@ set(qpid_dispatch_SOURCES
router_config.c
router_core/agent.c
router_core/agent_address.c
+ router_core/connections.c
router_core/router_core.c
router_core/router_core_thread.c
router_core/route_tables.c
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/df2bf5de/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
new file mode 100644
index 0000000..14e95fd
--- /dev/null
+++ b/src/router_core/connections.c
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "router_core_private.h"
+
+static void qdr_connection_opened_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
+
+ALLOC_DEFINE(qdr_connection_t);
+
+//==================================================================================
+// Internal Functions
+//==================================================================================
+
+
+//==================================================================================
+// Interface Functions
+//==================================================================================
+
+qdr_connection_t *qdr_connection_opened(qdr_core_t *core, const char *label)
+{
+ qdr_action_t *action = qdr_action(qdr_connection_opened_CT);
+ qdr_connection_t *conn = new_qdr_connection_t();
+
+ conn->core = core;
+ conn->user_context = 0;
+ conn->label = label;
+
+ action->args.connection.conn = conn;
+ qdr_action_enqueue(core, action);
+
+ return conn;
+}
+
+
+void qdr_connection_closed(qdr_connection_t *conn)
+{
+ qdr_action_t *action = qdr_action(qdr_connection_closed_CT);
+ action->args.connection.conn = conn;
+ qdr_action_enqueue(conn->core, action);
+}
+
+
+void qdr_connection_set_context(qdr_connection_t *conn, void *context)
+{
+ if (conn)
+ conn->user_context = context;
+}
+
+
+void *qdr_connection_get_context(qdr_connection_t *conn)
+{
+ return conn ? conn->user_context : 0;
+}
+
+
+qdr_work_t *qdr_connection_work(qdr_connection_t *conn)
+{
+ return 0;
+}
+
+
+void qdr_connection_activate_handler(qdr_core_t *core, qdr_connection_activate_t handler, void *context)
+{
+}
+
+
+//==================================================================================
+// In-Thread Functions
+//==================================================================================
+
+static void qdr_connection_opened_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+{
+ if (discard)
+ return;
+
+ qdr_connection_t *conn = action->args.connection.conn;
+ DEQ_ITEM_INIT(conn);
+ DEQ_INSERT_TAIL(core->open_connections, conn);
+
+ //
+ // TODO - Look for waypoints that need to be activated now that their connection
+ // is open.
+ //
+
+ //
+ // TODO - Look for link-route destinations to be activated now that their connection
+ // is open.
+ //
+}
+
+
+static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+{
+ if (discard)
+ return;
+
+ qdr_connection_t *conn = action->args.connection.conn;
+
+ //
+ // TODO - Deactivate waypoints and link-route destinations for this connection
+ //
+
+ //
+ // TODO - Clean up links associated with this connection
+ //
+
+ DEQ_REMOVE(core->open_connections, conn);
+ free_qdr_connection_t(conn);
+}
+
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/df2bf5de/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 11393a3..54ff4cf 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -63,6 +63,13 @@ struct qdr_action_t {
} route_table;
//
+ // Arguments for connection-level actions
+ //
+ struct {
+ qdr_connection_t *conn;
+ } connection;
+
+ //
// Arguments for in-process subscriptions
//
struct {
@@ -223,6 +230,17 @@ void qdr_add_node_ref(qdr_router_ref_list_t *ref_list, qdr_node_t *rnode);
void qdr_del_node_ref(qdr_router_ref_list_t *ref_list, qdr_node_t *rnode);
+struct qdr_connection_t {
+ DEQ_LINKS(qdr_connection_t);
+ qdr_core_t *core;
+ void *user_context;
+ const char *label;
+};
+
+ALLOC_DECLARE(qdr_connection_t);
+DEQ_DECLARE(qdr_connection_t, qdr_connection_list_t);
+
+
struct qdr_core_t {
qd_dispatch_t *qd;
qd_log_source_t *log;
@@ -232,6 +250,8 @@ struct qdr_core_t {
sys_cond_t *action_cond;
sys_mutex_t *action_lock;
+ qdr_connection_list_t open_connections;
+
//
// Agent section
//
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/df2bf5de/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index a45e9bb..6779817 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -18,7 +18,6 @@
*/
#include <qpid/dispatch/python_embedded.h>
-#include <qpid/dispatch/router_core.h>
#include <stdio.h>
#include <string.h>
#include <stdbool.h>
@@ -1918,6 +1917,12 @@ const char *qd_router_id(const qd_dispatch_t *qd)
}
+qdr_core_t *qd_router_core(qd_dispatch_t *qd)
+{
+ return qd->router->router_core;
+}
+
+
qd_address_t *qd_router_register_address(qd_dispatch_t *qd,
const char *address,
qd_router_message_cb_t on_message,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org