You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2018/10/10 20:32:51 UTC

[1/2] qpid-dispatch git commit: DISPATCH-1141 - Added core events API and the general implementation

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master d1d6e92df -> e06af9220


DISPATCH-1141 - Added core events API and the general implementation


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/ffd6a109
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/ffd6a109
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/ffd6a109

Branch: refs/heads/master
Commit: ffd6a109b2f17e1a7713c11398e0487110b03217
Parents: d1d6e92
Author: Ted Ross <tr...@redhat.com>
Authored: Wed Oct 10 13:03:44 2018 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Wed Oct 10 13:03:44 2018 -0400

----------------------------------------------------------------------
 src/CMakeLists.txt                    |   1 +
 src/router_core/core_events.c         | 118 +++++++++++++++++++++++
 src/router_core/core_events.h         | 144 +++++++++++++++++++++++++++++
 src/router_core/router_core_private.h |   8 ++
 4 files changed, 271 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ffd6a109/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index f521fc5..c9c9f2a 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -86,6 +86,7 @@ set(qpid_dispatch_SOURCES
   router_core/agent_link.c
   router_core/agent_router.c
   router_core/connections.c
+  router_core/core_events.c
   router_core/core_link_endpoint.c
   router_core/edge_control.c
   router_core/error.c

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ffd6a109/src/router_core/core_events.c
----------------------------------------------------------------------
diff --git a/src/router_core/core_events.c b/src/router_core/core_events.c
new file mode 100644
index 0000000..a7216e5
--- /dev/null
+++ b/src/router_core/core_events.c
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "core_events.h"
+
+
+struct qdrc_event_subscription_t {
+    DEQ_LINKS_N(CONN, qdrc_event_subscription_t);
+    DEQ_LINKS_N(LINK, qdrc_event_subscription_t);
+    DEQ_LINKS_N(ADDR, qdrc_event_subscription_t);
+    void                    *context;
+    qdrc_event_t             events;
+    qdrc_connection_event_t  on_conn_event;
+    qdrc_link_event_t        on_link_event;
+    qdrc_address_event_t     on_addr_event;
+};
+
+
+qdrc_event_subscription_t *qdrc_event_subscribe_CT(qdr_core_t             *core,
+                                                   qdrc_event_t            events,
+                                                   qdrc_connection_event_t on_conn_event,
+                                                   qdrc_link_event_t       on_link_event,
+                                                   qdrc_address_event_t    on_addr_event,
+                                                   void                   *context)
+{
+    qdrc_event_subscription_t *sub = NEW(qdrc_event_subscription_t);
+    ZERO(sub);
+
+    sub->context       = context;
+    sub->events        = events;
+    sub->on_conn_event = on_conn_event;
+    sub->on_link_event = on_link_event;
+    sub->on_addr_event = on_addr_event;
+
+    assert((events & !(_QDRC_EVENT_CONN_RANGE | _QDRC_EVENT_LINK_RANGE | _QDRC_EVENT_ADDR_RANGE)) == 0);
+    assert(!(events & _QDRC_EVENT_CONN_RANGE) || on_conn_event);
+    assert(!(events & _QDRC_EVENT_LINK_RANGE) || on_link_event);
+    assert(!(events & _QDRC_EVENT_ADDR_RANGE) || on_addr_event);
+
+    if (events & _QDRC_EVENT_CONN_RANGE)
+        DEQ_INSERT_TAIL_N(CONN, core->conn_event_subscriptions, sub);
+
+    if (events & _QDRC_EVENT_LINK_RANGE)
+        DEQ_INSERT_TAIL_N(LINK, core->link_event_subscriptions, sub);
+
+    if (events & _QDRC_EVENT_ADDR_RANGE)
+        DEQ_INSERT_TAIL_N(ADDR, core->addr_event_subscriptions, sub);
+
+    return sub;
+}
+
+
+void qdrc_event_unsubscribe_CT(qdr_core_t *core, qdrc_event_subscription_t *sub)
+{
+    if (sub->events & _QDRC_EVENT_CONN_RANGE)
+        DEQ_REMOVE_N(CONN, core->conn_event_subscriptions, sub);
+
+    if (sub->events & _QDRC_EVENT_LINK_RANGE)
+        DEQ_REMOVE_N(LINK, core->link_event_subscriptions, sub);
+
+    if (sub->events & _QDRC_EVENT_ADDR_RANGE)
+        DEQ_REMOVE_N(ADDR, core->addr_event_subscriptions, sub);
+
+    free(sub);
+}
+
+
+void qdrc_event_conn_raise(qdr_core_t *core, qdrc_event_t event, qdr_connection_t *conn)
+{
+    qdrc_event_subscription_t *sub = DEQ_HEAD(core->conn_event_subscriptions);
+
+    while (sub) {
+        if (sub->events & event)
+            sub->on_conn_event(sub->context, event, conn);
+        sub = DEQ_NEXT_N(CONN, sub);
+    }
+}
+
+
+void qdrc_event_link_raise(qdr_core_t *core, qdrc_event_t event, qdr_link_t *link)
+{
+    qdrc_event_subscription_t *sub = DEQ_HEAD(core->link_event_subscriptions);
+
+    while (sub) {
+        if (sub->events & event)
+            sub->on_link_event(sub->context, event, link);
+        sub = DEQ_NEXT_N(LINK, sub);
+    }
+}
+
+
+void qdrc_event_addr_raise(qdr_core_t *core, qdrc_event_t event, qdr_address_t *addr)
+{
+    qdrc_event_subscription_t *sub = DEQ_HEAD(core->addr_event_subscriptions);
+
+    while (sub) {
+        if (sub->events & event)
+            sub->on_addr_event(sub->context, event, addr);
+        sub = DEQ_NEXT_N(ADDR, sub);
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ffd6a109/src/router_core/core_events.h
----------------------------------------------------------------------
diff --git a/src/router_core/core_events.h b/src/router_core/core_events.h
new file mode 100644
index 0000000..04c50e5
--- /dev/null
+++ b/src/router_core/core_events.h
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef qd_router_core_event_types
+#define qd_router_core_event_types 1
+
+typedef struct qdrc_event_subscription_t qdrc_event_subscription_t;
+
+#include "router_core_private.h"
+
+#endif
+
+#ifndef qd_router_core_events
+#define qd_router_core_events 1
+
+#include "qpid/dispatch/ctools.h"
+
+typedef uint32_t qdrc_event_t;
+
+/**
+ * QDRC_EVENT_CONN_OPENED                A connection has opened
+ * QDRC_EVENT_CONN_CLOSED                A connection has closed
+ * QDRC_EVENT_CONN_EDGE_ESTABLISHED      An edge connection has been established
+ * QDRC_EVENT_CONN_EDGE_LOST             An edge connection has been lost
+ * QDRC_EVENT_CONN_IR_ESTABLISHED        An inter-router connection has been established
+ * QDRC_EVENT_CONN_IR_LOST               An inter-router connection has been lost
+ *
+ * QDRC_EVENT_LINK_IN_ATTACHED           An inbound link has been attached
+ * QDRC_EVENT_LINK_IN_DETACHED           An inbound link has been detached
+ * QDRC_EVENT_LINK_OUT_ATTACHED          An outbound link has been attached
+ * QDRC_EVENT_LINK_OUT_DETACHED          An outbound link has been detached
+ *
+ * QDRC_EVENT_ADDR_ADDED                 An address has been added
+ * QDRC_EVENT_ADDR_REMOVED               An address has been removed
+ * QDRC_EVENT_ADDR_BECAME_LOCAL_DEST     An address transitioned from zero to one local destination
+ * QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST  An address transitioned from one to zero local destinations
+ * QDRC_EVENT_ADDR_BECAME_DEST           An address transitioned from zero to one destination
+ * QDRC_EVENT_ADDR_NO_LONGER_DEST        An address transitioned from one to zero destinations
+ * QDRC_EVENT_ADDR_ONE_LOCAL_DEST        An address transitioned from N destinations to one local dest
+ * QDRC_EVENT_ADDR_TWO_DEST              An address transisioned from one local dest to two destinations
+*/
+
+#define QDRC_EVENT_CONN_OPENED               0x00000001
+#define QDRC_EVENT_CONN_CLOSED               0x00000002
+#define QDRC_EVENT_CONN_EDGE_ESTABLISHED     0x00000004
+#define QDRC_EVENT_CONN_EDGE_LOST            0x00000008
+#define QDRC_EVENT_CONN_IR_ESTABLISHED       0x00000010
+#define QDRC_EVENT_CONN_IR_LOST              0x00000020
+#define _QDRC_EVENT_CONN_RANGE               0x0000003F
+
+#define QDRC_EVENT_LINK_IN_ATTACHED          0x00000100
+#define QDRC_EVENT_LINK_IN_DETACHED          0x00000200
+#define QDRC_EVENT_LINK_OUT_ATTACHED         0x00000400
+#define QDRC_EVENT_LINK_OUT_DETACHED         0x00000800
+#define _QDRC_EVENT_LINK_RANGE               0x00000F00
+
+#define QDRC_EVENT_ADDR_ADDED                0x00010000
+#define QDRC_EVENT_ADDR_REMOVED              0x00020000
+#define QDRC_EVENT_ADDR_BECAME_LOCAL_DEST    0x00040000
+#define QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST 0x00080000
+#define QDRC_EVENT_ADDR_BECAME_DEST          0x00100000
+#define QDRC_EVENT_ADDR_NO_LONGER_DEST       0x00200000
+#define QDRC_EVENT_ADDR_ONE_LOCAL_DEST       0x00400000
+#define QDRC_EVENT_ADDR_TWO_DEST             0x00800000
+#define _QDRC_EVENT_ADDR_RANGE               0x00FF0000
+
+
+/**
+ * Callbacks - Connection, Link, and Address event notifications
+ *
+ * @param context The opaque context provided in the mobile address binding
+ * @param event_type The type of event being raised
+ * @param conn/link/addr The object involved in the event
+ */
+typedef void (*qdrc_connection_event_t) (void             *context,
+                                         qdrc_event_t      event_type,
+                                         qdr_connection_t *conn);
+
+typedef void (*qdrc_link_event_t) (void         *context,
+                                   qdrc_event_t  event_type,
+                                   qdr_link_t   *link);
+
+typedef void (*qdrc_address_event_t) (void          *context,
+                                      qdrc_event_t   event_type,
+                                      qdr_address_t *addr);
+
+/**
+ * qdrc_event_subscribe_CT
+ *
+ * Subscribe to receive a set of event types for connections, links, or addresses.
+ *
+ * @param core Pointer to the router core object
+ * @param events Logical OR of the set of events that the caller wishes to receive
+ * @param on_conn_event Callback for connection events, must be non-null if there are connection events in 'events'
+ * @param on_link_event Callback for link events, must be non-null if there are link events in 'events'
+ * @param on_addr_event Callback for address events, must be non-null if there are address events in 'events'
+ * @param context The opaque context that will be passed back with the raised events
+ * @return Pointer to an object that tracks the subscription
+ */
+qdrc_event_subscription_t *qdrc_event_subscribe_CT(qdr_core_t             *core,
+                                                   qdrc_event_t            events,
+                                                   qdrc_connection_event_t on_conn_event,
+                                                   qdrc_link_event_t       on_link_event,
+                                                   qdrc_address_event_t    on_addr_event,
+                                                   void                   *context);
+
+/**
+ * qdrc_event_unsubscribe_CT
+ *
+ * Cancel an active subscription
+ *
+ * @param core Pointer to the router core object
+ * @param sub Pointer to the subscription returned by the subscribe function
+ */
+void qdrc_event_unsubscribe_CT(qdr_core_t *core, qdrc_event_subscription_t *sub);
+
+
+//=====================================================================================
+// Private functions, not part of the API
+//=====================================================================================
+
+DEQ_DECLARE(qdrc_event_subscription_t, qdrc_event_subscription_list_t);
+
+void qdrc_event_conn_raise(qdr_core_t *core, qdrc_event_t event, qdr_connection_t *conn);
+void qdrc_event_link_raise(qdr_core_t *core, qdrc_event_t event, qdr_link_t *link);
+void qdrc_event_addr_raise(qdr_core_t *core, qdrc_event_t event, qdr_address_t *addr);
+
+#endif

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ffd6a109/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 351f2a5..45cda63 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -41,6 +41,7 @@ typedef struct qdr_exchange          qdr_exchange_t;
 typedef struct qdr_edge_t            qdr_edge_t;
 
 #include "core_link_endpoint.h"
+#include "core_events.h"
 
 qdr_forwarder_t *qdr_forwarder_CT(qdr_core_t *core, qd_address_treatment_t treatment);
 int qdr_forward_message_CT(qdr_core_t *core, qdr_address_t *addr, qd_message_t *msg, qdr_delivery_t *in_delivery,
@@ -741,6 +742,13 @@ struct qdr_core_t {
     qdr_link_deliver_t         deliver_handler;
     qdr_delivery_update_t      delivery_update_handler;
 
+    //
+    // Events section
+    //
+    qdrc_event_subscription_list_t conn_event_subscriptions;
+    qdrc_event_subscription_list_t link_event_subscriptions;
+    qdrc_event_subscription_list_t addr_event_subscriptions;
+
     qd_router_mode_t  router_mode;
     const char       *router_area;
     const char       *router_id;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/2] qpid-dispatch git commit: DISPATCH-1141 - Wired in connection open and close events

Posted by tr...@apache.org.
DISPATCH-1141 - Wired in connection open and close events


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/e06af922
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/e06af922
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/e06af922

Branch: refs/heads/master
Commit: e06af9220518a91d9172590cbb763ee95553538e
Parents: ffd6a10
Author: Ted Ross <tr...@redhat.com>
Authored: Wed Oct 10 13:04:45 2018 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Wed Oct 10 13:04:45 2018 -0400

----------------------------------------------------------------------
 src/router_core/connections.c | 104 +++++++++++++++++++------------------
 1 file changed, 54 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e06af922/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 2eda2d6..5bb4921 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -1264,70 +1264,72 @@ static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t       *core,
 
 static void qdr_connection_opened_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
 {
+
     if (!discard) {
         qdr_connection_t *conn = action->args.connection.conn;
-        DEQ_ITEM_INIT(conn);
-        DEQ_INSERT_TAIL(core->open_connections, conn);
 
-        if (conn->role == QDR_ROLE_NORMAL) {
-            //
-            // No action needed for NORMAL connections
-            //
-            qdr_field_free(action->args.connection.connection_label);
-            qdr_field_free(action->args.connection.container_id);
-            return;
-        }
+        do {
+            DEQ_ITEM_INIT(conn);
+            DEQ_INSERT_TAIL(core->open_connections, conn);
 
-        if (conn->role == QDR_ROLE_INTER_ROUTER) {
-            //
-            // Assign a unique mask-bit to this connection as a reference to be used by
-            // the router module
-            //
-            if (qd_bitmask_first_set(core->neighbor_free_mask, &conn->mask_bit))
-                qd_bitmask_clear_bit(core->neighbor_free_mask, conn->mask_bit);
-            else {
-                qd_log(core->log, QD_LOG_CRITICAL, "Exceeded maximum inter-router connection count");
-                conn->role = QDR_ROLE_NORMAL;
-                qdr_field_free(action->args.connection.connection_label);
-                qdr_field_free(action->args.connection.container_id);
-                return;
+            if (conn->role == QDR_ROLE_NORMAL) {
+                //
+                // No action needed for NORMAL connections
+                //
+                break;
             }
 
-            if (!conn->incoming) {
+            if (conn->role == QDR_ROLE_INTER_ROUTER) {
                 //
-                // The connector-side of inter-router/edge-uplink connections is responsible for setting up the
-                // inter-router links:  Two (in and out) for control, 2 * QDR_N_PRIORITIES for routed-message transfer.
+                // Assign a unique mask-bit to this connection as a reference to be used by
+                // the router module
                 //
-                (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, QD_INCOMING, qdr_terminus_router_control(), qdr_terminus_router_control());
-                (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, QD_OUTGOING, qdr_terminus_router_control(), qdr_terminus_router_control());
+                if (qd_bitmask_first_set(core->neighbor_free_mask, &conn->mask_bit))
+                    qd_bitmask_clear_bit(core->neighbor_free_mask, conn->mask_bit);
+                else {
+                    qd_log(core->log, QD_LOG_CRITICAL, "Exceeded maximum inter-router connection count");
+                    conn->role = QDR_ROLE_NORMAL;
+                    break;
+                }
+
+                if (!conn->incoming) {
+                    //
+                    // The connector-side of inter-router/edge-uplink connections is responsible for setting up the
+                    // inter-router links:  Two (in and out) for control, 2 * QDR_N_PRIORITIES for routed-message transfer.
+                    //
+                    (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, QD_INCOMING, qdr_terminus_router_control(), qdr_terminus_router_control());
+                    (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, QD_OUTGOING, qdr_terminus_router_control(), qdr_terminus_router_control());
 
-                for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) {
-                    (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER,  QD_INCOMING, qdr_terminus_router_data(), qdr_terminus_router_data());
-                    (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER,  QD_OUTGOING, qdr_terminus_router_data(), qdr_terminus_router_data());
+                    for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) {
+                        (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER,  QD_INCOMING, qdr_terminus_router_data(), qdr_terminus_router_data());
+                        (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER,  QD_OUTGOING, qdr_terminus_router_data(), qdr_terminus_router_data());
+                    }
                 }
             }
-        }
 
-        if (conn->role == QDR_ROLE_ROUTE_CONTAINER) {
-            //
-            // Notify the route-control module that a route-container connection has opened.
-            // There may be routes that need to be activated due to the opening of this connection.
-            //
+            if (conn->role == QDR_ROLE_ROUTE_CONTAINER) {
+                //
+                // Notify the route-control module that a route-container connection has opened.
+                // There may be routes that need to be activated due to the opening of this connection.
+                //
 
-            //
-            // If there's a connection label, use it as the identifier.  Otherwise, use the remote
-            // container id.
-            //
-            qdr_field_t *cid = action->args.connection.connection_label ?
-                action->args.connection.connection_label : action->args.connection.container_id;
-            if (cid)
-                qdr_route_connection_opened_CT(core, conn, action->args.connection.container_id, action->args.connection.connection_label);
-        }
+                //
+                // If there's a connection label, use it as the identifier.  Otherwise, use the remote
+                // container id.
+                //
+                qdr_field_t *cid = action->args.connection.connection_label ?
+                    action->args.connection.connection_label : action->args.connection.container_id;
+                if (cid)
+                    qdr_route_connection_opened_CT(core, conn, action->args.connection.container_id, action->args.connection.connection_label);
+            }
 
-        if (conn->role == QDR_ROLE_EDGE_UPLINK && core->edge) {
-            // edge router established connection to interior
-            qdr_edge_connection_opened(core->edge, conn);
-        }
+            if (conn->role == QDR_ROLE_EDGE_UPLINK && core->edge) {
+                // edge router established connection to interior
+                qdr_edge_connection_opened(core->edge, conn);
+            }
+        } while (false);
+
+        qdrc_event_conn_raise(core, QDRC_EVENT_CONN_OPENED, conn);
     }
 
     qdr_field_free(action->args.connection.connection_label);
@@ -1417,6 +1419,8 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, boo
         qdr_edge_connection_closed(core->edge);
     }
 
+    qdrc_event_conn_raise(core, QDRC_EVENT_CONN_CLOSED, conn);
+
     DEQ_REMOVE(core->open_connections, conn);
     qdr_connection_free(conn);
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org