You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2015/01/28 21:09:55 UTC
svn commit: r1655450 - in /qpid/dispatch/trunk: include/qpid/dispatch/ src/
tests/ tests/config-2-broker/
Author: tross
Date: Wed Jan 28 20:09:54 2015
New Revision: 1655450
URL: http://svn.apache.org/r1655450
Log:
DISPATCH-6 - Add connection open/close handlers to on-demand connectors in connection_manager.
Added:
qpid/dispatch/trunk/src/connection_manager_private.h (with props)
Modified:
qpid/dispatch/trunk/include/qpid/dispatch/connection_manager.h
qpid/dispatch/trunk/include/qpid/dispatch/server.h
qpid/dispatch/trunk/src/connection_manager.c
qpid/dispatch/trunk/src/container.c
qpid/dispatch/trunk/src/ext_container.c
qpid/dispatch/trunk/src/ext_container_private.h
qpid/dispatch/trunk/src/router_config.c
qpid/dispatch/trunk/src/server.c
qpid/dispatch/trunk/src/waypoint.c
qpid/dispatch/trunk/tests/config-2-broker/A.conf
qpid/dispatch/trunk/tests/field_test.c
Modified: qpid/dispatch/trunk/include/qpid/dispatch/connection_manager.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/connection_manager.h?rev=1655450&r1=1655449&r2=1655450&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/connection_manager.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/connection_manager.h Wed Jan 28 20:09:54 2015
@@ -24,11 +24,14 @@
*/
#include <qpid/dispatch/dispatch.h>
+#include <qpid/dispatch/server.h>
typedef struct qd_connection_manager_t qd_connection_manager_t;
typedef struct qd_config_connector_t qd_config_connector_t;
typedef struct qd_config_listener_t qd_config_listener_t;
+typedef void (*qd_connection_manager_handler_t) (void *context, qd_connection_t *conn);
+
/**
* Allocate a connection manager
*
@@ -66,6 +69,20 @@ qd_config_connector_t *qd_connection_man
/**
+ * Set open and close handlers for a connector.
+ *
+ * @param cc A configured connector.
+ * @param open_handler Callback invoked when a connection for this connector is opened
+ * @param close_handler Callback invoked when a connection for this connector is closed
+ * @param context Context to be passed back to the handlers
+ */
+void qd_connection_manager_set_handlers(qd_config_connector_t *cc,
+ qd_connection_manager_handler_t open_handler,
+ qd_connection_manager_handler_t close_handler,
+ void *context);
+
+
+/**
* Start an on-demand connector.
*
* @param qd Pointer to the dispatch instance.
Modified: qpid/dispatch/trunk/include/qpid/dispatch/server.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/server.h?rev=1655450&r1=1655449&r2=1655450&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/server.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/server.h Wed Jan 28 20:09:54 2015
@@ -387,6 +387,15 @@ void *qd_connection_get_context(qd_conne
/**
+ * Get the configuration context (connector or listener) for this connection.
+ *
+ * @param conn Connection object supplied in QD_CONN_EVENT_{LISTENER,CONNECTOR}_OPEN
+ * @return The context supplied at the creation of the listener or connector.
+ */
+void *qd_connection_get_config_context(qd_connection_t *conn);
+
+
+/**
* Set the link context for a connection.
*
* @param conn Connection object supplied in QD_CONN_EVENT_{LISTENER,CONNECTOR}_OPEN
Modified: qpid/dispatch/trunk/src/connection_manager.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/connection_manager.c?rev=1655450&r1=1655449&r2=1655450&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/connection_manager.c (original)
+++ qpid/dispatch/trunk/src/connection_manager.c Wed Jan 28 20:09:54 2015
@@ -20,13 +20,16 @@
#include <qpid/dispatch/connection_manager.h>
#include <qpid/dispatch/ctools.h>
#include "dispatch_private.h"
+#include "connection_manager_private.h"
#include "server_private.h"
#include "entity.h"
#include "entity_cache.h"
#include "schema_enum.h"
#include <string.h>
+#include <stdio.h>
struct qd_config_listener_t {
+ bool is_connector;
DEQ_LINKS(qd_config_listener_t);
qd_listener_t *listener;
qd_server_config_t configuration;
@@ -36,12 +39,16 @@ DEQ_DECLARE(qd_config_listener_t, qd_con
struct qd_config_connector_t {
+ bool is_connector;
DEQ_LINKS(qd_config_connector_t);
- void *context;
- const char *connector_name;
- qd_connector_t *connector;
- qd_server_config_t configuration;
- bool started;
+ void *context;
+ const char *connector_name;
+ qd_connector_t *connector;
+ qd_server_config_t configuration;
+ bool started;
+ qd_connection_manager_handler_t open_handler;
+ qd_connection_manager_handler_t close_handler;
+ void *handler_context;
};
DEQ_DECLARE(qd_config_connector_t, qd_config_connector_list_t);
@@ -126,6 +133,7 @@ void qd_dispatch_configure_listener(qd_d
{
qd_connection_manager_t *cm = qd->connection_manager;
qd_config_listener_t *cl = NEW(qd_config_listener_t);
+ cl->is_connector = false;
cl->listener = 0;
load_server_config(qd, &cl->configuration, entity);
DEQ_ITEM_INIT(cl);
@@ -141,6 +149,7 @@ qd_error_t qd_dispatch_configure_connect
qd_connection_manager_t *cm = qd->connection_manager;
qd_config_connector_t *cc = NEW(qd_config_connector_t);
memset(cc, 0, sizeof(*cc));
+ cc->is_connector = true;
if (load_server_config(qd, &cc->configuration, entity))
return qd_error_code();
DEQ_ITEM_INIT(cc);
@@ -240,10 +249,26 @@ qd_config_connector_t *qd_connection_man
}
+void qd_connection_manager_set_handlers(qd_config_connector_t *cc,
+ qd_connection_manager_handler_t open_handler,
+ qd_connection_manager_handler_t close_handler,
+ void *context)
+{
+ if (cc) {
+ cc->open_handler = open_handler;
+ cc->close_handler = close_handler;
+ cc->handler_context = context;
+ }
+}
+
+
void qd_connection_manager_start_on_demand(qd_dispatch_t *qd, qd_config_connector_t *cc)
{
- if (cc && cc->connector == 0)
+ if (cc && cc->connector == 0) {
+ qd_log(qd->connection_manager->log_source, QD_LOG_INFO, "Starting on-demand connector: %s",
+ cc->connector_name);
cc->connector = qd_server_connect(qd, &cc->configuration, cc);
+ }
}
@@ -268,3 +293,20 @@ const char *qd_config_connector_name(qd_
{
return cc ? cc->connector_name : 0;
}
+
+
+void qd_connection_manager_connection_opened(qd_connection_t *conn)
+{
+ qd_config_connector_t *cc = (qd_config_connector_t*) qd_connection_get_config_context(conn);
+ if (cc && cc->is_connector && cc->open_handler)
+ cc->open_handler(cc->handler_context, conn);
+}
+
+
+void qd_connection_manager_connection_closed(qd_connection_t *conn)
+{
+ qd_config_connector_t *cc = (qd_config_connector_t*) qd_connection_get_config_context(conn);
+ if (cc && cc->is_connector && cc->close_handler)
+ cc->close_handler(cc->handler_context, conn);
+}
+
Added: qpid/dispatch/trunk/src/connection_manager_private.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/connection_manager_private.h?rev=1655450&view=auto
==============================================================================
--- qpid/dispatch/trunk/src/connection_manager_private.h (added)
+++ qpid/dispatch/trunk/src/connection_manager_private.h Wed Jan 28 20:09:54 2015
@@ -0,0 +1,33 @@
+#ifndef __ext_connection_manager_private_h__
+#define __ext_connection_manager_private_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/server.h>
+
+/**
+ * @file
+ *
+ * Private/inter-module API for connection manager.
+ */
+
+void qd_connection_manager_connection_opened(qd_connection_t *conn);
+void qd_connection_manager_connection_closed(qd_connection_t *conn);
+
+#endif
Propchange: qpid/dispatch/trunk/src/connection_manager_private.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/dispatch/trunk/src/connection_manager_private.h
------------------------------------------------------------------------------
svn:keywords = Author Date Id Rev URL
Modified: qpid/dispatch/trunk/src/container.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/container.c?rev=1655450&r1=1655449&r2=1655450&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/container.c (original)
+++ qpid/dispatch/trunk/src/container.c Wed Jan 28 20:09:54 2015
@@ -20,6 +20,7 @@
#include <stdio.h>
#include <string.h>
#include "dispatch_private.h"
+#include "connection_manager_private.h"
#include <qpid/dispatch/container.h>
#include <qpid/dispatch/server.h>
#include <qpid/dispatch/message.h>
@@ -247,8 +248,10 @@ static void do_updated(pn_delivery_t *pn
}
-static int close_handler(void* unused, pn_connection_t *conn)
+static int close_handler(void* unused, pn_connection_t *conn, qd_connection_t* qd_conn)
{
+ qd_connection_manager_connection_closed(qd_conn);
+
//
// Close all links, passing False as the 'closed' argument. These links are not
// being properly 'detached'. They are being orphaned.
@@ -305,11 +308,13 @@ static int process_handler(qd_container_
case PN_CONNECTION_REMOTE_OPEN :
if (pn_connection_state(conn) & PN_LOCAL_UNINIT)
pn_connection_open(conn);
+ qd_connection_manager_connection_opened(qd_conn);
break;
case PN_CONNECTION_REMOTE_CLOSE :
if (pn_connection_state(conn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED))
pn_connection_close(conn);
+ qd_connection_manager_connection_closed(qd_conn);
break;
case PN_SESSION_REMOTE_OPEN :
@@ -434,7 +439,7 @@ static int handler(void *handler_context
switch (event) {
case QD_CONN_EVENT_LISTENER_OPEN: open_handler(container, qd_conn, QD_INCOMING, conn_context); break;
case QD_CONN_EVENT_CONNECTOR_OPEN: open_handler(container, qd_conn, QD_OUTGOING, conn_context); break;
- case QD_CONN_EVENT_CLOSE: return close_handler(conn_context, conn);
+ case QD_CONN_EVENT_CLOSE: return close_handler(conn_context, conn, qd_conn);
case QD_CONN_EVENT_PROCESS: return process_handler(container, conn_context, qd_conn);
}
Modified: qpid/dispatch/trunk/src/ext_container.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/ext_container.c?rev=1655450&r1=1655449&r2=1655450&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/ext_container.c (original)
+++ qpid/dispatch/trunk/src/ext_container.c Wed Jan 28 20:09:54 2015
@@ -18,32 +18,64 @@
*/
#include "dispatch_private.h"
-#include "router_private.h"
#include "entity_cache.h"
#include <qpid/dispatch/ctools.h>
#include <qpid/dispatch/connection_manager.h>
+#include <qpid/dispatch/timer.h>
#include <memory.h>
#include <stdio.h>
struct qd_external_container_t {
DEQ_LINKS(qd_external_container_t);
- char *prefix;
- char *connector_name;
+ qd_dispatch_t *qd;
+ char *prefix;
+ char *connector_name;
+ qd_timer_t *timer;
};
DEQ_DECLARE(qd_external_container_t, qd_external_container_list_t);
static qd_external_container_list_t ec_list = DEQ_EMPTY;
-qd_external_container_t *qd_external_container(qd_router_t *router, const char *prefix, const char *connector_name)
+
+static void qd_external_container_open_handler(void *context, qd_connection_t *conn)
+{
+ //const char *name = (char*) context;
+}
+
+
+static void qd_external_container_close_handler(void *context, qd_connection_t *conn)
+{
+ //const char *name = (char*) context;
+}
+
+
+static void qd_external_container_timer_handler(void *context)
+{
+ qd_external_container_t *ec = (qd_external_container_t*) context;
+ qd_config_connector_t *cc = qd_connection_manager_find_on_demand(ec->qd, ec->connector_name);
+ if (cc) {
+ qd_connection_manager_set_handlers(cc,
+ qd_external_container_open_handler,
+ qd_external_container_close_handler,
+ (void*) ec->connector_name);
+ qd_connection_manager_start_on_demand(ec->qd, cc);
+ }
+}
+
+
+qd_external_container_t *qd_external_container(qd_dispatch_t *qd, const char *prefix, const char *connector_name)
{
qd_external_container_t *ec = NEW(qd_external_container_t);
if (ec) {
DEQ_ITEM_INIT(ec);
+ ec->qd = qd;
ec->prefix = strdup(prefix);
ec->connector_name = strdup(connector_name);
+ ec->timer = qd_timer(qd, qd_external_container_timer_handler, ec);
DEQ_INSERT_TAIL(ec_list, ec);
+ qd_timer_schedule(ec->timer, 0);
}
return ec;
@@ -55,6 +87,7 @@ void qd_external_container_free(qd_exter
if (ec) {
free(ec->prefix);
free(ec->connector_name);
+ qd_timer_free(ec->timer);
DEQ_REMOVE(ec_list, ec);
free(ec);
}
Modified: qpid/dispatch/trunk/src/ext_container_private.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/ext_container_private.h?rev=1655450&r1=1655449&r2=1655450&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/ext_container_private.h (original)
+++ qpid/dispatch/trunk/src/ext_container_private.h Wed Jan 28 20:09:54 2015
@@ -31,7 +31,7 @@
* routed links destined for this external container.
*/
-qd_external_container_t *qd_external_container(qd_router_t *router, const char *prefix, const char *connector_name);
+qd_external_container_t *qd_external_container(qd_dispatch_t *qd, const char *prefix, const char *connector_name);
void qd_external_container_free(qd_external_container_t *ec);
Modified: qpid/dispatch/trunk/src/router_config.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_config.c?rev=1655450&r1=1655449&r2=1655450&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_config.c (original)
+++ qpid/dispatch/trunk/src/router_config.c Wed Jan 28 20:09:54 2015
@@ -142,7 +142,7 @@ qd_error_t qd_router_configure_external_
char *prefix = qd_entity_get_string(entity, "prefix"); QD_ERROR_RET();
char *connector = qd_entity_get_string(entity, "connector"); QD_ERROR_RET();
- qd_external_container_t *ec = qd_external_container(router, prefix, connector);
+ qd_external_container_t *ec = qd_external_container(router->qd, prefix, connector);
if (!ec) {
free(prefix);
Modified: qpid/dispatch/trunk/src/server.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/server.c?rev=1655450&r1=1655449&r2=1655450&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/server.c (original)
+++ qpid/dispatch/trunk/src/server.c Wed Jan 28 20:09:54 2015
@@ -959,6 +959,12 @@ void *qd_connection_get_context(qd_conne
}
+void *qd_connection_get_config_context(qd_connection_t *conn)
+{
+ return conn->context;
+}
+
+
void qd_connection_set_link_context(qd_connection_t *conn, void *context)
{
conn->link_context = context;
Modified: qpid/dispatch/trunk/src/waypoint.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/waypoint.c?rev=1655450&r1=1655449&r2=1655450&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/waypoint.c (original)
+++ qpid/dispatch/trunk/src/waypoint.c Wed Jan 28 20:09:54 2015
@@ -282,6 +282,8 @@ void qd_waypoint_activate_all(qd_dispatc
void qd_waypoint_connection_opened(qd_dispatch_t *qd, qd_config_connector_t *cc, qd_connection_t *conn)
{
qd_waypoint_context_t *context = (qd_waypoint_context_t*) qd_config_connector_context(cc);
+ if (!context)
+ return;
qd_log(qd->router->log_source, QD_LOG_INFO, "On-demand connector '%s' opened",
qd_config_connector_name(cc));
Modified: qpid/dispatch/trunk/tests/config-2-broker/A.conf
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/config-2-broker/A.conf?rev=1655450&r1=1655449&r2=1655450&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/config-2-broker/A.conf (original)
+++ qpid/dispatch/trunk/tests/config-2-broker/A.conf Wed Jan 28 20:09:54 2015
@@ -73,7 +73,7 @@ connector {
}
externalContainer {
- prefix: /queue/
+ prefix: queue/
connector: broker
}
Modified: qpid/dispatch/trunk/tests/field_test.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/field_test.c?rev=1655450&r1=1655449&r2=1655450&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/field_test.c (original)
+++ qpid/dispatch/trunk/tests/field_test.c Wed Jan 28 20:09:54 2015
@@ -144,6 +144,7 @@ static char* test_view_address_hash_over
struct {const char *addr; const char *view;} cases[] = {
{"amqp:/link-target", "Clink-target"},
{"amqp:/domain/link-target", "Cdomain/link-target"},
+ {"domain/link-target", "Cdomain/link-target"},
{0, 0}
};
int idx;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org