You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2018/10/10 20:32:51 UTC
[1/2] qpid-dispatch git commit: DISPATCH-1141 - Added core events API
and the general implementation
Repository: qpid-dispatch
Updated Branches:
refs/heads/master d1d6e92df -> e06af9220
DISPATCH-1141 - Added core events API and the general implementation
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/ffd6a109
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/ffd6a109
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/ffd6a109
Branch: refs/heads/master
Commit: ffd6a109b2f17e1a7713c11398e0487110b03217
Parents: d1d6e92
Author: Ted Ross <tr...@redhat.com>
Authored: Wed Oct 10 13:03:44 2018 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Wed Oct 10 13:03:44 2018 -0400
----------------------------------------------------------------------
src/CMakeLists.txt | 1 +
src/router_core/core_events.c | 118 +++++++++++++++++++++++
src/router_core/core_events.h | 144 +++++++++++++++++++++++++++++
src/router_core/router_core_private.h | 8 ++
4 files changed, 271 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ffd6a109/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index f521fc5..c9c9f2a 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -86,6 +86,7 @@ set(qpid_dispatch_SOURCES
router_core/agent_link.c
router_core/agent_router.c
router_core/connections.c
+ router_core/core_events.c
router_core/core_link_endpoint.c
router_core/edge_control.c
router_core/error.c
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ffd6a109/src/router_core/core_events.c
----------------------------------------------------------------------
diff --git a/src/router_core/core_events.c b/src/router_core/core_events.c
new file mode 100644
index 0000000..a7216e5
--- /dev/null
+++ b/src/router_core/core_events.c
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "core_events.h"
+
+
+struct qdrc_event_subscription_t { // One subscriber's registration for core events
+ DEQ_LINKS_N(CONN, qdrc_event_subscription_t); // list links used for core->conn_event_subscriptions
+ DEQ_LINKS_N(LINK, qdrc_event_subscription_t); // list links used for core->link_event_subscriptions
+ DEQ_LINKS_N(ADDR, qdrc_event_subscription_t); // list links used for core->addr_event_subscriptions
+ void *context; // opaque value passed back as the first argument of every callback
+ qdrc_event_t events; // bitmask of the QDRC_EVENT_* values this subscriber asked for
+ qdrc_connection_event_t on_conn_event; // called by qdrc_event_conn_raise for matching events
+ qdrc_link_event_t on_link_event; // called by qdrc_event_link_raise for matching events
+ qdrc_address_event_t on_addr_event; // called by qdrc_event_addr_raise for matching events
+};
+
+
+//
+// Create a subscription for the event bits in 'events' and append it to the
+// per-category subscription list(s) of 'core' (connection, link, address)
+// for every category that has at least one bit set in 'events'.
+//
+// The returned object is heap-allocated here and released by
+// qdrc_event_unsubscribe_CT.
+//
+qdrc_event_subscription_t *qdrc_event_subscribe_CT(qdr_core_t *core,
+                                                   qdrc_event_t events,
+                                                   qdrc_connection_event_t on_conn_event,
+                                                   qdrc_link_event_t on_link_event,
+                                                   qdrc_address_event_t on_addr_event,
+                                                   void *context)
+{
+    //
+    // Every bit in 'events' must belong to one of the known categories.  The mask
+    // must be inverted with bitwise NOT (~); logical NOT (!) would collapse the
+    // mask to 0 and make this check pass unconditionally.
+    //
+    const qdrc_event_t known_events = _QDRC_EVENT_CONN_RANGE | _QDRC_EVENT_LINK_RANGE | _QDRC_EVENT_ADDR_RANGE;
+    assert((events & ~known_events) == 0);
+
+    //
+    // A callback is mandatory for each category that has events requested.
+    //
+    assert(!(events & _QDRC_EVENT_CONN_RANGE) || on_conn_event);
+    assert(!(events & _QDRC_EVENT_LINK_RANGE) || on_link_event);
+    assert(!(events & _QDRC_EVENT_ADDR_RANGE) || on_addr_event);
+
+    qdrc_event_subscription_t *sub = NEW(qdrc_event_subscription_t);
+    ZERO(sub);
+
+    sub->context       = context;
+    sub->events        = events;
+    sub->on_conn_event = on_conn_event;
+    sub->on_link_event = on_link_event;
+    sub->on_addr_event = on_addr_event;
+
+    //
+    // Link the subscription only into the lists whose category it subscribes to;
+    // qdrc_event_unsubscribe_CT uses the same tests to decide which lists to unlink from.
+    //
+    if (events & _QDRC_EVENT_CONN_RANGE)
+        DEQ_INSERT_TAIL_N(CONN, core->conn_event_subscriptions, sub);
+
+    if (events & _QDRC_EVENT_LINK_RANGE)
+        DEQ_INSERT_TAIL_N(LINK, core->link_event_subscriptions, sub);
+
+    if (events & _QDRC_EVENT_ADDR_RANGE)
+        DEQ_INSERT_TAIL_N(ADDR, core->addr_event_subscriptions, sub);
+
+    return sub;
+}
+
+
+void qdrc_event_unsubscribe_CT(qdr_core_t *core, qdrc_event_subscription_t *sub)
+{
+    //
+    // The stored event mask tells which of the three lists this subscription
+    // was linked into; unlink it from exactly those, then release it.
+    //
+    const qdrc_event_t subscribed = sub->events;
+
+    if (subscribed & _QDRC_EVENT_CONN_RANGE) {
+        DEQ_REMOVE_N(CONN, core->conn_event_subscriptions, sub);
+    }
+
+    if (subscribed & _QDRC_EVENT_LINK_RANGE) {
+        DEQ_REMOVE_N(LINK, core->link_event_subscriptions, sub);
+    }
+
+    if (subscribed & _QDRC_EVENT_ADDR_RANGE) {
+        DEQ_REMOVE_N(ADDR, core->addr_event_subscriptions, sub);
+    }
+
+    free(sub);
+}
+
+
+void qdrc_event_conn_raise(qdr_core_t *core, qdrc_event_t event, qdr_connection_t *conn)
+{
+    //
+    // Visit the connection-event subscribers in list order and invoke the
+    // callback of each one whose mask includes 'event'.
+    //
+    // NOTE(review): the next element is read from the current subscription after
+    // its callback returns - confirm callbacks never unsubscribe themselves.
+    //
+    for (qdrc_event_subscription_t *cursor = DEQ_HEAD(core->conn_event_subscriptions);
+         cursor;
+         cursor = DEQ_NEXT_N(CONN, cursor)) {
+        if ((cursor->events & event) == 0)
+            continue;
+        cursor->on_conn_event(cursor->context, event, conn);
+    }
+}
+
+
+void qdrc_event_link_raise(qdr_core_t *core, qdrc_event_t event, qdr_link_t *link)
+{
+    //
+    // Visit the link-event subscribers in list order and invoke the
+    // callback of each one whose mask includes 'event'.
+    //
+    // NOTE(review): the next element is read from the current subscription after
+    // its callback returns - confirm callbacks never unsubscribe themselves.
+    //
+    for (qdrc_event_subscription_t *cursor = DEQ_HEAD(core->link_event_subscriptions);
+         cursor;
+         cursor = DEQ_NEXT_N(LINK, cursor)) {
+        if ((cursor->events & event) == 0)
+            continue;
+        cursor->on_link_event(cursor->context, event, link);
+    }
+}
+
+
+void qdrc_event_addr_raise(qdr_core_t *core, qdrc_event_t event, qdr_address_t *addr)
+{
+    //
+    // Visit the address-event subscribers in list order and invoke the
+    // callback of each one whose mask includes 'event'.
+    //
+    // NOTE(review): the next element is read from the current subscription after
+    // its callback returns - confirm callbacks never unsubscribe themselves.
+    //
+    for (qdrc_event_subscription_t *cursor = DEQ_HEAD(core->addr_event_subscriptions);
+         cursor;
+         cursor = DEQ_NEXT_N(ADDR, cursor)) {
+        if ((cursor->events & event) == 0)
+            continue;
+        cursor->on_addr_event(cursor->context, event, addr);
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ffd6a109/src/router_core/core_events.h
----------------------------------------------------------------------
diff --git a/src/router_core/core_events.h b/src/router_core/core_events.h
new file mode 100644
index 0000000..04c50e5
--- /dev/null
+++ b/src/router_core/core_events.h
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef qd_router_core_event_types
+#define qd_router_core_event_types 1
+
+typedef struct qdrc_event_subscription_t qdrc_event_subscription_t;
+
+#include "router_core_private.h"
+
+#endif
+
+#ifndef qd_router_core_events
+#define qd_router_core_events 1
+
+#include "qpid/dispatch/ctools.h"
+
+typedef uint32_t qdrc_event_t;
+
+/**
+ * QDRC_EVENT_CONN_OPENED A connection has opened
+ * QDRC_EVENT_CONN_CLOSED A connection has closed
+ * QDRC_EVENT_CONN_EDGE_ESTABLISHED An edge connection has been established
+ * QDRC_EVENT_CONN_EDGE_LOST An edge connection has been lost
+ * QDRC_EVENT_CONN_IR_ESTABLISHED An inter-router connection has been established
+ * QDRC_EVENT_CONN_IR_LOST An inter-router connection has been lost
+ *
+ * QDRC_EVENT_LINK_IN_ATTACHED An inbound link has been attached
+ * QDRC_EVENT_LINK_IN_DETACHED An inbound link has been detached
+ * QDRC_EVENT_LINK_OUT_ATTACHED An outbound link has been attached
+ * QDRC_EVENT_LINK_OUT_DETACHED An outbound link has been detached
+ *
+ * QDRC_EVENT_ADDR_ADDED An address has been added
+ * QDRC_EVENT_ADDR_REMOVED An address has been removed
+ * QDRC_EVENT_ADDR_BECAME_LOCAL_DEST An address transitioned from zero to one local destination
+ * QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST An address transitioned from one to zero local destinations
+ * QDRC_EVENT_ADDR_BECAME_DEST An address transitioned from zero to one destination
+ * QDRC_EVENT_ADDR_NO_LONGER_DEST An address transitioned from one to zero destinations
+ * QDRC_EVENT_ADDR_ONE_LOCAL_DEST An address transitioned from N destinations to one local dest
+ * QDRC_EVENT_ADDR_TWO_DEST An address transitioned from one local dest to two destinations
+*/
+
+#define QDRC_EVENT_CONN_OPENED 0x00000001
+#define QDRC_EVENT_CONN_CLOSED 0x00000002
+#define QDRC_EVENT_CONN_EDGE_ESTABLISHED 0x00000004
+#define QDRC_EVENT_CONN_EDGE_LOST 0x00000008
+#define QDRC_EVENT_CONN_IR_ESTABLISHED 0x00000010
+#define QDRC_EVENT_CONN_IR_LOST 0x00000020
+#define _QDRC_EVENT_CONN_RANGE 0x0000003F // OR of all six QDRC_EVENT_CONN_* bits above; update when adding a connection event
+
+#define QDRC_EVENT_LINK_IN_ATTACHED 0x00000100
+#define QDRC_EVENT_LINK_IN_DETACHED 0x00000200
+#define QDRC_EVENT_LINK_OUT_ATTACHED 0x00000400
+#define QDRC_EVENT_LINK_OUT_DETACHED 0x00000800
+#define _QDRC_EVENT_LINK_RANGE 0x00000F00 // OR of all four QDRC_EVENT_LINK_* bits above; update when adding a link event
+
+#define QDRC_EVENT_ADDR_ADDED 0x00010000
+#define QDRC_EVENT_ADDR_REMOVED 0x00020000
+#define QDRC_EVENT_ADDR_BECAME_LOCAL_DEST 0x00040000
+#define QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST 0x00080000
+#define QDRC_EVENT_ADDR_BECAME_DEST 0x00100000
+#define QDRC_EVENT_ADDR_NO_LONGER_DEST 0x00200000
+#define QDRC_EVENT_ADDR_ONE_LOCAL_DEST 0x00400000
+#define QDRC_EVENT_ADDR_TWO_DEST 0x00800000
+#define _QDRC_EVENT_ADDR_RANGE 0x00FF0000 // OR of all eight QDRC_EVENT_ADDR_* bits above; update when adding an address event
+
+
+/**
+ * Callbacks - Connection, Link, and Address event notifications
+ *
+ * @param context The opaque context that was supplied to qdrc_event_subscribe_CT
+ * @param event_type The type of event being raised
+ * @param conn/link/addr The object involved in the event
+ */
+typedef void (*qdrc_connection_event_t) (void *context,
+ qdrc_event_t event_type,
+ qdr_connection_t *conn);
+
+typedef void (*qdrc_link_event_t) (void *context,
+ qdrc_event_t event_type,
+ qdr_link_t *link);
+
+typedef void (*qdrc_address_event_t) (void *context,
+ qdrc_event_t event_type,
+ qdr_address_t *addr);
+
+/**
+ * qdrc_event_subscribe_CT
+ *
+ * Subscribe to receive a set of event types for connections, links, or addresses.
+ *
+ * @param core Pointer to the router core object
+ * @param events Logical OR of the set of events that the caller wishes to receive
+ * @param on_conn_event Callback for connection events, must be non-null if there are connection events in 'events'
+ * @param on_link_event Callback for link events, must be non-null if there are link events in 'events'
+ * @param on_addr_event Callback for address events, must be non-null if there are address events in 'events'
+ * @param context The opaque context that will be passed back with the raised events
+ * @return Pointer to an object that tracks the subscription
+ */
+qdrc_event_subscription_t *qdrc_event_subscribe_CT(qdr_core_t *core,
+ qdrc_event_t events,
+ qdrc_connection_event_t on_conn_event,
+ qdrc_link_event_t on_link_event,
+ qdrc_address_event_t on_addr_event,
+ void *context);
+
+/**
+ * qdrc_event_unsubscribe_CT
+ *
+ * Cancel an active subscription
+ *
+ * @param core Pointer to the router core object
+ * @param sub Pointer to the subscription returned by the subscribe function
+ */
+void qdrc_event_unsubscribe_CT(qdr_core_t *core, qdrc_event_subscription_t *sub);
+
+
+//=====================================================================================
+// Private functions, not part of the API
+//=====================================================================================
+
+DEQ_DECLARE(qdrc_event_subscription_t, qdrc_event_subscription_list_t);
+
+void qdrc_event_conn_raise(qdr_core_t *core, qdrc_event_t event, qdr_connection_t *conn);
+void qdrc_event_link_raise(qdr_core_t *core, qdrc_event_t event, qdr_link_t *link);
+void qdrc_event_addr_raise(qdr_core_t *core, qdrc_event_t event, qdr_address_t *addr);
+
+#endif
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ffd6a109/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 351f2a5..45cda63 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -41,6 +41,7 @@ typedef struct qdr_exchange qdr_exchange_t;
typedef struct qdr_edge_t qdr_edge_t;
#include "core_link_endpoint.h"
+#include "core_events.h"
qdr_forwarder_t *qdr_forwarder_CT(qdr_core_t *core, qd_address_treatment_t treatment);
int qdr_forward_message_CT(qdr_core_t *core, qdr_address_t *addr, qd_message_t *msg, qdr_delivery_t *in_delivery,
@@ -741,6 +742,13 @@ struct qdr_core_t {
qdr_link_deliver_t deliver_handler;
qdr_delivery_update_t delivery_update_handler;
+ //
+ // Events section
+ //
+ qdrc_event_subscription_list_t conn_event_subscriptions;
+ qdrc_event_subscription_list_t link_event_subscriptions;
+ qdrc_event_subscription_list_t addr_event_subscriptions;
+
qd_router_mode_t router_mode;
const char *router_area;
const char *router_id;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/2] qpid-dispatch git commit: DISPATCH-1141 - Wired in connection
open and close events
Posted by tr...@apache.org.
DISPATCH-1141 - Wired in connection open and close events
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/e06af922
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/e06af922
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/e06af922
Branch: refs/heads/master
Commit: e06af9220518a91d9172590cbb763ee95553538e
Parents: ffd6a10
Author: Ted Ross <tr...@redhat.com>
Authored: Wed Oct 10 13:04:45 2018 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Wed Oct 10 13:04:45 2018 -0400
----------------------------------------------------------------------
src/router_core/connections.c | 104 +++++++++++++++++++------------------
1 file changed, 54 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e06af922/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 2eda2d6..5bb4921 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -1264,70 +1264,72 @@ static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t *core,
static void qdr_connection_opened_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
+
if (!discard) {
qdr_connection_t *conn = action->args.connection.conn;
- DEQ_ITEM_INIT(conn);
- DEQ_INSERT_TAIL(core->open_connections, conn);
- if (conn->role == QDR_ROLE_NORMAL) {
- //
- // No action needed for NORMAL connections
- //
- qdr_field_free(action->args.connection.connection_label);
- qdr_field_free(action->args.connection.container_id);
- return;
- }
+ do {
+ DEQ_ITEM_INIT(conn);
+ DEQ_INSERT_TAIL(core->open_connections, conn);
- if (conn->role == QDR_ROLE_INTER_ROUTER) {
- //
- // Assign a unique mask-bit to this connection as a reference to be used by
- // the router module
- //
- if (qd_bitmask_first_set(core->neighbor_free_mask, &conn->mask_bit))
- qd_bitmask_clear_bit(core->neighbor_free_mask, conn->mask_bit);
- else {
- qd_log(core->log, QD_LOG_CRITICAL, "Exceeded maximum inter-router connection count");
- conn->role = QDR_ROLE_NORMAL;
- qdr_field_free(action->args.connection.connection_label);
- qdr_field_free(action->args.connection.container_id);
- return;
+ if (conn->role == QDR_ROLE_NORMAL) {
+ //
+ // No action needed for NORMAL connections
+ //
+ break;
}
- if (!conn->incoming) {
+ if (conn->role == QDR_ROLE_INTER_ROUTER) {
//
- // The connector-side of inter-router/edge-uplink connections is responsible for setting up the
- // inter-router links: Two (in and out) for control, 2 * QDR_N_PRIORITIES for routed-message transfer.
+ // Assign a unique mask-bit to this connection as a reference to be used by
+ // the router module
//
- (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, QD_INCOMING, qdr_terminus_router_control(), qdr_terminus_router_control());
- (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, QD_OUTGOING, qdr_terminus_router_control(), qdr_terminus_router_control());
+ if (qd_bitmask_first_set(core->neighbor_free_mask, &conn->mask_bit))
+ qd_bitmask_clear_bit(core->neighbor_free_mask, conn->mask_bit);
+ else {
+ qd_log(core->log, QD_LOG_CRITICAL, "Exceeded maximum inter-router connection count");
+ conn->role = QDR_ROLE_NORMAL;
+ break;
+ }
+
+ if (!conn->incoming) {
+ //
+ // The connector-side of inter-router/edge-uplink connections is responsible for setting up the
+ // inter-router links: Two (in and out) for control, 2 * QDR_N_PRIORITIES for routed-message transfer.
+ //
+ (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, QD_INCOMING, qdr_terminus_router_control(), qdr_terminus_router_control());
+ (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, QD_OUTGOING, qdr_terminus_router_control(), qdr_terminus_router_control());
- for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) {
- (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER, QD_INCOMING, qdr_terminus_router_data(), qdr_terminus_router_data());
- (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER, QD_OUTGOING, qdr_terminus_router_data(), qdr_terminus_router_data());
+ for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) {
+ (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER, QD_INCOMING, qdr_terminus_router_data(), qdr_terminus_router_data());
+ (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER, QD_OUTGOING, qdr_terminus_router_data(), qdr_terminus_router_data());
+ }
}
}
- }
- if (conn->role == QDR_ROLE_ROUTE_CONTAINER) {
- //
- // Notify the route-control module that a route-container connection has opened.
- // There may be routes that need to be activated due to the opening of this connection.
- //
+ if (conn->role == QDR_ROLE_ROUTE_CONTAINER) {
+ //
+ // Notify the route-control module that a route-container connection has opened.
+ // There may be routes that need to be activated due to the opening of this connection.
+ //
- //
- // If there's a connection label, use it as the identifier. Otherwise, use the remote
- // container id.
- //
- qdr_field_t *cid = action->args.connection.connection_label ?
- action->args.connection.connection_label : action->args.connection.container_id;
- if (cid)
- qdr_route_connection_opened_CT(core, conn, action->args.connection.container_id, action->args.connection.connection_label);
- }
+ //
+ // If there's a connection label, use it as the identifier. Otherwise, use the remote
+ // container id.
+ //
+ qdr_field_t *cid = action->args.connection.connection_label ?
+ action->args.connection.connection_label : action->args.connection.container_id;
+ if (cid)
+ qdr_route_connection_opened_CT(core, conn, action->args.connection.container_id, action->args.connection.connection_label);
+ }
- if (conn->role == QDR_ROLE_EDGE_UPLINK && core->edge) {
- // edge router established connection to interior
- qdr_edge_connection_opened(core->edge, conn);
- }
+ if (conn->role == QDR_ROLE_EDGE_UPLINK && core->edge) {
+ // edge router established connection to interior
+ qdr_edge_connection_opened(core->edge, conn);
+ }
+ } while (false);
+
+ qdrc_event_conn_raise(core, QDRC_EVENT_CONN_OPENED, conn);
}
qdr_field_free(action->args.connection.connection_label);
@@ -1417,6 +1419,8 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, boo
qdr_edge_connection_closed(core->edge);
}
+ qdrc_event_conn_raise(core, QDRC_EVENT_CONN_CLOSED, conn);
+
DEQ_REMOVE(core->open_connections, conn);
qdr_connection_free(conn);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org