You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2015/01/28 21:09:55 UTC

svn commit: r1655450 - in /qpid/dispatch/trunk: include/qpid/dispatch/ src/ tests/ tests/config-2-broker/

Author: tross
Date: Wed Jan 28 20:09:54 2015
New Revision: 1655450

URL: http://svn.apache.org/r1655450
Log:
DISPATCH-6 - Add connection open/close handlers to on-demand connectors in connection_manager.

Added:
    qpid/dispatch/trunk/src/connection_manager_private.h   (with props)
Modified:
    qpid/dispatch/trunk/include/qpid/dispatch/connection_manager.h
    qpid/dispatch/trunk/include/qpid/dispatch/server.h
    qpid/dispatch/trunk/src/connection_manager.c
    qpid/dispatch/trunk/src/container.c
    qpid/dispatch/trunk/src/ext_container.c
    qpid/dispatch/trunk/src/ext_container_private.h
    qpid/dispatch/trunk/src/router_config.c
    qpid/dispatch/trunk/src/server.c
    qpid/dispatch/trunk/src/waypoint.c
    qpid/dispatch/trunk/tests/config-2-broker/A.conf
    qpid/dispatch/trunk/tests/field_test.c

Modified: qpid/dispatch/trunk/include/qpid/dispatch/connection_manager.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/connection_manager.h?rev=1655450&r1=1655449&r2=1655450&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/connection_manager.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/connection_manager.h Wed Jan 28 20:09:54 2015
@@ -24,11 +24,14 @@
  */
 
 #include <qpid/dispatch/dispatch.h>
+#include <qpid/dispatch/server.h>
 
 typedef struct qd_connection_manager_t qd_connection_manager_t;
 typedef struct qd_config_connector_t qd_config_connector_t;
 typedef struct qd_config_listener_t qd_config_listener_t;
 
+typedef void (*qd_connection_manager_handler_t) (void *context, qd_connection_t *conn);
+
 /**
  * Allocate a connection manager
  *
@@ -66,6 +69,20 @@ qd_config_connector_t *qd_connection_man
 
 
 /**
+ * Set open and close handlers for a connector.
+ *
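+ * @param cc A configured connector (the call is a no-op if this is NULL).
+ * @param open_handler Callback invoked when a connection associated with this connector is opened
+ * @param close_handler Callback invoked when a connection associated with this connector is closed
+ * @param context Context to be passed back to the handlers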
+ */
+void qd_connection_manager_set_handlers(qd_config_connector_t *cc,
+                                        qd_connection_manager_handler_t open_handler,
+                                        qd_connection_manager_handler_t close_handler,
+                                        void *context);
+
+
+/**
  * Start an on-demand connector.
  *
  * @param qd Pointer to the dispatch instance.

Modified: qpid/dispatch/trunk/include/qpid/dispatch/server.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/server.h?rev=1655450&r1=1655449&r2=1655450&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/server.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/server.h Wed Jan 28 20:09:54 2015
@@ -387,6 +387,15 @@ void *qd_connection_get_context(qd_conne
 
 
 /**
+ * Get the configuration context (connector or listener) for this connection.
+ *
+ * @param conn Connection object supplied in QD_CONN_EVENT_{LISTENER,CONNECTOR}_OPEN
+ * @return The context supplied at the creation of the listener or connector.
+ */
+void *qd_connection_get_config_context(qd_connection_t *conn);
+
+
+/**
  * Set the link context for a connection.
  *
  * @param conn Connection object supplied in QD_CONN_EVENT_{LISTENER,CONNETOR}_OPEN

Modified: qpid/dispatch/trunk/src/connection_manager.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/connection_manager.c?rev=1655450&r1=1655449&r2=1655450&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/connection_manager.c (original)
+++ qpid/dispatch/trunk/src/connection_manager.c Wed Jan 28 20:09:54 2015
@@ -20,13 +20,16 @@
 #include <qpid/dispatch/connection_manager.h>
 #include <qpid/dispatch/ctools.h>
 #include "dispatch_private.h"
+#include "connection_manager_private.h"
 #include "server_private.h"
 #include "entity.h"
 #include "entity_cache.h"
 #include "schema_enum.h"
 #include <string.h>
+#include <stdio.h>
 
 struct qd_config_listener_t {
+    bool is_connector;
     DEQ_LINKS(qd_config_listener_t);
     qd_listener_t      *listener;
     qd_server_config_t  configuration;
@@ -36,12 +39,16 @@ DEQ_DECLARE(qd_config_listener_t, qd_con
 
 
 struct qd_config_connector_t {
+    bool is_connector;
     DEQ_LINKS(qd_config_connector_t);
-    void               *context;
-    const char         *connector_name;
-    qd_connector_t     *connector;
-    qd_server_config_t  configuration;
-    bool                started;
+    void                            *context;
+    const char                      *connector_name;
+    qd_connector_t                  *connector;
+    qd_server_config_t               configuration;
+    bool                             started;
+    qd_connection_manager_handler_t  open_handler;
+    qd_connection_manager_handler_t  close_handler;
+    void                            *handler_context;
 };
 
 DEQ_DECLARE(qd_config_connector_t, qd_config_connector_list_t);
@@ -126,6 +133,7 @@ void qd_dispatch_configure_listener(qd_d
 {
     qd_connection_manager_t *cm = qd->connection_manager;
     qd_config_listener_t *cl = NEW(qd_config_listener_t);
+    cl->is_connector = false;
     cl->listener = 0;
     load_server_config(qd, &cl->configuration, entity);
     DEQ_ITEM_INIT(cl);
@@ -141,6 +149,7 @@ qd_error_t qd_dispatch_configure_connect
     qd_connection_manager_t *cm = qd->connection_manager;
     qd_config_connector_t *cc = NEW(qd_config_connector_t);
     memset(cc, 0, sizeof(*cc));
+    cc->is_connector = true;
     if (load_server_config(qd, &cc->configuration, entity))
         return qd_error_code();
     DEQ_ITEM_INIT(cc);
@@ -240,10 +249,26 @@ qd_config_connector_t *qd_connection_man
 }
 
 
+void qd_connection_manager_set_handlers(qd_config_connector_t *cc,
+                                        qd_connection_manager_handler_t open_handler,
+                                        qd_connection_manager_handler_t close_handler,
+                                        void *context)
+{
+    if (cc) {
+        cc->open_handler    = open_handler;
+        cc->close_handler   = close_handler;
+        cc->handler_context = context;
+    }
+}
+
+
 void qd_connection_manager_start_on_demand(qd_dispatch_t *qd, qd_config_connector_t *cc)
 {
-    if (cc && cc->connector == 0)
+    if (cc && cc->connector == 0) {
+        qd_log(qd->connection_manager->log_source, QD_LOG_INFO, "Starting on-demand connector: %s",
+               cc->connector_name);
         cc->connector = qd_server_connect(qd, &cc->configuration, cc);
+    }
 }
 
 
@@ -268,3 +293,20 @@ const char *qd_config_connector_name(qd_
 {
     return cc ? cc->connector_name : 0;
 }
+
+
+void qd_connection_manager_connection_opened(qd_connection_t *conn)
+{
+    qd_config_connector_t *cc = (qd_config_connector_t*) qd_connection_get_config_context(conn);
+    if (cc && cc->is_connector && cc->open_handler)
+        cc->open_handler(cc->handler_context, conn);
+}
+
+
+void qd_connection_manager_connection_closed(qd_connection_t *conn)
+{
+    qd_config_connector_t *cc = (qd_config_connector_t*) qd_connection_get_config_context(conn);
+    if (cc && cc->is_connector && cc->close_handler)
+        cc->close_handler(cc->handler_context, conn);
+}
+

Added: qpid/dispatch/trunk/src/connection_manager_private.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/connection_manager_private.h?rev=1655450&view=auto
==============================================================================
--- qpid/dispatch/trunk/src/connection_manager_private.h (added)
+++ qpid/dispatch/trunk/src/connection_manager_private.h Wed Jan 28 20:09:54 2015
@@ -0,0 +1,33 @@
+#ifndef __ext_connection_manager_private_h__
+#define __ext_connection_manager_private_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/server.h>
+
+/**
+ * @file
+ *
+ * Private/inter-module API for connection manager.
+ */
+
+void qd_connection_manager_connection_opened(qd_connection_t *conn);
+void qd_connection_manager_connection_closed(qd_connection_t *conn);
+
+#endif

Propchange: qpid/dispatch/trunk/src/connection_manager_private.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/dispatch/trunk/src/connection_manager_private.h
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Rev URL

Modified: qpid/dispatch/trunk/src/container.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/container.c?rev=1655450&r1=1655449&r2=1655450&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/container.c (original)
+++ qpid/dispatch/trunk/src/container.c Wed Jan 28 20:09:54 2015
@@ -20,6 +20,7 @@
 #include <stdio.h>
 #include <string.h>
 #include "dispatch_private.h"
+#include "connection_manager_private.h"
 #include <qpid/dispatch/container.h>
 #include <qpid/dispatch/server.h>
 #include <qpid/dispatch/message.h>
@@ -247,8 +248,10 @@ static void do_updated(pn_delivery_t *pn
 }
 
 
-static int close_handler(void* unused, pn_connection_t *conn)
+static int close_handler(void* unused, pn_connection_t *conn, qd_connection_t* qd_conn)
 {
+    qd_connection_manager_connection_closed(qd_conn);
+
     //
     // Close all links, passing False as the 'closed' argument.  These links are not
     // being properly 'detached'.  They are being orphaned.
@@ -305,11 +308,13 @@ static int process_handler(qd_container_
         case PN_CONNECTION_REMOTE_OPEN :
             if (pn_connection_state(conn) & PN_LOCAL_UNINIT)
                 pn_connection_open(conn);
+            qd_connection_manager_connection_opened(qd_conn);
             break;
 
         case PN_CONNECTION_REMOTE_CLOSE :
             if (pn_connection_state(conn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED))
                 pn_connection_close(conn);
+            qd_connection_manager_connection_closed(qd_conn);
             break;
 
         case PN_SESSION_REMOTE_OPEN :
@@ -434,7 +439,7 @@ static int handler(void *handler_context
     switch (event) {
     case QD_CONN_EVENT_LISTENER_OPEN:  open_handler(container, qd_conn, QD_INCOMING, conn_context);   break;
     case QD_CONN_EVENT_CONNECTOR_OPEN: open_handler(container, qd_conn, QD_OUTGOING, conn_context);   break;
-    case QD_CONN_EVENT_CLOSE:          return close_handler(conn_context, conn);
+    case QD_CONN_EVENT_CLOSE:          return close_handler(conn_context, conn, qd_conn);
     case QD_CONN_EVENT_PROCESS:        return process_handler(container, conn_context, qd_conn);
     }
 

Modified: qpid/dispatch/trunk/src/ext_container.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/ext_container.c?rev=1655450&r1=1655449&r2=1655450&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/ext_container.c (original)
+++ qpid/dispatch/trunk/src/ext_container.c Wed Jan 28 20:09:54 2015
@@ -18,32 +18,64 @@
  */
 
 #include "dispatch_private.h"
-#include "router_private.h"
 #include "entity_cache.h"
 #include <qpid/dispatch/ctools.h>
 #include <qpid/dispatch/connection_manager.h>
+#include <qpid/dispatch/timer.h>
 #include <memory.h>
 #include <stdio.h>
 
 struct qd_external_container_t {
     DEQ_LINKS(qd_external_container_t);
-    char *prefix;
-    char *connector_name;
+    qd_dispatch_t *qd;
+    char          *prefix;
+    char          *connector_name;
+    qd_timer_t    *timer;
 };
 
 DEQ_DECLARE(qd_external_container_t, qd_external_container_list_t);
 
 static qd_external_container_list_t ec_list = DEQ_EMPTY;
 
-qd_external_container_t *qd_external_container(qd_router_t *router, const char *prefix, const char *connector_name)
+
+static void qd_external_container_open_handler(void *context, qd_connection_t *conn)
+{
+    //const char *name = (char*) context;
+}
+
+
+static void qd_external_container_close_handler(void *context, qd_connection_t *conn)
+{
+    //const char *name = (char*) context;
+}
+
+
+static void qd_external_container_timer_handler(void *context)
+{
+    qd_external_container_t *ec = (qd_external_container_t*) context;
+    qd_config_connector_t   *cc = qd_connection_manager_find_on_demand(ec->qd, ec->connector_name);
+    if (cc) {
+        qd_connection_manager_set_handlers(cc,
+                                           qd_external_container_open_handler,
+                                           qd_external_container_close_handler,
+                                           (void*) ec->connector_name);
+        qd_connection_manager_start_on_demand(ec->qd, cc);
+    }
+}
+
+
+qd_external_container_t *qd_external_container(qd_dispatch_t *qd, const char *prefix, const char *connector_name)
 {
     qd_external_container_t *ec = NEW(qd_external_container_t);
 
     if (ec) {
         DEQ_ITEM_INIT(ec);
+        ec->qd             = qd;
         ec->prefix         = strdup(prefix);
         ec->connector_name = strdup(connector_name);
+        ec->timer          = qd_timer(qd, qd_external_container_timer_handler, ec);
         DEQ_INSERT_TAIL(ec_list, ec);
+        qd_timer_schedule(ec->timer, 0);
     }
 
     return ec;
@@ -55,6 +87,7 @@ void qd_external_container_free(qd_exter
     if (ec) {
         free(ec->prefix);
         free(ec->connector_name);
+        qd_timer_free(ec->timer);
         DEQ_REMOVE(ec_list, ec);
         free(ec);
     }

Modified: qpid/dispatch/trunk/src/ext_container_private.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/ext_container_private.h?rev=1655450&r1=1655449&r2=1655450&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/ext_container_private.h (original)
+++ qpid/dispatch/trunk/src/ext_container_private.h Wed Jan 28 20:09:54 2015
@@ -31,7 +31,7 @@
  * routed links destined for this external container.
  */
 
-qd_external_container_t *qd_external_container(qd_router_t *router, const char *prefix, const char *connector_name);
+qd_external_container_t *qd_external_container(qd_dispatch_t *qd, const char *prefix, const char *connector_name);
 
 void qd_external_container_free(qd_external_container_t *ec);
 

Modified: qpid/dispatch/trunk/src/router_config.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_config.c?rev=1655450&r1=1655449&r2=1655450&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_config.c (original)
+++ qpid/dispatch/trunk/src/router_config.c Wed Jan 28 20:09:54 2015
@@ -142,7 +142,7 @@ qd_error_t qd_router_configure_external_
     char *prefix    = qd_entity_get_string(entity, "prefix"); QD_ERROR_RET();
     char *connector = qd_entity_get_string(entity, "connector"); QD_ERROR_RET();
 
-    qd_external_container_t *ec = qd_external_container(router, prefix, connector);
+    qd_external_container_t *ec = qd_external_container(router->qd, prefix, connector);
 
     if (!ec) {
         free(prefix);

Modified: qpid/dispatch/trunk/src/server.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/server.c?rev=1655450&r1=1655449&r2=1655450&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/server.c (original)
+++ qpid/dispatch/trunk/src/server.c Wed Jan 28 20:09:54 2015
@@ -959,6 +959,12 @@ void *qd_connection_get_context(qd_conne
 }
 
 
+void *qd_connection_get_config_context(qd_connection_t *conn)
+{
+    return conn->context;
+}
+
+
 void qd_connection_set_link_context(qd_connection_t *conn, void *context)
 {
     conn->link_context = context;

Modified: qpid/dispatch/trunk/src/waypoint.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/waypoint.c?rev=1655450&r1=1655449&r2=1655450&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/waypoint.c (original)
+++ qpid/dispatch/trunk/src/waypoint.c Wed Jan 28 20:09:54 2015
@@ -282,6 +282,8 @@ void qd_waypoint_activate_all(qd_dispatc
 void qd_waypoint_connection_opened(qd_dispatch_t *qd, qd_config_connector_t *cc, qd_connection_t *conn)
 {
     qd_waypoint_context_t *context = (qd_waypoint_context_t*) qd_config_connector_context(cc);
+    if (!context)
+        return;
 
     qd_log(qd->router->log_source, QD_LOG_INFO, "On-demand connector '%s' opened",
            qd_config_connector_name(cc));

Modified: qpid/dispatch/trunk/tests/config-2-broker/A.conf
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/config-2-broker/A.conf?rev=1655450&r1=1655449&r2=1655450&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/config-2-broker/A.conf (original)
+++ qpid/dispatch/trunk/tests/config-2-broker/A.conf Wed Jan 28 20:09:54 2015
@@ -73,7 +73,7 @@ connector {
 }
 
 externalContainer {
-    prefix: /queue/
+    prefix: queue/
     connector: broker
 }
 

Modified: qpid/dispatch/trunk/tests/field_test.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/field_test.c?rev=1655450&r1=1655449&r2=1655450&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/field_test.c (original)
+++ qpid/dispatch/trunk/tests/field_test.c Wed Jan 28 20:09:54 2015
@@ -144,6 +144,7 @@ static char* test_view_address_hash_over
     struct {const char *addr; const char *view;} cases[] = {
     {"amqp:/link-target",        "Clink-target"},
     {"amqp:/domain/link-target", "Cdomain/link-target"},
+    {"domain/link-target",       "Cdomain/link-target"},
     {0, 0}
     };
     int idx;



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org