You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/04/27 17:38:30 UTC
[05/10] qpid-dispatch git commit: DISPATCH-390: Convert dispatch to
use pn_proactor_t
DISPATCH-390: Convert dispatch to use pn_proactor_t
- remove driver.h/c, server.c uses proactor API directly
- update stop/start, signal handling
- refactor server connector queue processing as PN event handlers
- qd_timer using pn_proactor_timeout()
- deferred calls use pn_proactor_wake()
- drop qd_thread_t struct, use sys_thread_t directly
- document new listen "host" semantics in schema
- updated logging, NOTICE for key life-cycle events
- merge qd_listener_t+qd_config_listener_t, qd_connector_t+qd_config_connector_t
- remove dead code: work_queue, a_thread_is_waiting, owner_thread
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/6f56e289
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/6f56e289
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/6f56e289
Branch: refs/heads/master
Commit: 6f56e289bec0db4a1de257883dc456a502c42fe7
Parents: 0539dc4
Author: Alan Conway <ac...@redhat.com>
Authored: Fri Jan 20 14:20:31 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Apr 27 13:29:40 2017 -0400
----------------------------------------------------------------------
config.sh | 41 +-
include/qpid/dispatch/amqp.h | 2 -
include/qpid/dispatch/connection_manager.h | 21 -
include/qpid/dispatch/driver.h | 441 ------
include/qpid/dispatch/error.h | 7 +
include/qpid/dispatch/router_core.h | 11 +-
include/qpid/dispatch/server.h | 240 +--
include/qpid/dispatch/timer.h | 10 +-
python/qpid_dispatch/management/qdrouter.json | 4 +-
python/qpid_dispatch_internal/router/address.py | 5 -
router/CMakeLists.txt | 2 +-
router/src/main.c | 29 +-
src/CMakeLists.txt | 3 +-
src/amqp.c | 2 -
src/connection_manager.c | 192 ++-
src/container.c | 110 +-
src/error.c | 15 +-
src/http-none.c | 1 -
src/policy.c | 97 +-
src/policy.h | 9 +-
src/posix/driver.c | 1093 --------------
src/router_core/connections.c | 14 +-
src/router_core/router_core_private.h | 3 -
src/router_core/router_core_thread.c | 18 -
src/router_node.c | 16 +-
src/server.c | 1398 ++++++------------
src/server_private.h | 74 +-
src/timer.c | 183 +--
src/timer_private.h | 14 +-
tests/CMakeLists.txt | 2 +-
tests/run_unit_tests.c | 4 +-
tests/system_tests_management.py | 2 +-
tests/system_tests_policy.py | 59 +-
tests/timer_test.c | 305 ++--
34 files changed, 961 insertions(+), 3466 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/config.sh
----------------------------------------------------------------------
diff --git a/config.sh b/config.sh
index 7e5e97f..beb7ed2 100644
--- a/config.sh
+++ b/config.sh
@@ -1,33 +1,8 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-if [[ ! -f config.sh ]]; then
- echo "You must source config.sh from within its own directory"
- return
-fi
-
-export SOURCE_DIR=$(pwd)
-export BUILD_DIR=$SOURCE_DIR/${1:-build}
-export INSTALL_DIR=$SOURCE_DIR/${2:-install}
-
-PYTHON_LIB=$(python -c "from distutils.sysconfig import get_python_lib; print get_python_lib(prefix='$INSTALL_DIR')")
-
-export LD_LIBRARY_PATH=$INSTALL_DIR/lib64:$INSTALL_DIR/lib:$LD_LIBRARY_PATH
-export PYTHONPATH=$PYTHON_LIB:$PYTHONPATH
-export PATH=$INSTALL_DIR/sbin:$INSTALL_DIR/bin:$SOURCE_DIR/bin:$PATH
+PYTHONPATH=/home/aconway/dispatch/python:/home/aconway/dispatch/tests:/home/aconway/dispatch:/usr/local/lib/proton/bindings/python:/usr/local/lib64/proton/bindings/python:/usr/local/lib/python2.7/site-packages:/usr/local/lib64/python2.7/site-packages:/usr/lib/python27.zip:/usr/lib64/python2.7:/usr/lib64/python2.7/plat-linux2:/usr/lib64/python2.7/lib-tk:/usr/lib64/python2.7/lib-old:/usr/lib64/python2.7/lib-dynload:/usr/lib64/python2.7/site-packages:/usr/lib/python2.7/site-packages
+BUILD_DIR=/home/aconway/dispatch
+QPID_DISPATCH_HOME=/home/aconway/dispatch
+QPID_DISPATCH_LIB=/home/aconway/dispatch/src/
+MANPATH=/home/aconway/dispatch/doc/man:/usr/local/share/man:/usr/share/man
+PATH=/home/aconway/dispatch:/home/aconway/dispatch/tests:/home/aconway/dispatch/router:/home/aconway/dispatch/tools:/home/aconway/dispatch/bin:/home/aconway/bin:/home/aconway/ha/bin:/usr/local/bin:/usr/local/sbin:/usr/lib64/qt-3.3/bin:/usr/lib64/ccache:/usr/bin:/usr/sbin:/home/aconway/go/bin:/home/aconway/proton/proton-c/bindings/go/bin
+SOURCE_DIR=/home/aconway/dispatch
+export PYTHONPATH BUILD_DIR QPID_DISPATCH_HOME QPID_DISPATCH_LIB MANPATH PATH SOURCE_DIR
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/include/qpid/dispatch/amqp.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/amqp.h b/include/qpid/dispatch/amqp.h
index 60ad973..e5c45c6 100644
--- a/include/qpid/dispatch/amqp.h
+++ b/include/qpid/dispatch/amqp.h
@@ -165,6 +165,4 @@ extern const char * const QD_AMQP_COND_ILLEGAL_STATE;
extern const char * const QD_AMQP_COND_FRAME_SIZE_TOO_SMALL;
/// @};
-/// Name for AMQP conditions from the router that don't have a more specific name.
-extern const char * const QD_COND;
#endif
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/include/qpid/dispatch/connection_manager.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/connection_manager.h b/include/qpid/dispatch/connection_manager.h
index 12ac35e..4392966 100644
--- a/include/qpid/dispatch/connection_manager.h
+++ b/include/qpid/dispatch/connection_manager.h
@@ -27,8 +27,6 @@
#include <qpid/dispatch/server.h>
typedef struct qd_connection_manager_t qd_connection_manager_t;
-typedef struct qd_config_connector_t qd_config_connector_t;
-typedef struct qd_config_listener_t qd_config_listener_t;
typedef struct qd_config_ssl_profile_t qd_config_ssl_profile_t;
typedef void (*qd_connection_manager_handler_t) (void *context, qd_connection_t *conn);
@@ -49,16 +47,6 @@ qd_connection_manager_t *qd_connection_manager(qd_dispatch_t *qd);
void qd_connection_manager_free(qd_connection_manager_t *cm);
/**
- * Free all the resources associated with a config listener
- */
-void qd_config_listener_free(qd_connection_manager_t *cm, qd_config_listener_t *cl);
-
-/**
- * Free all the resources associated with a config connector
- */
-void qd_config_connector_free(qd_connection_manager_t *cm, qd_config_connector_t *cl);
-
-/**
* Start the configured Listeners and Connectors
*
* Note that on-demand connectors are not started by this function.
@@ -67,13 +55,4 @@ void qd_config_connector_free(qd_connection_manager_t *cm, qd_config_connector_t
*/
void qd_connection_manager_start(qd_dispatch_t *qd);
-
-/**
- * Get the connector's name.
- *
- * @param cc Connector handle
- * @return The name of the connector
- */
-const char *qd_config_connector_name(qd_config_connector_t *cc);
-
#endif
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/include/qpid/dispatch/driver.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/driver.h b/include/qpid/dispatch/driver.h
deleted file mode 100644
index 6c24a23..0000000
--- a/include/qpid/dispatch/driver.h
+++ /dev/null
@@ -1,441 +0,0 @@
-#ifndef __dispatch_posix_driver_h__
-#define __dispatch_posix_driver_h__ 1
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <qpid/dispatch/log.h>
-
-#include <proton/error.h>
-#include <proton/sasl.h>
-#include <proton/selectable.h>
-#include <proton/ssl.h>
-#include <proton/transport.h>
-#include <proton/types.h>
-
-/** @file
- * API for the Driver Layer.
- *
- * The driver library provides a simple implementation of a driver for
- * the proton engine. A driver is responsible for providing input,
- * output, and tick events to the bottom half of the engine API. See
- * pn_transport_input, pn_transport_output, and
- * pn_transport_tick. The driver also provides an interface for the
- * application to access the top half of the API when the state of the
- * engine may have changed due to I/O or timing events. Additionally
- * the driver incorporates the SASL engine as well in order to provide
- * a complete network stack: AMQP over SASL over TCP.
- *
- */
-
-typedef struct qdpn_driver_t qdpn_driver_t;
-typedef struct qdpn_listener_t qdpn_listener_t;
-typedef struct qdpn_connector_t qdpn_connector_t;
-
-typedef enum {
- QDPN_CONNECTOR_WRITABLE,
- QDPN_CONNECTOR_READABLE
-} qdpn_activate_criteria_t;
-
-/** Construct a driver
- *
- * Call qdpn_driver_free() to release the driver object.
- * @param log source to use for log messages, the driver does not have it's own.
- * @return new driver object, NULL if error
- */
-qdpn_driver_t *qdpn_driver(qd_log_source_t* log);
-
-/** Return the most recent error code.
- *
- * @param[in] d the driver
- *
- * @return the most recent error text for d
- */
-int qdpn_driver_errno(qdpn_driver_t *d);
-
-/** Get additional error information associated with the driver.
- *
- * Whenever a driver operation fails, additional error information can
- * be obtained using this function. The error object that is returned
- * may also be used to clear the error condition.
- *
- * The pointer returned by this operation is valid until the
- * driver object is freed.
- *
- * @param[in] d the driver
- *
- * @return the driver's error object
- */
-pn_error_t *qdpn_driver_error(qdpn_driver_t *d);
-
-/** Force qdpn_driver_wait() to return
- *
- * @param[in] driver the driver to wake up
- *
- * @return zero on success, an error code on failure
- */
-int qdpn_driver_wakeup(qdpn_driver_t *driver);
-
-/** Wait for an active connector or listener
- *
- * @param[in] driver the driver to wait on
- * @param[in] timeout maximum time in milliseconds to wait, -1 means
- * infinite wait
- *
- * @return zero on success, an error code on failure
- */
-int qdpn_driver_wait(qdpn_driver_t *driver, int timeout);
-
-/** Get the next listener with pending data in the driver.
- *
- * @param[in] driver the driver
- * @return NULL if no active listener available
- */
-qdpn_listener_t *qdpn_driver_listener(qdpn_driver_t *driver);
-
-/** Get the next active connector in the driver.
- *
- * Returns the next connector with pending inbound data, available
- * capacity for outbound data, or pending tick.
- *
- * @param[in] driver the driver
- * @return NULL if no active connector available
- */
-qdpn_connector_t *qdpn_driver_connector(qdpn_driver_t *driver);
-
-/** Free the driver allocated via qdpn_driver, and all associated
- * listeners and connectors.
- *
- * @param[in] driver the driver to free, no longer valid on
- * return
- */
-void qdpn_driver_free(qdpn_driver_t *driver);
-
-
-/** qdpn_listener - the server API **/
-
-/** Construct a listener for the given address.
- *
- * @param[in] driver driver that will 'own' this listener
- * @param[in] host local host address to listen on
- * @param[in] port local port to listen on
- * @param[in] protocol family to use (IPv4 or IPv6 or 0). If 0 (zero) is passed in the protocol family will be automatically determined from the address
- * @param[in] context application-supplied, can be accessed via
- * qdpn_listener_context()
- * @param[in] methods to apply to new connectors.
- * @return a new listener on the given host:port, NULL if error
- */
-qdpn_listener_t *qdpn_listener(qdpn_driver_t *driver,
- const char *host,
- const char *port,
- const char *protocol_family,
- void* context
- );
-
-/** Access the head listener for a driver.
- *
- * @param[in] driver the driver whose head listener will be returned
- *
- * @return the head listener for driver or NULL if there is none
- */
-qdpn_listener_t *qdpn_listener_head(qdpn_driver_t *driver);
-
-/** Access the next listener.
- *
- * @param[in] listener the listener whose next listener will be
- * returned
- *
- * @return the next listener
- */
-qdpn_listener_t *qdpn_listener_next(qdpn_listener_t *listener);
-
-/** Accept a connection that is pending on the listener.
- *
- * @param[in] listener the listener to accept the connection on
- * @param[in] policy policy that holds absolute connection limits
- * @param[in] policy_fn function that accepts remote host name and returns
- * decision to allow or deny this connection
- * @param[out] counted pointer to a bool set to true when the connection was
- * counted against absolute connection limits
- * @return a new connector for the remote, or NULL on error
- */
-qdpn_connector_t *qdpn_listener_accept(qdpn_listener_t *listener,
- void *policy,
- bool (*policy_fn)(void *, const char *),
- bool *counted);
-
-/** Access the application context that is associated with the listener.
- *
- * @param[in] listener the listener whose context is to be returned
- * @return the application context that was passed to qdpn_listener() or
- * qdpn_listener_fd()
- */
-void *qdpn_listener_context(qdpn_listener_t *listener);
-
-void qdpn_listener_set_context(qdpn_listener_t *listener, void *context);
-
-/** Close the socket used by the listener.
- *
- * @param[in] listener the listener whose socket will be closed.
- */
-void qdpn_listener_close(qdpn_listener_t *listener);
-
-/** Frees the given listener.
- *
- * Assumes the listener's socket has been closed prior to call.
- *
- * @param[in] listener the listener object to free, no longer valid
- * on return
- */
-void qdpn_listener_free(qdpn_listener_t *listener);
-
-
-
-
-/** qdpn_connector - the client API **/
-
-/** Construct a connector to the given remote address.
- *
- * @param[in] driver owner of this connection.
- * @param[in] host remote host to connect to.
- * @param[in] port remote port to connect to.
- * @param[in] protocol family to use (IPv4 or IPv6 or 0). If 0 (zero) is passed in the protocol family will be automatically determined from the address
- * @param[in] context application supplied, can be accessed via
- * qdpn_connector_context() @return a new connector
- * to the given remote, or NULL on error.
- */
-qdpn_connector_t *qdpn_connector(qdpn_driver_t *driver,
- const char *host,
- const char *port,
- const char *protocol_family,
- void* context);
-
-/** Access the head connector for a driver.
- *
- * @param[in] driver the driver whose head connector will be returned
- *
- * @return the head connector for driver or NULL if there is none
- */
-qdpn_connector_t *qdpn_connector_head(qdpn_driver_t *driver);
-
-/** Access the next connector.
- *
- * @param[in] connector the connector whose next connector will be
- * returned
- *
- * @return the next connector
- */
-qdpn_connector_t *qdpn_connector_next(qdpn_connector_t *connector);
-
-/** Service the given connector.
- *
- * Handle any inbound data, outbound data, or timing events pending on
- * the connector.
- *
- * @param[in] connector the connector to process.
- */
-void qdpn_connector_process(qdpn_connector_t *connector);
-
-/** Access the listener which opened this connector.
- *
- * @param[in] connector connector whose listener will be returned.
- * @return the listener which created this connector, or NULL if the
- * connector has no listener (e.g. an outbound client
- * connection)
- */
-qdpn_listener_t *qdpn_connector_listener(qdpn_connector_t *connector);
-
-/** Access the Authentication and Security context of the connector.
- *
- * @param[in] connector connector whose security context will be
- * returned
- * @return the Authentication and Security context for the connector,
- * or NULL if none
- */
-pn_sasl_t *qdpn_connector_sasl(qdpn_connector_t *connector);
-
-/** Access the AMQP Connection associated with the connector.
- *
- * @param[in] connector the connector whose connection will be
- * returned
- * @return the connection context for the connector, or NULL if none
- */
-pn_connection_t *qdpn_connector_connection(qdpn_connector_t *connector);
-
-/** Assign the AMQP Connection associated with the connector.
- *
- * @param[in] connector the connector whose connection will be set.
- * @param[in] connection the connection to associate with the
- * connector
- */
-void qdpn_connector_set_connection(qdpn_connector_t *connector, pn_connection_t *connection);
-
-/** Access the application context that is associated with the
- * connector.
- *
- * @param[in] connector the connector whose context is to be returned.
- * @return the application context that was passed to qdpn_connector()
- * or qdpn_connector_fd()
- */
-void *qdpn_connector_context(qdpn_connector_t *connector);
-
-/** Assign a new application context to the connector.
- *
- * @param[in] connector the connector which will hold the context.
- * @param[in] context new application context to associate with the
- * connector
- */
-void qdpn_connector_set_context(qdpn_connector_t *connector, void *context);
-
-/** Access the name of the connector
- *
- * @param[in] connector the connector of interest
- * @return the name of the connector in the form of a null-terminated character string.
- */
-const char *qdpn_connector_name(const qdpn_connector_t *connector);
-
-/** Access the numeric host ip of the connector
- *
- * @param[in] connector the connector of interest
- * @return the numeric host ip address of the connector in the form of a null-terminated character string.
- */
-const char *qdpn_connector_hostip(const qdpn_connector_t *connector);
-
-/** Access the transport used by this connector.
- *
- * @param[in] connector connector whose transport will be returned
- * @return the transport, or NULL if none
- */
-pn_transport_t *qdpn_connector_transport(qdpn_connector_t *connector);
-
-/** Close the socket used by the connector.
- *
- * @param[in] connector the connector whose socket will be closed
- */
-void qdpn_connector_close(qdpn_connector_t *connector);
-
-/** Call when the socket is already closed, an the connector needs updating.
- *
- * @param[in] connector the connector whose socket has been closed
- */
-void qdpn_connector_after_close(qdpn_connector_t *connector);
-
-
-/** Socket has been closed externally, mark it closed.
- *
- * @param[in] connector the connector whose socket will be closed
- */
-void qdpn_connector_mark_closed(qdpn_connector_t *connector);
-
-/** Determine if the connector is closed.
- *
- * @return True if closed, otherwise false
- */
-bool qdpn_connector_closed(qdpn_connector_t *connector);
-
-bool qdpn_connector_failed(qdpn_connector_t *connector);
-
-
-/** Destructor for the given connector.
- *
- * Assumes the connector's socket has been closed prior to call.
- *
- * @param[in] connector the connector object to free. No longer
- * valid on return
- */
-void qdpn_connector_free(qdpn_connector_t *connector);
-
-/** Activate a connector when a criteria is met
- *
- * Set a criteria for a connector (i.e. it's transport is writable) that, once met,
- * the connector shall be placed in the driver's work queue.
- *
- * @param[in] connector The connector object to activate
- * @param[in] criteria The criteria that must be met prior to activating the connector
- */
-void qdpn_connector_activate(qdpn_connector_t *connector, qdpn_activate_criteria_t criteria);
-
-/** Activate all of the open file descriptors
- */
-void qdpn_activate_all(qdpn_driver_t *driver);
-
-/** Return the activation status of the connector for a criteria
- *
- * Return the activation status (i.e. readable, writable) for the connector. This function
- * has the side-effect of canceling the activation of the criteria.
- *
- * Please note that this function must not be used for normal AMQP connectors. It is only
- * used for connectors created so the driver can track non-AMQP file descriptors. Such
- * connectors are never passed into qdpn_connector_process.
- *
- * @param[in] connector The connector object to activate
- * @param[in] criteria The criteria to test. "Is this the reason the connector appeared
- * in the work list?"
- * @return true iff the criteria is activated on the connector.
- */
-bool qdpn_connector_activated(qdpn_connector_t *connector, qdpn_activate_criteria_t criteria);
-
-/** True if the connector has received a hangup */
-bool qdpn_connector_hangup(qdpn_connector_t *connector);
-
-/** Create a listener using the existing file descriptor.
- *
- * @param[in] driver driver that will 'own' this listener
- * @param[in] fd existing socket for listener to listen on
- * @param[in] context application-supplied, can be accessed via
- * qdpn_listener_context()
- * @return a new listener on the given host:port, NULL if error
- */
-qdpn_listener_t *qdpn_listener_fd(qdpn_driver_t *driver, pn_socket_t fd, void *context);
-
-pn_socket_t qdpn_listener_get_fd(qdpn_listener_t *listener);
-
-/** Create a connector using the existing file descriptor.
- *
- * @param[in] driver driver that will 'own' this connector.
- * @param[in] fd existing socket to use for this connector.
- * @param[in] context application-supplied, can be accessed via
- * qdpn_connector_context()
- * @return a new connector to the given host:port, NULL if error.
- */
-qdpn_connector_t *qdpn_connector_fd(qdpn_driver_t *driver, pn_socket_t fd, void *context);
-
-/** Get the file descriptor for this connector */
-int qdpn_connector_get_fd(qdpn_connector_t *connector);
-
-/** Set the wakeup time on the connector */
-void qdpn_connector_wakeup(qdpn_connector_t* c, pn_timestamp_t t);
-
-/** Current time according */
-pn_timestamp_t qdpn_now();
-
-/** Implementation of connector methods (e.g. these are different for HTTP connectors */
-typedef struct qdpn_connector_methods_t {
- void (*process)(qdpn_connector_t *c);
- void (*close)(qdpn_connector_t *c);
-} qdpn_connector_methods_t;
-
-/** Set new methods for a connector (e.g. because it is a HTTP connector) */
-void qdpn_connector_set_methods(qdpn_connector_t *c, qdpn_connector_methods_t *methods);
-
-/**@}*/
-
-#endif /* driver.h */
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/include/qpid/dispatch/error.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/error.h b/include/qpid/dispatch/error.h
index 6bf7e82..f464e58 100644
--- a/include/qpid/dispatch/error.h
+++ b/include/qpid/dispatch/error.h
@@ -20,6 +20,7 @@
*/
#include <qpid/dispatch/enum.h>
+#include <stdarg.h>
/** @file
* Thread-safe error handling mechansim for dispatch.
@@ -59,7 +60,13 @@ ENUM_DECLARE(qd_error);
*/
#define qd_error(code, ...) qd_error_impl(code, __FILE__, __LINE__, __VA_ARGS__)
+/**
+ * Like qd_error but takes a va_list of format arguments
+ */
+#define qd_verror(code, fmt, ap) qd_error_vimpl(code, __FILE__, __LINE__, fmt, ap)
+
qd_error_t qd_error_impl(qd_error_t code, const char *file, int line, const char *fmt, ...);
+qd_error_t qd_error_vimpl(qd_error_t code, const char *file, int line, const char *fmt, va_list ap);
/**
* Clear thread-local error code and message.
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 2f749b7..0031ed7 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -220,18 +220,17 @@ int qdr_connection_process(qdr_connection_t *conn);
/**
* qdr_connection_activate_t callback
*
- * Activate a connection for transmission (socket write). This is called whenever
- * the core has deliveries on links, disposition updates on deliveries, or flow updates
- * to be sent across the connection.
+ * Activate a connection with pending work from the core to ensure it will be processed by
+ * the proactor: the core has deliveries on links, disposition updates on deliveries, or
+ * flow updates to be sent across the connection.
*
* IMPORTANT: This function will be invoked on the core thread. It must never block,
* delay, or do any lenghty computation.
*
* @param context The context supplied when the callback was registered
* @param conn The connection object to be activated
- * @param awaken Iff true, awaken the driver poll loop after the activation
*/
-typedef void (*qdr_connection_activate_t) (void *context, qdr_connection_t *conn, bool awaken);
+typedef void (*qdr_connection_activate_t) (void *context, qdr_connection_t *conn);
/**
******************************************************************************
@@ -560,7 +559,7 @@ typedef int (*qdr_link_push_t) (void *context, qdr_link_t *link, int l
typedef void (*qdr_link_deliver_t) (void *context, qdr_link_t *link, qdr_delivery_t *delivery, bool settled);
typedef void (*qdr_delivery_update_t) (void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled);
-void qdr_connection_handlers(qdr_core_t *core,
+void qdr_connection_handlers(qdr_core_t *core,
void *context,
qdr_connection_activate_t activate,
qdr_link_first_attach_t first_attach,
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/include/qpid/dispatch/server.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/server.h b/include/qpid/dispatch/server.h
index a466ec0..ec885ae 100644
--- a/include/qpid/dispatch/server.h
+++ b/include/qpid/dispatch/server.h
@@ -24,8 +24,10 @@
#include <proton/engine.h>
#include <proton/event.h>
+struct qd_container_t;
+
/**@file
- * Control server threads, signals and connections.
+ * Control server threads and connections.
*/
/**
@@ -52,9 +54,8 @@ typedef void (*qd_deferred_t)(void *context, bool discard);
* Run the server threads until completion - The blocking version.
*
* Start the operation of the server, including launching all of the worker
- * threads. This function does not return until after the server has been
- * stopped. The thread that calls qd_server_run is used as one of the worker
- * threads.
+ * threads. Returns when all server threads have exited. The thread that calls
+ * qd_server_run is used as one of the worker threads.
*
* @param qd The dispatch handle returned by qd_dispatch.
*/
@@ -62,103 +63,16 @@ void qd_server_run(qd_dispatch_t *qd);
/**
- * Start the server threads and return immediately - The non-blocking version.
+ * Tells the server to stop but doesn't wait for server to exit.
+ * The call to qd_server_run() will exit when all server threads have exited.
*
- * Start the operation of the server, including launching all of the worker
- * threads.
+ * May be called from any thread or from a signal handler.
*
* @param qd The dispatch handle returned by qd_dispatch.
*/
-void qd_server_start(qd_dispatch_t *qd);
-
-/**
- * Stop the server
- *
- * Stop the server and join all of its worker threads. This function may be
- * called from any thread. When this function returns, all of the other
- * server threads have been closed and joined. The calling thread will be the
- * only running thread in the process.
- *
- * @param qd The dispatch handle returned by qd_dispatch.
- */
void qd_server_stop(qd_dispatch_t *qd);
-
-/**
- * Pause (quiesce) the server.
- *
- * This call blocks until all of the worker threads (except the one calling
- * this function) are finished processing and have been blocked. When this
- * call returns, the calling thread is the only thread running in the process.
- *
- * If the calling process is *not* one of the server's worker threads, then
- * this function will block all of the worker threads before returning.
- *
- * @param qd The dispatch handle returned by qd_dispatch.
- */
-void qd_server_pause(qd_dispatch_t *qd);
-
-
-/**
- * Resume normal operation of a paused server.
- *
- * This call unblocks all of the worker threads so they can resume normal
- * connection processing.
- *
- * @param qd The dispatch handle returned by qd_dispatch.
- */
-void qd_server_resume(qd_dispatch_t *qd);
-
-
-/**
- * @}
- * @defgroup server_signal server_signal
- *
- * Server Signal Handling
- *
- * @{
- */
-
-
-/**
- * Signal Handler
- *
- * Callback for signal handling. This handler will be invoked on one of the
- * worker threads in an orderly fashion. This callback is triggered by a call
- * to qd_server_signal.
- *
- * @param context The handler context supplied in qd_server_initialize.
- * @param signum The signal number that was passed into qd_server_signal.
- */
-typedef void (*qd_signal_handler_cb_t)(void* context, int signum);
-
-
-/**
- * Set the signal handler for the server. The signal handler is invoked
- * cleanly on a worker thread after a call is made to qd_server_signal. The
- * signal handler is optional and need not be set.
- *
- * @param qd The dispatch handle returned by qd_dispatch.
- * @param signal_handler The signal handler called when a registered signal is caught.
- * @param context Opaque context to be passed back in the callback function.
- */
-void qd_server_set_signal_handler(qd_dispatch_t *qd, qd_signal_handler_cb_t signal_handler, void *context);
-
-
-/**
- * Schedule the invocation of the Server's signal handler.
- *
- * This function is safe to call from any context, including an OS signal
- * handler or an Interrupt Service Routine. It schedules the orderly
- * invocation of the Server's signal handler on one of the worker threads.
- *
- * @param qd The dispatch handle returned by qd_dispatch.
- * @param signum The signal number... TODO
- */
-void qd_server_signal(qd_dispatch_t *qd, int signum);
-
-
/**
* @}
* @defgroup connection connection
@@ -459,76 +373,15 @@ typedef struct qd_server_config_t {
char *host_port;
/**
- * Set for listeners that are part of the initial router configuration.
- * An error in setting up initial listeners must shut down the router.
- */
- bool exit_on_error;
-
- /**
* @}
*/
} qd_server_config_t;
/**
- * Connection Event Handler
- *
- * Callback invoked when processing is needed on a proton connection. This
- * callback shall be invoked on one of the server's worker threads. The
- * server guarantees that no two threads shall be allowed to process a single
- * connection concurrently. The implementation of this handler may assume
- * that it has exclusive access to the connection and its subservient
- * components (sessions, links, deliveries, etc.).
- *
- * @param handler_context The handler context supplied in qd_server_set_conn_handler.
- * @param conn_context The handler context supplied in qd_server_{connect,listen}.
- * @param event The event/reason for the invocation of the handler.
- * @param conn The connection that requires processing by the handler.
- * @return A value greater than zero if the handler did any proton processing for
- * the connection. If no work was done, zero is returned.
- */
-typedef int (*qd_conn_handler_cb_t)(void *handler_context, void* conn_context, qd_conn_event_t event, qd_connection_t *conn);
-
-/**
- * Proton Event Handler
- *
- * This callback is invoked when proton events for a connection require
- * processing.
- *
- * @param handler_context The handler context supplied in qd_server_set_conn_handler.
- * @param conn_context The handler context supplied in qd_server_{connect,listen}.
- * @param event The proton event being raised.
- * @param conn The connection associated with this proton event.
+ * Set the container, must be set prior to the invocation of qd_server_run.
*/
-typedef int (*qd_pn_event_handler_cb_t)(void *handler_context, void* conn_context, pn_event_t *event, qd_connection_t *conn);
-
-
-/**
- * Post event process handler
- * Invoke only after all proton events have been popped from the collector.
- *
- * @param conn The connection for which all proton events have been popped.
- */
-typedef void (*qd_pn_event_complete_cb_t)(void *handler_context, qd_connection_t *conn);
-
-
-/**
- * Set the connection event handler callback.
- *
- * Set the connection handler callback for the server. This callback is
- * mandatory and must be set prior to the invocation of qd_server_run.
- *
- * @param qd The dispatch handle returned by qd_dispatch.
- * @param conn_handler The handler for processing connection-related events.
- * @param pn_event_handler The handler for proton events.
- * @param handler_context Context data to associate with the handler.
- */
-void qd_server_set_conn_handler(qd_dispatch_t *qd,
- qd_conn_handler_cb_t conn_handler,
- qd_pn_event_handler_cb_t pn_event_handler,
- qd_pn_event_complete_cb_t pn_event_complete_handler,
- void *handler_context);
-
+void qd_server_set_container(qd_dispatch_t *qd, struct qd_container_t *container);
/**
* Set the user context for a connection.
@@ -596,9 +449,8 @@ void qd_connection_set_user(qd_connection_t *conn);
* internal work list and be invoked for processing by a worker thread.
*
* @param conn The connection over which the application wishes to send data
- * @param awaken Iff true, wakeup the driver poll after the activation
*/
-void qd_server_activate(qd_connection_t *conn, bool awaken);
+void qd_server_activate(qd_connection_t *conn);
/**
@@ -620,15 +472,6 @@ bool qd_connection_inbound(qd_connection_t *conn);
/**
- * Get the event collector for a connection.
- *
- * @param conn Connection object supplied in QD_CONN_EVENT_{LISTENER,CONNETOR}_OPEN
- * @return The pn_collector associated with the connection.
- */
-pn_collector_t *qd_connection_collector(qd_connection_t *conn);
-
-
-/**
* Get the connection id of a connection.
*
* @param conn Connection object supplied in QD_CONN_EVENT_{LISTENER,CONNETOR}_OPEN
@@ -658,67 +501,14 @@ void qd_connection_invoke_deferred(qd_connection_t *conn, qd_deferred_t call, vo
/**
- * Write accessor to the connection's proton-event stall flag.
- * When set no further events are processed on this connection.
- * Used during processing of policy decisions to hold off incoming
- * pipeline of amqp events.
- *
- * @param conn Connection object
- * @param stall Value of stall flag
- */
-void qd_connection_set_event_stall(qd_connection_t *conn, bool stall);
-
-
-/**
- * Create a listener for incoming connections.
- *
- * @param qd The dispatch handle returned by qd_dispatch.
- * @param config Pointer to a configuration block for this listener. This block will be
- * referenced by the server, not copied. The referenced record must remain
- * in-scope for the life of the listener.
- * @param context User context passed back in the connection handler.
- * @return A pointer to the new listener, or NULL in case of failure.
- */
-qd_listener_t *qd_server_listen(qd_dispatch_t *qd, const qd_server_config_t *config, void *context);
-
-
-/**
- * Free the resources associated with a listener.
- *
- * @param li A listener pointer returned by qd_listen.
+ * Listen for incoming connections, return true if listening succeeded.
*/
-void qd_server_listener_free(qd_listener_t* li);
-
+bool qd_listener_listen(qd_listener_t *l);
/**
- * Close a listener so it will accept no more connections.
- *
- * @param li A listener pointer returned by qd_listen.
+ * Initiate an outgoing connection. Returns true if successful.
*/
-void qd_server_listener_close(qd_listener_t* li);
-
-
-/**
- * Create a connector for an outgoing connection.
- *
- * @param qd The dispatch handle returned by qd_dispatch.
- * @param config Pointer to a configuration block for this connector. This block will be
- * referenced by the server, not copied. The referenced record must remain
- * in-scope for the life of the connector..
- * @param context User context passed back in the connection handler.
- * @return A pointer to the new connector, or NULL in case of failure.
- */
-qd_connector_t *qd_server_connect(qd_dispatch_t *qd, const qd_server_config_t *config, void *context);
-
-
-/**
- * Free the resources associated with a connector.
- *
- * @param ct A connector pointer returned by qd_connect.
- */
-void qd_server_connector_free(qd_connector_t* ct);
-
-
+bool qd_connector_connect(qd_connector_t *ct);
/**
* Store address of display name service py object for C code use
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/include/qpid/dispatch/timer.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/timer.h b/include/qpid/dispatch/timer.h
index e5ba6ab..d0592d3 100644
--- a/include/qpid/dispatch/timer.h
+++ b/include/qpid/dispatch/timer.h
@@ -33,7 +33,10 @@
*/
typedef struct qd_timer_t qd_timer_t;
+/** Absolute time stamp, milliseconds since epoch */
typedef int64_t qd_timestamp_t;
+/** Relative duration in milliseconds */
+typedef int64_t qd_duration_t;
/**
* Timer Callback
@@ -77,7 +80,7 @@ void qd_timer_free(qd_timer_t *timer);
* @param msec The minimum number of milliseconds of delay until the timer fires.
* If 0 is supplied, the timer will be scheduled to fire immediately.
*/
-void qd_timer_schedule(qd_timer_t *timer, qd_timestamp_t msec);
+void qd_timer_schedule(qd_timer_t *timer, qd_duration_t msec);
/**
@@ -90,6 +93,11 @@ void qd_timer_schedule(qd_timer_t *timer, qd_timestamp_t msec);
void qd_timer_cancel(qd_timer_t *timer);
/**
+ * The current time.
+ */
+qd_timestamp_t qd_timer_now() ;
+
+/**
* @}
*/
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/python/qpid_dispatch/management/qdrouter.json
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index 667c0ec..3f66abb 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -562,9 +562,9 @@
"operations": ["CREATE", "DELETE"],
"attributes": {
"host": {
- "description":"IP address: ipv4 or ipv6 literal or a host name",
+ "description":"A host name, IPV4 or IPV6 literal, or the empty string. The empty string listens on all local addresses. A host name listens on all addresses associated with the name. An IPV6 literal address (or wildcard '[::]') listens only for IPV6. An IPV4 literal address (or wildcard '0.0.0.0') listens only for IPV4.",
"type": "string",
- "default": "127.0.0.1",
+ "default": "",
"create": true
},
"port": {
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/python/qpid_dispatch_internal/router/address.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/router/address.py b/python/qpid_dispatch_internal/router/address.py
index 4092ac7..0cff578 100644
--- a/python/qpid_dispatch_internal/router/address.py
+++ b/python/qpid_dispatch_internal/router/address.py
@@ -27,11 +27,6 @@ class Address(str):
Provides a central place for logic to construct addresses of various types.
"""
- # FIXME aconway 2015-02-06: not finished:
- # - Move to C, make accessible in C code also - provide python wrapper.
- # - Provide access to parts of address using C field iterator, avoid duplicating that logic
- # - (Maybe) separate address logic out of general field iterator logic for clarity.
-
AMQP="amqp:"
TOPO="_topo"
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/router/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/router/CMakeLists.txt b/router/CMakeLists.txt
index 5681e00..52e9ccc 100644
--- a/router/CMakeLists.txt
+++ b/router/CMakeLists.txt
@@ -35,6 +35,6 @@ set(router_SOURCES
SET(CMAKE_INSTALL_RPATH "${CMAKE_INSTALL_PREFIX}/${QPID_DISPATCH_HOME}")
add_executable(qdrouterd ${router_SOURCES})
-target_link_libraries(qdrouterd qpid-dispatch ${proton_lib})
+target_link_libraries(qdrouterd qpid-dispatch)
install(TARGETS qdrouterd RUNTIME DESTINATION sbin)
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/router/src/main.c
----------------------------------------------------------------------
diff --git a/router/src/main.c b/router/src/main.c
index c09b0a8..ee084df 100644
--- a/router/src/main.c
+++ b/router/src/main.c
@@ -38,42 +38,21 @@ static const char* argv0 = 0;
/**
* This is the OS signal handler, invoked on an undetermined thread at a completely
- * arbitrary point of time. It is not safe to do anything here but signal the dispatch
- * server with the signal number.
+ * arbitrary point of time.
*/
static void signal_handler(int signum)
{
- qd_server_signal(dispatch, signum);
-}
-
-
-/**
- * This signal handler is called cleanly by one of the server's worker threads in
- * response to an earlier call to qd_server_signal.
- */
-static void server_signal_handler(void* context, int signum)
-{
- qd_server_pause(dispatch);
-
switch (signum) {
case SIGINT:
exit_with_sigint = 1;
// fallthrough
-
case SIGQUIT:
case SIGTERM:
- fflush(stdout);
- qd_server_stop(dispatch);
- break;
-
- case SIGHUP:
+ qd_server_stop(dispatch); /* qd_server_stop is signal-safe */
break;
-
default:
break;
}
-
- qd_server_resume(dispatch);
}
static void check(int fd) {
@@ -109,9 +88,6 @@ static void main_process(const char *config_path, const char *python_pkgdir, int
qd_dispatch_load_config(dispatch, config_path);
check(fd);
- (void)server_signal_handler; (void)signal_handler;
- qd_server_set_signal_handler(dispatch, server_signal_handler, 0);
-
signal(SIGHUP, signal_handler);
signal(SIGQUIT, signal_handler);
signal(SIGTERM, signal_handler);
@@ -133,6 +109,7 @@ static void main_process(const char *config_path, const char *python_pkgdir, int
dispatch = NULL;
qd_dispatch_free(d);
+ fflush(stdout);
if (exit_with_sigint) {
signal(SIGINT, SIG_DFL);
kill(getpid(), SIGINT);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 4c00206..17674d4 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -62,7 +62,6 @@ set(qpid_dispatch_SOURCES
message.c
parse.c
policy.c
- posix/driver.c
posix/threading.c
python_embedded.c
router_agent.c
@@ -112,7 +111,7 @@ if (CMAKE_C_COMPILER_ID STREQUAL "GNU")
endif (CMAKE_C_COMPILER_ID STREQUAL "GNU")
add_library(qpid-dispatch SHARED ${qpid_dispatch_SOURCES})
-target_link_libraries(qpid-dispatch ${Proton_LIBRARIES} ${pthread_lib} ${rt_lib} ${dl_lib} ${PYTHON_LIBRARIES} ${LIBWEBSOCKETS_LIBRARIES})
+target_link_libraries(qpid-dispatch ${ProtonCore_LIBRARIES} ${ProtonProactor_LIBRARIES} ${pthread_lib} ${rt_lib} ${dl_lib} ${PYTHON_LIBRARIES} ${LIBWEBSOCKETS_LIBRARIES})
set_target_properties(qpid-dispatch PROPERTIES
LINK_FLAGS "${CATCH_UNDEFINED}"
)
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/src/amqp.c
----------------------------------------------------------------------
diff --git a/src/amqp.c b/src/amqp.c
index 0c49a16..cf9f775 100644
--- a/src/amqp.c
+++ b/src/amqp.c
@@ -63,5 +63,3 @@ const char * const QD_AMQP_COND_PRECONDITION_FAILED = "amqp:precondition-failed"
const char * const QD_AMQP_COND_RESOURCE_DELETED = "amqp:resource-deleted";
const char * const QD_AMQP_COND_ILLEGAL_STATE = "amqp:illegal-state";
const char * const QD_AMQP_COND_FRAME_SIZE_TOO_SMALL = "amqp:frame-size-too-small";
-
-const char * const QD_COND_NAME = "router:error";
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/src/connection_manager.c
----------------------------------------------------------------------
diff --git a/src/connection_manager.c b/src/connection_manager.c
index 8957185..4bc18ce 100644
--- a/src/connection_manager.c
+++ b/src/connection_manager.c
@@ -22,6 +22,7 @@
#include <qpid/dispatch/threading.h>
#include <qpid/dispatch/atomic.h>
#include <qpid/dispatch/failoverlist.h>
+#include <proton/listener.h>
#include "dispatch_private.h"
#include "connection_manager_private.h"
#include "server_private.h"
@@ -47,29 +48,13 @@ struct qd_config_ssl_profile_t {
char *ssl_private_key_file;
};
-struct qd_config_listener_t {
- qd_listener_t *listener;
- qd_server_config_t configuration;
- DEQ_LINKS(qd_config_listener_t);
-};
-
-DEQ_DECLARE(qd_config_listener_t, qd_config_listener_list_t);
DEQ_DECLARE(qd_config_ssl_profile_t, qd_config_ssl_profile_list_t);
-
-struct qd_config_connector_t {
- DEQ_LINKS(qd_config_connector_t);
- qd_connector_t *connector;
- qd_server_config_t configuration;
-};
-
-DEQ_DECLARE(qd_config_connector_t, qd_config_connector_list_t);
-
struct qd_connection_manager_t {
qd_log_source_t *log_source;
qd_server_t *server;
- qd_config_listener_list_t config_listeners;
- qd_config_connector_list_t config_connectors;
+ qd_listener_list_t listeners;
+ qd_connector_list_t connectors;
qd_config_ssl_profile_list_t config_ssl_profiles;
};
@@ -108,11 +93,12 @@ static qd_config_ssl_profile_t *qd_find_ssl_profile(qd_connection_manager_t *cm,
return 0;
}
-static void qd_server_config_free(qd_server_config_t *cf)
+void qd_server_config_free(qd_server_config_t *cf)
{
if (!cf) return;
free(cf->host);
free(cf->port);
+ free(cf->host_port);
free(cf->role);
if (cf->http_root) free(cf->http_root);
if (cf->name) free(cf->name);
@@ -199,6 +185,10 @@ static void set_config_host(qd_server_config_t *config, qd_entity_t* entity)
}
assert(config->host);
+
+ int hplen = strlen(config->host) + strlen(config->port) + 2;
+ config->host_port = malloc(hplen);
+ snprintf(config->host_port, hplen, "%s:%s", config->host, config->port);
}
@@ -402,6 +392,7 @@ static qd_error_t load_server_config(qd_dispatch_t *qd, qd_server_config_t *conf
return qd_error_code();
}
+
bool is_log_component_enabled(qd_log_bits log_message, char *component_name) {
for(int i=0;;i++) {
@@ -507,45 +498,32 @@ static void log_config(qd_log_source_t *log, qd_server_config_t *c, const char *
}
-static void config_listener_free(qd_connection_manager_t *cm, qd_config_listener_t *cl)
-{
- if (cl->listener) {
- qd_server_listener_close(cl->listener);
- qd_server_listener_free(cl->listener);
- cl->listener = 0;
- }
- qd_server_config_free(&cl->configuration);
- free(cl);
-}
-
-
-qd_config_listener_t *qd_dispatch_configure_listener(qd_dispatch_t *qd, qd_entity_t *entity)
+qd_listener_t *qd_dispatch_configure_listener(qd_dispatch_t *qd, qd_entity_t *entity)
{
qd_connection_manager_t *cm = qd->connection_manager;
- qd_config_listener_t *cl = NEW(qd_config_listener_t);
- cl->listener = 0;
-
- if (load_server_config(qd, &cl->configuration, entity) != QD_ERROR_NONE) {
- qd_log(cm->log_source, QD_LOG_ERROR, "Unable to create config listener: %s", qd_error_message());
- config_listener_free(qd->connection_manager, cl);
+ qd_listener_t *li = qd_server_listener(qd->server);
+ if (!li || load_server_config(qd, &li->config, entity) != QD_ERROR_NONE) {
+ qd_log(cm->log_source, QD_LOG_ERROR, "Unable to create listener: %s", qd_error_message());
+ qd_listener_decref(li);
return 0;
}
char *fol = qd_entity_opt_string(entity, "failoverList", 0);
if (fol) {
- cl->configuration.failover_list = qd_failover_list(fol);
+ li->config.failover_list = qd_failover_list(fol);
free(fol);
- if (cl->configuration.failover_list == 0) {
- qd_log(cm->log_source, QD_LOG_ERROR, "Error parsing failover list: %s", qd_error_message());
- config_listener_free(qd->connection_manager, cl);
+ if (li->config.failover_list == 0) {
+ qd_log(cm->log_source, QD_LOG_ERROR, "Unable to create listener, bad failover list: %s",
+ qd_error_message());
+ qd_listener_decref(li);
return 0;
}
} else {
- cl->configuration.failover_list = 0;
+ li->config.failover_list = 0;
}
- DEQ_ITEM_INIT(cl);
- DEQ_INSERT_TAIL(cm->config_listeners, cl);
- log_config(cm->log_source, &cl->configuration, "Listener");
- return cl;
+ DEQ_ITEM_INIT(li);
+ DEQ_INSERT_TAIL(cm->listeners, li);
+ log_config(cm->log_source, &li->config, "Listener");
+ return li;
}
@@ -561,32 +539,19 @@ qd_error_t qd_entity_refresh_connector(qd_entity_t* entity, void *impl)
}
-static void config_connector_free(qd_connection_manager_t *cm, qd_config_connector_t *cc)
+qd_connector_t *qd_dispatch_configure_connector(qd_dispatch_t *qd, qd_entity_t *entity)
{
- if (cc->connector) {
- qd_server_connector_free(cc->connector);
- qd_server_config_free(&cc->configuration);
- }
- free(cc);
-}
-
-
-qd_config_connector_t *qd_dispatch_configure_connector(qd_dispatch_t *qd, qd_entity_t *entity)
-{
- qd_error_clear();
qd_connection_manager_t *cm = qd->connection_manager;
- qd_config_connector_t *cc = NEW(qd_config_connector_t);
- ZERO(cc);
-
- if (load_server_config(qd, &cc->configuration, entity) != QD_ERROR_NONE) {
- qd_log(cm->log_source, QD_LOG_ERROR, "Unable to create config connector: %s", qd_error_message());
- config_connector_free(qd->connection_manager, cc);
- return 0;
+ qd_connector_t *ct = qd_server_connector(qd->server);
+ if (ct && load_server_config(qd, &ct->config, entity) == QD_ERROR_NONE) {
+ DEQ_ITEM_INIT(ct);
+ DEQ_INSERT_TAIL(cm->connectors, ct);
+ log_config(cm->log_source, &ct->config, "Connector");
+ return ct;
}
- DEQ_ITEM_INIT(cc);
- DEQ_INSERT_TAIL(cm->config_connectors, cc);
- log_config(cm->log_source, &cc->configuration, "Connector");
- return cc;
+ qd_log(cm->log_source, QD_LOG_ERROR, "Unable to create connector: %s", qd_error_message());
+ qd_connector_decref(ct);
+ return 0;
}
@@ -598,8 +563,8 @@ qd_connection_manager_t *qd_connection_manager(qd_dispatch_t *qd)
cm->log_source = qd_log_source("CONN_MGR");
cm->server = qd->server;
- DEQ_INIT(cm->config_listeners);
- DEQ_INIT(cm->config_connectors);
+ DEQ_INIT(cm->listeners);
+ DEQ_INIT(cm->connectors);
DEQ_INIT(cm->config_ssl_profiles);
return cm;
@@ -609,18 +574,18 @@ qd_connection_manager_t *qd_connection_manager(qd_dispatch_t *qd)
void qd_connection_manager_free(qd_connection_manager_t *cm)
{
if (!cm) return;
- qd_config_listener_t *cl = DEQ_HEAD(cm->config_listeners);
- while (cl) {
- DEQ_REMOVE_HEAD(cm->config_listeners);
- config_listener_free(cm, cl);
- cl = DEQ_HEAD(cm->config_listeners);
+ qd_listener_t *li = DEQ_HEAD(cm->listeners);
+ while (li) {
+ DEQ_REMOVE_HEAD(cm->listeners);
+ qd_listener_decref(li);
+ li = DEQ_HEAD(cm->listeners);
}
- qd_config_connector_t *cc = DEQ_HEAD(cm->config_connectors);
- while (cc) {
- DEQ_REMOVE_HEAD(cm->config_connectors);
- config_connector_free(cm, cc);
- cc = DEQ_HEAD(cm->config_connectors);
+ qd_connector_t *c = DEQ_HEAD(cm->connectors);
+ while (c) {
+ DEQ_REMOVE_HEAD(cm->connectors);
+ qd_connector_decref(c);
+ c = DEQ_HEAD(cm->connectors);
}
qd_config_ssl_profile_t *sslp = DEQ_HEAD(cm->config_ssl_profiles);
@@ -637,25 +602,26 @@ void qd_connection_manager_free(qd_connection_manager_t *cm)
void qd_connection_manager_start(qd_dispatch_t *qd)
{
static bool first_start = true;
- qd_config_listener_t *cl = DEQ_HEAD(qd->connection_manager->config_listeners);
- qd_config_connector_t *cc = DEQ_HEAD(qd->connection_manager->config_connectors);
+ qd_listener_t *li = DEQ_HEAD(qd->connection_manager->listeners);
+ qd_connector_t *ct = DEQ_HEAD(qd->connection_manager->connectors);
- while (cl) {
- if (cl->listener == 0 ) {
- cl->listener = qd_server_listen(qd, &cl->configuration, cl);
- if (!cl->listener && first_start) {
+ while (li) {
+ if (!li->pn_listener) {
+ qd_listener_listen(li);
+ if (!li->pn_listener && first_start) {
qd_log(qd->connection_manager->log_source, QD_LOG_CRITICAL,
- "Socket bind failed during initial configuration");
+ "Listen on %s failed during initial config", li->config.host_port);
exit(1);
+ } else {
+ li->exit_on_error = first_start;
}
}
- cl = DEQ_NEXT(cl);
+ li = DEQ_NEXT(li);
}
- while (cc) {
- if (cc->connector == 0)
- cc->connector = qd_server_connect(qd, &cc->configuration, cc);
- cc = DEQ_NEXT(cc);
+ while (ct) {
+ qd_connector_connect(ct);
+ ct = DEQ_NEXT(ct);
}
first_start = false;
@@ -664,12 +630,13 @@ void qd_connection_manager_start(qd_dispatch_t *qd)
void qd_connection_manager_delete_listener(qd_dispatch_t *qd, void *impl)
{
- qd_config_listener_t *cl = (qd_config_listener_t*) impl;
-
- if (cl) {
- qd_server_listener_close(cl->listener);
- DEQ_REMOVE(qd->connection_manager->config_listeners, cl);
- config_listener_free(qd->connection_manager, cl);
+ qd_listener_t *li = (qd_listener_t*) impl;
+ if (li) {
+ if (li->pn_listener) {
+ pn_listener_close(li->pn_listener);
+ }
+ DEQ_REMOVE(qd->connection_manager->listeners, li);
+ qd_listener_decref(li);
}
}
@@ -681,19 +648,30 @@ void qd_connection_manager_delete_ssl_profile(qd_dispatch_t *qd, void *impl)
}
+static void deferred_close(void *context, bool discard) {
+ if (!discard) {
+ pn_connection_close((pn_connection_t*)context);
+ }
+}
+
+
void qd_connection_manager_delete_connector(qd_dispatch_t *qd, void *impl)
{
- qd_config_connector_t *cc = (qd_config_connector_t*) impl;
-
- if (cc) {
- DEQ_REMOVE(qd->connection_manager->config_connectors, cc);
- config_connector_free(qd->connection_manager, cc);
+ qd_connector_t *ct = (qd_connector_t*) impl;
+ if (ct) {
+ sys_mutex_lock(ct->lock);
+ if (ct->ctx && ct->ctx->pn_conn) {
+ qd_connection_invoke_deferred(ct->ctx, deferred_close, ct->ctx->pn_conn);
+ }
+ sys_mutex_unlock(ct->lock);
+ DEQ_REMOVE(qd->connection_manager->connectors, ct);
+ qd_connector_decref(ct);
}
}
-const char *qd_config_connector_name(qd_config_connector_t *cc)
+const char *qd_connector_name(qd_connector_t *ct)
{
- return cc ? cc->configuration.name : 0;
+ return ct ? ct->config.name : 0;
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/src/container.c
----------------------------------------------------------------------
diff --git a/src/container.c b/src/container.c
index 1fb83a6..ddc0418 100644
--- a/src/container.c
+++ b/src/container.c
@@ -106,7 +106,6 @@ static void setup_outgoing_link(qd_container_t *container, pn_link_t *pn_link)
pn_link_close(pn_link);
return;
}
-
link->pn_sess = pn_link_session(pn_link);
link->pn_link = pn_link;
link->direction = QD_OUTGOING;
@@ -234,7 +233,7 @@ static void notify_opened(qd_container_t *container, qd_connection_t *conn, void
void policy_notify_opened(void *container, qd_connection_t *conn, void *context)
{
- notify_opened((qd_container_t *)container, (qd_connection_t *)conn, context);
+ notify_opened((qd_container_t *)container, (qd_connection_t *)conn, context);
}
static void notify_closed(qd_container_t *container, qd_connection_t *conn, void *context)
@@ -287,27 +286,21 @@ static void close_links(qd_container_t *container, pn_connection_t *conn, bool p
}
-static int close_handler(qd_container_t *container, void* conn_context, pn_connection_t *conn, qd_connection_t* qd_conn)
+static int close_handler(qd_container_t *container, pn_connection_t *conn, qd_connection_t* qd_conn)
{
//
// Close all links, passing QD_LOST as the reason. These links are not
// being properly 'detached'. They are being orphaned.
//
close_links(container, conn, true);
-
- // close the connection
- pn_connection_close(conn);
-
- notify_closed(container, qd_conn, conn_context);
+ notify_closed(container, qd_conn, qd_connection_get_context(qd_conn));
return 0;
}
-static int writable_handler(qd_container_t *container, pn_connection_t *conn, qd_connection_t* qd_conn)
+static void writable_handler(qd_container_t *container, pn_connection_t *conn, qd_connection_t* qd_conn)
{
const qd_node_type_t *nt;
- int event_count = 0;
-
//
// Note the locking structure in this function. Generally this would be unsafe, but since
// this particular list is only ever appended to and never has items inserted or deleted,
@@ -320,14 +313,12 @@ static int writable_handler(qd_container_t *container, pn_connection_t *conn, qd
while (nt_item) {
nt = nt_item->ntype;
if (nt->writable_handler)
- event_count += nt->writable_handler(nt->type_context, qd_conn, 0);
+ nt->writable_handler(nt->type_context, qd_conn, 0);
sys_mutex_lock(container->lock);
nt_item = DEQ_NEXT(nt_item);
sys_mutex_unlock(container->lock);
}
-
- return event_count;
}
/**
@@ -382,7 +373,18 @@ static void add_link_to_free_list(qd_pn_free_link_session_list_t *free_link_ses
}
-void pn_event_complete_handler(void *handler_context, qd_connection_t *qd_conn)
+
+/*
+ * FIXME aconway 2017-04-12: IMO this should not be necessary, we should
+ * be able to pn_*_free links and sessions directly in the handler function.
+ * They will not actually be freed from memory till the event, connection,
+ * proactor etc. have all released their references.
+ *
+ * The need for these lists may indicate a router bug, where the router is
+ * using links/sessions after they are freed. Investigate and simplify if
+ * possible.
+ */
+static void conn_event_complete(qd_connection_t *qd_conn)
{
qd_pn_free_link_session_t *to_free_link = DEQ_HEAD(qd_conn->free_link_session_list);
qd_pn_free_link_session_t *to_free_session = DEQ_HEAD(qd_conn->free_link_session_list);
@@ -405,27 +407,38 @@ void pn_event_complete_handler(void *handler_context, qd_connection_t *qd_conn)
}
}
-int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *event, qd_connection_t *qd_conn)
+
+void qd_container_handle_event(qd_container_t *container, pn_event_t *event)
{
- qd_container_t *container = (qd_container_t*) handler_context;
- pn_connection_t *conn = qd_connection_pn(qd_conn);
- pn_session_t *ssn;
- pn_link_t *pn_link;
- qd_link_t *qd_link;
- pn_delivery_t *delivery;
+ pn_connection_t *conn = pn_event_connection(event);
+ qd_connection_t *qd_conn = conn ? pn_connection_get_context(conn) : NULL;
+ pn_session_t *ssn = NULL;
+ pn_link_t *pn_link = NULL;
+ qd_link_t *qd_link = NULL;
+ pn_delivery_t *delivery = NULL;
switch (pn_event_type(event)) {
+
case PN_CONNECTION_REMOTE_OPEN :
qd_connection_set_user(qd_conn);
if (pn_connection_state(conn) & PN_LOCAL_UNINIT) {
// This Open is an externally initiated connection
// Let policy engine decide
- qd_connection_set_event_stall(qd_conn, true);
+ /* TODO aconway 2017-04-11: presently the policy test is run
+ * in the current thread.
+ *
+ * If/when the policy test can run in another thread, the connection
+ * can be stalled by saving the current pn_event_batch and passing it
+ * to pn_proactor_done() when the policy check is complete. Note we
+ * can't run the policy check as a deferred function on the current
+ * connection since by stalling the current connection it will never be
+ * run, so we need some other thread context to run it in.
+ */
qd_conn->open_container = (void *)container;
- qd_connection_invoke_deferred(qd_conn, qd_policy_amqp_open, qd_conn);
+ qd_policy_amqp_open(qd_conn);
} else {
// This Open is in response to an internally initiated connection
- notify_opened(container, qd_conn, conn_context);
+ notify_opened(container, qd_conn, qd_connection_get_context(qd_conn));
}
break;
@@ -451,20 +464,21 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even
}
}
break;
+
case PN_SESSION_LOCAL_CLOSE :
ssn = pn_event_session(event);
-
pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
while (pn_link) {
- qd_link_t *qd_link = (qd_link_t*) pn_link_get_context(pn_link);
- qd_link->pn_link = 0;
- pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
+ qd_link_t *qd_link = (qd_link_t*) pn_link_get_context(pn_link);
+ qd_link->pn_link = 0;
+ pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
}
if (pn_session_state(ssn) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) {
add_session_to_free_list(&qd_conn->free_link_session_list,ssn);
}
break;
+
case PN_SESSION_REMOTE_CLOSE :
if (!(pn_connection_state(conn) & PN_LOCAL_CLOSED)) {
ssn = pn_event_session(event);
@@ -600,28 +614,20 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even
}
break;
- default:
- break;
- }
- return 1;
-}
-
-
-static int handler(void *handler_context, void *conn_context, qd_conn_event_t event, qd_connection_t *qd_conn)
-{
- qd_container_t *container = (qd_container_t*) handler_context;
- pn_connection_t *conn = qd_connection_pn(qd_conn);
-
- switch (event) {
+ case PN_CONNECTION_WAKE:
+ writable_handler(container, conn, qd_conn);
+ break;
- case QD_CONN_EVENT_CLOSE:
- return close_handler(container, conn_context, conn, qd_conn);
+ case PN_TRANSPORT_CLOSED:
+ close_handler(container, conn, qd_conn);
+ break;
- case QD_CONN_EVENT_WRITABLE:
- return writable_handler(container, conn, qd_conn);
+ default:
+ break;
+ }
+ if (qd_conn) {
+ conn_event_complete(qd_conn);
}
-
- return 0;
}
@@ -639,8 +645,7 @@ qd_container_t *qd_container(qd_dispatch_t *qd)
DEQ_INIT(container->nodes);
DEQ_INIT(container->node_type_list);
- qd_server_set_conn_handler(qd, handler, pn_event_handler, pn_event_complete_handler, container);
-
+ qd_server_set_container(qd, container);
qd_log(container->log_source, QD_LOG_TRACE, "Container Initialized");
return container;
}
@@ -801,6 +806,9 @@ qd_lifetime_policy_t qd_container_node_get_life_policy(const qd_node_t *node)
qd_link_t *qd_link(qd_node_t *node, qd_connection_t *conn, qd_direction_t dir, const char* name)
{
qd_link_t *link = new_qd_link_t();
+ if (!link) {
+ return NULL;
+ }
const qd_server_config_t * cf = qd_connection_config(conn);
link->pn_sess = pn_session(qd_connection_pn(conn));
@@ -932,7 +940,7 @@ void qd_link_activate(qd_link_t *link)
if (!ctx)
return;
- qd_server_activate(ctx, true);
+ qd_server_activate(ctx);
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/src/error.c
----------------------------------------------------------------------
diff --git a/src/error.c b/src/error.c
index 6b7239a..b837a65 100644
--- a/src/error.c
+++ b/src/error.c
@@ -64,7 +64,7 @@ void qd_error_initialize() {
log_source = qd_log_source("ERROR");
}
-qd_error_t qd_error_impl(qd_error_t code, const char *file, int line, const char *fmt, ...) {
+qd_error_t qd_error_vimpl(qd_error_t code, const char *file, int line, const char *fmt, va_list ap) {
ts.error_code = code;
if (code) {
char *begin = ts.error_message;
@@ -75,10 +75,7 @@ qd_error_t qd_error_impl(qd_error_t code, const char *file, int line, const char
aprintf(&begin, end, "%s: ", name);
else
aprintf(&begin, end, "%d: ", code);
- va_list arglist;
- va_start(arglist, fmt);
- vaprintf(&begin, end, fmt, arglist);
- va_end(arglist);
+ vaprintf(&begin, end, fmt, ap);
// NOTE: Use the file/line from the qd_error macro, not this line in error.c
qd_log_impl(log_source, QD_LOG_ERROR, file, line, "%s", qd_error_message());
return code;
@@ -88,6 +85,14 @@ qd_error_t qd_error_impl(qd_error_t code, const char *file, int line, const char
return 0;
}
+qd_error_t qd_error_impl(qd_error_t code, const char *file, int line, const char *fmt, ...) {
+ va_list ap;
+ va_start(ap, fmt);
+ qd_error_t err = qd_error_vimpl(code, file, line, fmt, ap);
+ va_end(ap);
+ return err;
+}
+
qd_error_t qd_error_clear() {
ts.error_code = 0;
snprintf(ts.error_message, ERROR_MAX, "No Error");
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/src/http-none.c
----------------------------------------------------------------------
diff --git a/src/http-none.c b/src/http-none.c
index b9af9e1..a8953e5 100644
--- a/src/http-none.c
+++ b/src/http-none.c
@@ -18,7 +18,6 @@
*/
#include <qpid/dispatch/log.h>
-#include <qpid/dispatch/driver.h>
#include "http.h"
/* No HTTP implementation available. */
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/src/policy.c
----------------------------------------------------------------------
diff --git a/src/policy.c b/src/policy.c
index 39ead3c..a55f245 100644
--- a/src/policy.c
+++ b/src/policy.c
@@ -184,9 +184,8 @@ qd_error_t qd_entity_refresh_policy(qd_entity_t* entity, void *unused) {
// error conditions.
//
-bool qd_policy_socket_accept(void *context, const char *hostname)
+bool qd_policy_socket_accept(qd_policy_t *policy, const char *hostname)
{
- qd_policy_t *policy = (qd_policy_t *)context;
bool result = true;
if (n_connections < policy->max_connection_limit) {
// connection counted and allowed
@@ -205,10 +204,8 @@ bool qd_policy_socket_accept(void *context, const char *hostname)
//
//
-void qd_policy_socket_close(void *context, const qd_connection_t *conn)
+void qd_policy_socket_close(qd_policy_t *policy, const qd_connection_t *conn)
{
- qd_policy_t *policy = (qd_policy_t *)context;
-
n_connections -= 1;
assert (n_connections >= 0);
if (policy->enableVhostPolicy) {
@@ -567,8 +564,7 @@ bool _qd_policy_approve_link_name(const char *username, const char *allowed, con
return result;
}
-//
-//
+
bool qd_policy_approve_amqp_sender_link(pn_link_t *pn_link, qd_connection_t *qd_conn)
{
const char *hostip = qd_connection_hostip(qd_conn);
@@ -678,55 +674,48 @@ bool qd_policy_approve_amqp_receiver_link(pn_link_t *pn_link, qd_connection_t *q
}
-//
-//
-void qd_policy_amqp_open(void *context, bool discard)
-{
- qd_connection_t *qd_conn = (qd_connection_t *)context;
- if (!discard) {
- pn_connection_t *conn = qd_connection_pn(qd_conn);
- qd_dispatch_t *qd = qd_server_dispatch(qd_conn->server);
- qd_policy_t *policy = qd->policy;
- bool connection_allowed = true;
-
- if (policy->enableVhostPolicy) {
- // Open connection or not based on policy.
- pn_transport_t *pn_trans = pn_connection_transport(conn);
- const char *hostip = qd_connection_hostip(qd_conn);
- const char *pcrh = pn_connection_remote_hostname(conn);
- const char *vhost = (pcrh ? pcrh : "");
- const char *conn_name = qd_connection_name(qd_conn);
+void qd_policy_amqp_open(qd_connection_t *qd_conn) {
+ pn_connection_t *conn = qd_connection_pn(qd_conn);
+ qd_dispatch_t *qd = qd_server_dispatch(qd_conn->server);
+ qd_policy_t *policy = qd->policy;
+ bool connection_allowed = true;
+
+ if (policy->enableVhostPolicy) {
+ // Open connection or not based on policy.
+ pn_transport_t *pn_trans = pn_connection_transport(conn);
+ const char *hostip = qd_connection_hostip(qd_conn);
+ const char *pcrh = pn_connection_remote_hostname(conn);
+ const char *vhost = (pcrh ? pcrh : "");
+ const char *conn_name = qd_connection_name(qd_conn);
#define SETTINGS_NAME_SIZE 256
- char settings_name[SETTINGS_NAME_SIZE];
- uint32_t conn_id = qd_conn->connection_id;
- qd_conn->policy_settings = NEW(qd_policy_settings_t); // TODO: memory pool for settings
- memset(qd_conn->policy_settings, 0, sizeof(qd_policy_settings_t));
-
- if (qd_policy_open_lookup_user(policy, qd_conn->user_id, hostip, vhost, conn_name,
- settings_name, SETTINGS_NAME_SIZE, conn_id,
- qd_conn->policy_settings) &&
- settings_name[0]) {
- // This connection is allowed by policy.
- // Apply transport policy settings
- if (qd_conn->policy_settings->maxFrameSize > 0)
- pn_transport_set_max_frame(pn_trans, qd_conn->policy_settings->maxFrameSize);
- if (qd_conn->policy_settings->maxSessions > 0)
- pn_transport_set_channel_max(pn_trans, qd_conn->policy_settings->maxSessions - 1);
- } else {
- // This connection is denied by policy.
- connection_allowed = false;
- }
+ char settings_name[SETTINGS_NAME_SIZE];
+ uint32_t conn_id = qd_conn->connection_id;
+ qd_conn->policy_settings = NEW(qd_policy_settings_t); // TODO: memory pool for settings
+ memset(qd_conn->policy_settings, 0, sizeof(qd_policy_settings_t));
+
+ if (qd_policy_open_lookup_user(policy, qd_conn->user_id, hostip, vhost, conn_name,
+ settings_name, SETTINGS_NAME_SIZE, conn_id,
+ qd_conn->policy_settings) &&
+ settings_name[0]) {
+ // This connection is allowed by policy.
+ // Apply transport policy settings
+ if (qd_conn->policy_settings->maxFrameSize > 0)
+ pn_transport_set_max_frame(pn_trans, qd_conn->policy_settings->maxFrameSize);
+ if (qd_conn->policy_settings->maxSessions > 0)
+ pn_transport_set_channel_max(pn_trans, qd_conn->policy_settings->maxSessions - 1);
} else {
- // No policy implies automatic policy allow
- // Note that connections not governed by policy have no policy_settings.
- }
- if (connection_allowed) {
- if (pn_connection_state(conn) & PN_LOCAL_UNINIT)
- pn_connection_open(conn);
- policy_notify_opened(qd_conn->open_container, qd_conn, qd_conn->context);
- } else {
- qd_policy_private_deny_amqp_connection(conn, QD_AMQP_COND_RESOURCE_LIMIT_EXCEEDED, CONNECTION_DISALLOWED);
+ // This connection is denied by policy.
+ connection_allowed = false;
}
+ } else {
+ // No policy implies automatic policy allow
+ // Note that connections not governed by policy have no policy_settings.
+ }
+ if (connection_allowed) {
+ if (pn_connection_state(conn) & PN_LOCAL_UNINIT)
+ pn_connection_open(conn);
+ policy_notify_opened(qd_conn->open_container, qd_conn, qd_conn->context);
+ } else {
+ qd_policy_private_deny_amqp_connection(conn, QD_AMQP_COND_RESOURCE_LIMIT_EXCEEDED, CONNECTION_DISALLOWED);
}
- qd_connection_set_event_stall(qd_conn, false);
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/src/policy.h
----------------------------------------------------------------------
diff --git a/src/policy.h b/src/policy.h
index c89ca2b..23f3610 100644
--- a/src/policy.h
+++ b/src/policy.h
@@ -99,7 +99,7 @@ qd_error_t qd_policy_c_counts_refresh(long ccounts, qd_entity_t*entity);
* @param[in] name the connector name
* @return the connection is allowed or not
**/
-bool qd_policy_socket_accept(void *context, const char *hostname);
+bool qd_policy_socket_accept(qd_policy_t *context, const char *hostname);
/** Record a closing connection.
@@ -109,7 +109,7 @@ bool qd_policy_socket_accept(void *context, const char *hostname);
* @param[in] context the current policy
* @param[in] conn qd_connection
**/
-void qd_policy_socket_close(void *context, const qd_connection_t *conn);
+void qd_policy_socket_close(qd_policy_t *context, const qd_connection_t *conn);
/** Approve a new session based on connection's policy.
@@ -153,10 +153,7 @@ bool qd_policy_approve_amqp_receiver_link(pn_link_t *pn_link, qd_connection_t *q
* allowed to make this connection.
* Denied pn_connections are closed with a condition.
* Allowed connections are signaled through qd_connection_manager.
- * This function is called from the deferred queue.
- * @param[in] context a qd_connection_t object
- * @param[in] discard callback switch
**/
-void qd_policy_amqp_open(void *context, bool discard);
+void qd_policy_amqp_open(qd_connection_t *conn);
#endif
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org