You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/03/19 00:06:27 UTC

[09/50] [abbrv] qpid-dispatch git commit: DISPATCH-179 - WIP for the route-control module.

DISPATCH-179 - WIP for the route-control module.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/f5f10ba8
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/f5f10ba8
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/f5f10ba8

Branch: refs/heads/master
Commit: f5f10ba8cf787cb5722b0be26cb32faef90d84b4
Parents: e9da4b7
Author: Ted Ross <tr...@redhat.com>
Authored: Thu Mar 3 13:01:29 2016 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Thu Mar 3 13:01:29 2016 -0500

----------------------------------------------------------------------
 python/qpid_dispatch/management/qdrouter.json |   6 +
 src/connection_manager.c                      |   9 +-
 src/router_core/agent_route.c                 |   4 +-
 src/router_core/connections.c                 |  31 +++--
 src/router_core/route_control.c               | 127 ++++++++++++++++++++-
 src/router_core/route_control.h               |  16 ++-
 src/router_core/router_core_private.h         |   2 +-
 tests/field_test.c                            |   7 +-
 8 files changed, 173 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f5f10ba8/python/qpid_dispatch/management/qdrouter.json
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index 83d9d52..14a087a 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -37,6 +37,12 @@
                     "default": "normal",
                     "description": "The role of an established connection. In the normal role, the connection is assumed to be used for AMQP clients that are doing normal message delivery over the connection.  In the inter-router role, the connection is assumed to be to another router in the network.  Inter-router discovery and routing protocols can only be used over inter-router connections.",
                     "create": true
+                },
+                "label": {
+                    "type": "string",
+                    "create": true,
+                    "required": false,
+                    "description": "When the role is 'route-container', this optional label may be used to identify connections for use in routes."
                 }
             }
         },

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f5f10ba8/src/connection_manager.c
----------------------------------------------------------------------
diff --git a/src/connection_manager.c b/src/connection_manager.c
index d432f64..31e2fad 100644
--- a/src/connection_manager.c
+++ b/src/connection_manager.c
@@ -141,7 +141,7 @@ static qd_error_t load_server_config(qd_dispatch_t *qd, qd_server_config_t *conf
     memset(config, 0, sizeof(*config));
     config->host            = qd_entity_get_string(entity, "addr"); CHECK();
     config->port            = qd_entity_get_string(entity, "port"); CHECK();
-    config->label           = qd_entity_get_string(entity, "name"); CHECK();
+    config->label           = qd_entity_opt_string(entity, "label", 0); CHECK();
     config->role            = qd_entity_get_string(entity, "role"); CHECK();
     config->max_frame_size  = qd_entity_get_long(entity, "maxFrameSize"); CHECK();
     config->idle_timeout_seconds = qd_entity_get_long(entity, "idleTimeoutSeconds"); CHECK();
@@ -207,10 +207,11 @@ qd_error_t qd_dispatch_configure_connector(qd_dispatch_t *qd, qd_entity_t *entit
     if (load_server_config(qd, &cc->configuration, entity))
         return qd_error_code();
     DEQ_ITEM_INIT(cc);
-    if (strcmp(cc->configuration.role, "on-demand") == 0) {
+    if (strcmp(cc->configuration.role, "route-container") == 0) {
         DEQ_INSERT_TAIL(cm->on_demand_connectors, cc);
-        qd_log(cm->log_source, QD_LOG_INFO, "Configured on-demand connector: %s:%s label=%s",
-               cc->configuration.host, cc->configuration.port, cc->configuration.label);
+        qd_log(cm->log_source, QD_LOG_INFO, "Configured route-container connector: %s:%s label=%s",
+               cc->configuration.host, cc->configuration.port,
+               cc->configuration.label ? cc->configuration.label : "<none>");
     } else {
         DEQ_INSERT_TAIL(cm->config_connectors, cc);
         qd_log(cm->log_source, QD_LOG_INFO, "Configured Connector: %s:%s role=%s",

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f5f10ba8/src/router_core/agent_route.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent_route.c b/src/router_core/agent_route.c
index 7024a86..acf397b 100644
--- a/src/router_core/agent_route.c
+++ b/src/router_core/agent_route.c
@@ -282,7 +282,7 @@ void qdra_route_create_CT(qdr_core_t *core, qd_field_iterator_t *name,
             uint32_t count = qd_parse_sub_count(conn_field);
             for (uint32_t i = 0; i < count; i++) {
                 qd_parsed_field_t *conn_label = qd_parse_sub_value(conn_field, i);
-                qdr_route_connection_add_CT(route, conn_label, false);
+                qdr_route_connection_add_CT(core, route, conn_label, false);
             }
         }
 
@@ -293,7 +293,7 @@ void qdra_route_create_CT(qdr_core_t *core, qd_field_iterator_t *name,
             uint32_t count = qd_parse_sub_count(cont_field);
             for (uint32_t i = 0; i < count; i++) {
                 qd_parsed_field_t *cont_id = qd_parse_sub_value(cont_field, i);
-                qdr_route_connection_add_CT(route, cont_id, true);
+                qdr_route_connection_add_CT(core, route, cont_id, true);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f5f10ba8/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 58606cf..923114f 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -18,6 +18,7 @@
  */
 
 #include "router_core_private.h"
+#include "route_control.h"
 #include <qpid/dispatch/amqp.h>
 #include <stdio.h>
 
@@ -84,9 +85,9 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t            *core,
     DEQ_INIT(conn->work_list);
     conn->work_lock = sys_mutex();
 
-    action->args.connection.conn         = conn;
-    action->args.connection.label        = label;
-    action->args.connection.container_id = qdr_field(remote_container_id);
+    action->args.connection.conn             = conn;
+    action->args.connection.connection_label = qdr_field(label);
+    action->args.connection.container_id     = qdr_field(remote_container_id);
     qdr_action_enqueue(core, action);
 
     return conn;
@@ -725,13 +726,24 @@ static void qdr_connection_opened_CT(qdr_core_t *core, qdr_action_t *action, boo
             }
         }
 
-        //
-        // If the role is ON_DEMAND:
-        //    Activate waypoints associated with this connection
-        //    Activate link-route destinations associated with this connection
-        //
+        if (conn->role == QDR_ROLE_ROUTE_CONTAINER) {
+            //
+            // Notify the route-control module that a route-container connection has opened.
+            // There may be routes that need to be activated due to the opening of this connection.
+            //
+
+            //
+            // If there's a connection label, use it as the identifier.  Otherwise, use the remote
+            // container id.
+            //
+            qdr_field_t *cid = action->args.connection.connection_label ?
+                action->args.connection.connection_label : action->args.connection.container_id;
+            if (cid)
+                qdr_route_connection_opened_CT(core, conn, cid, action->args.connection.connection_label == 0);
+        }
     }
 
+    qdr_field_free(action->args.connection.connection_label);
     qdr_field_free(action->args.connection.container_id);
 }
 
@@ -744,8 +756,9 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, boo
     qdr_connection_t *conn = action->args.connection.conn;
 
     //
-    // TODO - Deactivate waypoints and link-route destinations for this connection
+    // Deactivate routes associated with this connection
     //
+    qdr_route_connection_closed_CT(core, conn);
 
     //
     // TODO - Clean up links associated with this connection

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f5f10ba8/src/router_core/route_control.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_control.c b/src/router_core/route_control.c
index b75f22ee..b72c535 100644
--- a/src/router_core/route_control.c
+++ b/src/router_core/route_control.c
@@ -18,6 +18,7 @@
  */
 
 #include "route_control.h"
+#include <stdio.h>
 
 ALLOC_DEFINE(qdr_route_active_t);
 ALLOC_DEFINE(qdr_route_config_t);
@@ -98,6 +99,49 @@ static void qdr_route_free_CT(qdr_core_t *core, qdr_route_config_t *route)
 }
 
 
+static qdr_conn_identifier_t *qdr_route_declare_id_CT(qdr_core_t          *core,
+                                                      qd_field_iterator_t *conn_id,
+                                                      bool                 is_container)
+{
+    char                   prefix = is_container ? 'C' : 'L';
+    qdr_conn_identifier_t *cid    = 0;
+
+    qd_address_iterator_reset_view(conn_id, ITER_VIEW_ADDRESS_HASH);
+    qd_address_iterator_override_prefix(conn_id, prefix);
+
+    qd_hash_retrieve(core->conn_id_hash, conn_id, (void**) &cid);
+    if (!cid) {
+        cid = new_qdr_conn_identifier_t();
+        ZERO(cid);
+        qd_hash_insert(core->conn_id_hash, conn_id, cid, &cid->hash_handle);
+    }
+
+    return cid;
+}
+
+static void qdr_route_check_id_for_deletion_CT(qdr_core_t *core, qdr_conn_identifier_t *cid)
+{
+    //
+    // If this connection identifier has no open connection and no referencing routes,
+    // it can safely be deleted and removed from the hash index.
+    //
+    if (cid->open_connection == 0 && DEQ_IS_EMPTY(cid->active_refs)) {
+        qd_hash_remove_by_handle(core->conn_id_hash, cid->hash_handle);
+        free_qdr_conn_identifier_t(cid);
+    }
+}
+
+
+static void qdr_route_activate_CT(qdr_core_t *core, qdr_route_active_t *active)
+{
+}
+
+
+static void qdr_route_deactivate_CT(qdr_core_t *core, qdr_route_active_t *active)
+{
+}
+
+
 const char *qdr_route_create_CT(qdr_core_t             *core,
                                 qd_field_iterator_t    *name,
                                 qdr_route_path_t        path,
@@ -151,38 +195,111 @@ const char *qdr_route_create_CT(qdr_core_t             *core,
 }
 
 
-void qdr_route_delete_CT(qdr_route_config_t *route)
+void qdr_route_delete_CT(qdr_core_t *core, qdr_route_config_t *route)
 {
 }
 
 
-void qdr_route_connection_add_CT(qdr_route_config_t *route,
+void qdr_route_connection_add_CT(qdr_core_t         *core,
+                                 qdr_route_config_t *route,
                                  qd_parsed_field_t  *conn_id,
                                  bool                is_container)
 {
+    //
+    // Create a new active record for this route+connection and get a connection identifier
+    // record (find an existing one or create a new one).
+    //
+    qdr_route_active_t    *active = new_qdr_route_active_t();
+    qdr_conn_identifier_t *cid    = qdr_route_declare_id_CT(core, qd_parse_raw(conn_id), is_container);
+
+    //
+    // Initialize the active record in the DOWN state.
+    //
+    DEQ_ITEM_INIT(active);
+    DEQ_ITEM_INIT_N(REF, active);
+    active->in_state  = QDR_ROUTE_STATE_DOWN;
+    active->out_state = QDR_ROUTE_STATE_DOWN;
+    active->in_link   = 0;
+    active->out_link  = 0;
+
+    //
+    // Create the linkages between the route-config, active, and connection-identifier.
+    //
+    active->config  = route;
+    active->conn_id = cid;
+
+    DEQ_INSERT_TAIL(route->active_list, active);
+    DEQ_INSERT_TAIL_N(REF, cid->active_refs, active);
+
+    //
+    // If the connection identifier represents an already open connection, activate the route.
+    //
+    if (cid->open_connection)
+        qdr_route_activate_CT(core, active);
 }
 
 
-void qdr_route_connection_delete_CT(qdr_route_config_t *route,
+void qdr_route_connection_delete_CT(qdr_core_t         *core,
+                                    qdr_route_config_t *route,
                                     qd_parsed_field_t  *conn_id,
                                     bool                is_container)
 {
 }
 
 
-void qdr_route_connection_kill_CT(qdr_route_config_t *route,
+void qdr_route_connection_kill_CT(qdr_core_t         *core,
+                                  qdr_route_config_t *route,
                                   qd_parsed_field_t  *conn_id,
                                   bool                is_container)
 {
 }
 
 
-void qdr_route_connection_opened_CT(qdr_core_t *core, qdr_connection_t *conn)
+void qdr_route_connection_opened_CT(qdr_core_t       *core,
+                                    qdr_connection_t *conn,
+                                    qdr_field_t      *field,
+                                    bool              is_container)
 {
+    if (conn->role != QDR_ROLE_ROUTE_CONTAINER || !field)
+        return;
+
+    qdr_conn_identifier_t *cid = qdr_route_declare_id_CT(core, field->iterator, is_container);
+
+    assert(!cid->open_connection);
+    cid->open_connection = conn;
+    conn->conn_id        = cid;
+
+    //
+    // Activate all routes associated with this remote container.
+    //
+    qdr_route_active_t *active = DEQ_HEAD(cid->active_refs);
+    while (active) {
+        qdr_route_activate_CT(core, active);
+        active = DEQ_NEXT_N(REF, active);
+    }
 }
 
 
 void qdr_route_connection_closed_CT(qdr_core_t *core, qdr_connection_t *conn)
 {
+    if (conn->role != QDR_ROLE_ROUTE_CONTAINER)
+        return;
+
+    qdr_conn_identifier_t *cid = conn->conn_id;
+    if (cid) {
+        //
+        // De-activate all routes associated with this remote container.
+        //
+        qdr_route_active_t *active = DEQ_HEAD(cid->active_refs);
+        while (active) {
+            qdr_route_deactivate_CT(core, active);
+            active = DEQ_NEXT_N(REF, active);
+        }
+
+        cid->open_connection = 0;
+        conn->conn_id        = 0;
+
+        qdr_route_check_id_for_deletion_CT(core, cid);
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f5f10ba8/src/router_core/route_control.h
----------------------------------------------------------------------
diff --git a/src/router_core/route_control.h b/src/router_core/route_control.h
index 7e86721..53e1758 100644
--- a/src/router_core/route_control.h
+++ b/src/router_core/route_control.h
@@ -29,21 +29,27 @@ const char *qdr_route_create_CT(qdr_core_t             *core,
                                 qd_parsed_field_t      *route_addr_field,
                                 qdr_route_config_t    **route);
 
-void qdr_route_delete_CT(qdr_route_config_t *route);
+void qdr_route_delete_CT(qdr_core_t *core, qdr_route_config_t *route);
 
-void qdr_route_connection_add_CT(qdr_route_config_t *route,
+void qdr_route_connection_add_CT(qdr_core_t         *core,
+                                 qdr_route_config_t *route,
                                  qd_parsed_field_t  *conn_id,
                                  bool                is_container);
 
-void qdr_route_connection_delete_CT(qdr_route_config_t *route,
+void qdr_route_connection_delete_CT(qdr_core_t         *core,
+                                    qdr_route_config_t *route,
                                     qd_parsed_field_t  *conn_id,
                                     bool                is_container);
 
-void qdr_route_connection_kill_CT(qdr_route_config_t *route,
+void qdr_route_connection_kill_CT(qdr_core_t         *core,
+                                  qdr_route_config_t *route,
                                   qd_parsed_field_t  *conn_id,
                                   bool                is_container);
 
-void qdr_route_connection_opened_CT(qdr_core_t *core, qdr_connection_t *conn);
+void qdr_route_connection_opened_CT(qdr_core_t       *core,
+                                    qdr_connection_t *conn,
+                                    qdr_field_t      *field,
+                                    bool              is_container);
 
 void qdr_route_connection_closed_CT(qdr_core_t *core, qdr_connection_t *conn);
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f5f10ba8/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index f6de278..84fb8d8 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -83,7 +83,7 @@ struct qdr_action_t {
         //
         struct {
             qdr_connection_t *conn;
-            const char       *label;
+            qdr_field_t      *connection_label;
             qdr_field_t      *container_id;
             qdr_link_t       *link;
             qdr_delivery_t   *delivery;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f5f10ba8/tests/field_test.c
----------------------------------------------------------------------
diff --git a/tests/field_test.c b/tests/field_test.c
index 414c84a..d320b60 100644
--- a/tests/field_test.c
+++ b/tests/field_test.c
@@ -244,9 +244,10 @@ static char* test_view_address_hash(void *context)
 static char* test_view_address_hash_override(void *context)
 {
     struct {const char *addr; const char *view;} cases[] = {
-    {"amqp:/link-target",        "Clink-target"},
-    {"amqp:/domain/link-target", "Cdomain/link-target"},
-    {"domain/link-target",       "Cdomain/link-target"},
+    {"amqp:/link-target",                    "Clink-target"},
+    {"amqp:/domain/link-target",             "Cdomain/link-target"},
+    {"domain/link-target",                   "Cdomain/link-target"},
+    {"bbc79fb3-e1fd-4a08-92b2-9a2de232b558", "Cbbc79fb3-e1fd-4a08-92b2-9a2de232b558"},
     {0, 0}
     };
     int idx;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org