You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/04/27 17:38:28 UTC
[03/10] qpid-dispatch git commit: DISPATCH-390: Convert dispatch to
use pn_proactor_t
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/src/server.c
----------------------------------------------------------------------
diff --git a/src/server.c b/src/server.c
index 848cfcb..15a2aef 100644
--- a/src/server.c
+++ b/src/server.c
@@ -24,6 +24,12 @@
#include <qpid/dispatch/amqp.h>
#include <qpid/dispatch/server.h>
#include <qpid/dispatch/failoverlist.h>
+
+#include <proton/event.h>
+#include <proton/listener.h>
+#include <proton/proactor.h>
+#include <proton/sasl.h>
+
#include "qpid/dispatch/python_embedded.h"
#include "entity.h"
#include "entity_cache.h"
@@ -34,94 +40,33 @@
#include "alloc.h"
#include "config.h"
#include <stdio.h>
-#include <time.h>
#include <string.h>
#include <errno.h>
#include <inttypes.h>
-typedef struct qd_thread_t {
- qd_server_t *qd_server;
- int thread_id;
- volatile int running;
- volatile int canceled;
- int using_thread;
- sys_thread_t *thread;
-} qd_thread_t;
-
-
-typedef struct qd_work_item_t {
- DEQ_LINKS(struct qd_work_item_t);
- qdpn_connector_t *cxtr;
-} qd_work_item_t;
-
-DEQ_DECLARE(qd_work_item_t, qd_work_list_t);
-
-
struct qd_server_t {
qd_dispatch_t *qd;
- int thread_count;
+ const int thread_count; /* Immutable */
const char *container_name;
const char *sasl_config_path;
const char *sasl_config_name;
- qdpn_driver_t *driver;
+ pn_proactor_t *proactor;
+ qd_container_t *container;
qd_log_source_t *log_source;
- qd_conn_handler_cb_t conn_handler;
- qd_pn_event_handler_cb_t pn_event_handler;
- qd_pn_event_complete_cb_t pn_event_complete_handler;
void *start_context;
- void *conn_handler_context;
sys_cond_t *cond;
sys_mutex_t *lock;
- qd_thread_t **threads;
- qd_work_list_t work_queue;
- qd_timer_list_t pending_timers;
- bool a_thread_is_waiting;
int pause_requests;
int threads_paused;
int pause_next_sequence;
int pause_now_serving;
- qd_signal_handler_cb_t signal_handler;
- bool signal_handler_running;
- void *signal_context;
- int pending_signal;
- qd_timer_t *heartbeat_timer;
uint64_t next_connection_id;
void *py_displayname_obj;
qd_http_server_t *http;
};
-/**
- * Listener objects represent the desire to accept incoming transport connections.
- */
-struct qd_listener_t {
- qd_server_t *server;
- const qd_server_config_t *config;
- void *context;
- qdpn_listener_t *pn_listener;
- qd_http_listener_t *http;
-};
-
-
-/**
- * Connector objects represent the desire to create and maintain an outgoing transport connection.
- */
-struct qd_connector_t {
- qd_server_t *server;
- cxtr_state_t state;
- const qd_server_config_t *config;
- void *context;
- qd_connection_t *ctx;
- qd_timer_t *timer;
- long delay;
-};
-
-
-
-static __thread qd_server_t *thread_server = 0;
-
#define HEARTBEAT_INTERVAL 1000
-ALLOC_DEFINE(qd_work_item_t);
ALLOC_DEFINE(qd_listener_t);
ALLOC_DEFINE(qd_connector_t);
ALLOC_DEFINE(qd_deferred_call_t);
@@ -142,49 +87,32 @@ const char CERT_FINGERPRINT_SHA256 = '2';
const char CERT_FINGERPRINT_SHA512 = '5';
char *COMPONENT_SEPARATOR = ";";
-static void setup_ssl_sasl_and_open(qd_connection_t *ctx);
+static const int BACKLOG = 50; /* Listening backlog */
-static qd_thread_t *thread(qd_server_t *qd_server, int id)
-{
- qd_thread_t *thread = NEW(qd_thread_t);
- if (!thread)
- return 0;
-
- thread->qd_server = qd_server;
- thread->thread_id = id;
- thread->running = 0;
- thread->canceled = 0;
- thread->using_thread = 0;
-
- return thread;
-}
+static void setup_ssl_sasl_and_open(qd_connection_t *ctx);
-static void free_qd_connection(qd_connection_t *ctx)
-{
- if (ctx->policy_settings) {
- if (ctx->policy_settings->sources)
- free(ctx->policy_settings->sources);
- if (ctx->policy_settings->targets)
- free(ctx->policy_settings->targets);
- free (ctx->policy_settings);
- ctx->policy_settings = 0;
- }
- if (ctx->pn_conn) {
- pn_connection_set_context(ctx->pn_conn, 0);
- pn_decref(ctx->pn_conn);
- ctx->pn_conn = NULL;
- }
- if (ctx->collector) {
- pn_collector_free(ctx->collector);
- ctx->collector = NULL;
+/*
+ * Construct a new qd_connection for `server` with a private copy of `role`.
+ *
+ * Allocates the proton connection and the deferred-call lock, initializes the
+ * list links, and assigns the next connection id under the server lock.
+ *
+ * Returns the new connection, or NULL if any allocation fails or `role` is
+ * NULL. On failure everything allocated so far, including the qd_connection_t
+ * itself, is released.
+ */
+static qd_connection_t *qd_connection(qd_server_t *server, const char *role) {
+    qd_connection_t *ctx = new_qd_connection_t();
+    if (!ctx) return NULL;
+    ZERO(ctx);
+    ctx->pn_conn = pn_connection();
+    ctx->deferred_call_lock = sys_mutex();
+    /* strdup(NULL) is undefined; a missing role is reported as a failure below. */
+    ctx->role = role ? strdup(role) : NULL;
+    if (!ctx->pn_conn || !ctx->deferred_call_lock || !ctx->role) {
+        if (ctx->pn_conn) pn_connection_free(ctx->pn_conn);
+        if (ctx->deferred_call_lock) sys_mutex_free(ctx->deferred_call_lock);
+        free(ctx->role);
+        free_qd_connection_t(ctx); /* do not leak the half-built connection */
+        return NULL;
}
-
- if (ctx->free_user_id)
- free((char*)ctx->user_id);
-
- free(ctx->role);
-
- free_qd_connection_t(ctx);
+    ctx->server = server;
+    DEQ_ITEM_INIT(ctx);
+    DEQ_INIT(ctx->deferred_calls);
+    DEQ_INIT(ctx->free_link_session_list);
+    /* next_connection_id is shared by all threads; take the server lock to bump it. */
+    sys_mutex_lock(server->lock);
+    ctx->connection_id = server->next_connection_id++;
+    sys_mutex_unlock(server->lock);
+    return ctx;
}
/**
@@ -222,7 +150,7 @@ qd_error_t qd_register_display_name_service(qd_dispatch_t *qd, void *displayname
static const char *transport_get_user(qd_connection_t *conn, pn_transport_t *tport)
{
const qd_server_config_t *config =
- conn->connector ? conn->connector->config : conn->listener->config;
+ conn->connector ? &conn->connector->config : &conn->listener->config;
if (config->ssl_uid_format) {
// The ssl_uid_format length cannot be greater that 7
@@ -418,22 +346,6 @@ static const char *transport_get_user(qd_connection_t *conn, pn_transport_t *tpo
}
-/**
- * Allocate a new qd_connection
- * with DEQ items initialized, call lock allocated, and all other fields cleared.
- */
-static qd_connection_t *connection_allocate()
-{
- qd_connection_t *ctx = new_qd_connection_t();
- ZERO(ctx);
- DEQ_ITEM_INIT(ctx);
- DEQ_INIT(ctx->deferred_calls);
- ctx->deferred_call_lock = sys_mutex();
- DEQ_INIT(ctx->free_link_session_list);
- return ctx;
-}
-
-
void qd_connection_set_user(qd_connection_t *conn)
{
pn_transport_t *tport = pn_connection_transport(conn->pn_conn);
@@ -507,20 +419,6 @@ static qd_error_t listener_setup_ssl(qd_connection_t *ctx, const qd_server_confi
return QD_ERROR_NONE;
}
-// Format the identity of an incoming connection to buf for logging
-static const char *log_incoming(char *buf, size_t size, qdpn_connector_t *cxtr)
-{
- qd_listener_t *qd_listener = qdpn_listener_context(qdpn_connector_listener(cxtr));
- assert(qd_listener);
- const char *cname = qdpn_connector_name(cxtr);
- const char *host = qd_listener->config->host;
- const char *port = qd_listener->config->port;
- const char *protocol = qd_listener->config->http ? "HTTP" : "AMQP";
- snprintf(buf, size, "incoming %s connection from %s to %s:%s",
- protocol, cname, host, port);
- return buf;
-}
-
static void decorate_connection(qd_server_t *qd_server, pn_connection_t *conn, const qd_server_config_t *config)
{
@@ -602,637 +500,383 @@ static void decorate_connection(qd_server_t *qd_server, pn_connection_t *conn, c
}
-static void thread_process_listeners_LH(qd_server_t *qd_server)
+static void on_accept(pn_event_t *e)
{
- qdpn_driver_t *driver = qd_server->driver;
- qdpn_listener_t *listener;
- qdpn_connector_t *cxtr;
- qd_connection_t *ctx;
-
- for (listener = qdpn_driver_listener(driver); listener; listener = qdpn_driver_listener(driver)) {
- qd_listener_t *li = qdpn_listener_context(listener);
- bool policy_counted = false;
- cxtr = qdpn_listener_accept(listener, qd_server->qd->policy, &qd_policy_socket_accept, &policy_counted);
- if (!cxtr)
- continue;
-
- char logbuf[qd_log_max_len()];
-
- ctx = connection_allocate();
- ctx->server = qd_server;
- ctx->owner_thread = CONTEXT_UNSPECIFIED_OWNER;
- ctx->pn_cxtr = cxtr;
- ctx->listener = qdpn_listener_context(listener);
- ctx->context = ctx->listener->context;
- ctx->connection_id = qd_server->next_connection_id++; // Increment the connection id so the next connection can use it
- ctx->policy_counted = policy_counted;
-
- // Copy the role from the listener config
- int role_length = strlen(ctx->listener->config->role) + 1;
- ctx->role = (char*) malloc(role_length);
- strcpy(ctx->role, ctx->listener->config->role);
-
- pn_connection_t *conn = pn_connection();
- ctx->collector = pn_collector();
- pn_connection_collect(conn, ctx->collector);
- decorate_connection(qd_server, conn, ctx->listener->config);
- qdpn_connector_set_connection(cxtr, conn);
- pn_connection_set_context(conn, ctx);
- ctx->pn_conn = conn;
- ctx->owner_thread = CONTEXT_NO_OWNER;
- qdpn_connector_set_context(cxtr, ctx);
-
- qd_log(qd_server->log_source, QD_LOG_INFO, "Accepting %s with connection id [%"PRIu64"]",
- log_incoming(logbuf, sizeof(logbuf), cxtr), ctx->connection_id);
+ assert(pn_event_type(e) == PN_LISTENER_ACCEPT);
+ pn_listener_t *pn_listener = pn_event_listener(e);
+ qd_listener_t *listener = pn_listener_get_context(pn_listener);
+ qd_connection_t *ctx = qd_connection(listener->server, listener->config.role);
+ if (!ctx) {
+ qd_log(listener->server->log_source, QD_LOG_CRITICAL,
+ "Allocation failure during accept to %s", listener->config.host_port);
+ return;
+ }
+ pn_connection_set_context(ctx->pn_conn, ctx);
+ ctx->listener = listener;
+ decorate_connection(listener->server, ctx->pn_conn, &ctx->listener->config);
+ qd_log(listener->server->log_source, QD_LOG_TRACE,
+ "[%"PRIu64"] Accepting incoming connection from %s to %s",
+ ctx->connection_id, qd_connection_name(ctx), ctx->listener->config.host_port);
+ /* Asynchronous accept, configure the transport on PN_CONNECTION_BOUND */
+ pn_listener_accept(pn_listener, ctx->pn_conn);
+ }
+
+
+/*
+ * Report a connection failure.
+ *
+ * The printf-style `description` (with its variadic arguments) is recorded via
+ * qd_verror(). If the connection still has a proton connection, the same text
+ * is stored under condition `name` on the transport, or on the connection when
+ * no transport is bound yet, unless a condition is already set. Finally the
+ * transport tail is closed, or the connection is closed if there is no
+ * transport.
+ */
+void connect_fail(qd_connection_t *ctx, const char *name, const char *description, ...)
+{
+    va_list args;
+
+    va_start(args, description);
+    qd_verror(QD_ERROR_RUNTIME, description, args);
+    va_end(args);
+
+    pn_connection_t *conn = ctx->pn_conn;
+    if (!conn)
+        return;
+
+    /* Prefer the transport's condition; fall back to the connection's when unbound. */
+    pn_transport_t *tport = pn_connection_transport(conn);
+    pn_condition_t *condition;
+    if (tport)
+        condition = pn_transport_condition(tport);
+    else
+        condition = pn_connection_condition(conn);
+
+    /* Never overwrite a condition that is already set. */
+    if (condition && !pn_condition_is_set(condition)) {
+        va_start(args, description);
+        pn_condition_vformat(condition, name, description, args);
+        va_end(args);
+    }
+
+    if (tport)
+        pn_transport_close_tail(tport);
+    else
+        pn_connection_close(conn);
+}
- //
- // Get a pointer to the transport so we can insert security components into it
- //
- pn_transport_t *tport = qdpn_connector_transport(cxtr);
- const qd_server_config_t *config = ctx->listener->config;
- //
- // Configure the transport.
- //
+/*
+ * Look up the numeric host and port of the remote end of ctx's transport.
+ *
+ * On success ctx->rhost holds the numeric host and ctx->rhost_port holds
+ * "host:port". Returns 0 on success and non-zero on failure: -1 when the
+ * transport has no remote address, otherwise the getnameinfo() error code.
+ * Failures are logged against the local listener/connector address.
+ */
+static int set_remote_host_port(qd_connection_t *ctx) {
+    pn_transport_t *tport = pn_connection_transport(ctx->pn_conn);
+    const struct sockaddr_storage* addr = pn_proactor_addr_sockaddr(pn_proactor_addr_remote(tport));
+    /* Address named in the log messages: the listener or connector this connection belongs to. */
+    const char *local = ctx->listener ? ctx->listener->config.host_port
+        : ctx->connector ? ctx->connector->config.host_port
+        : "(unknown)";
+    if (!addr) {
+        qd_log(ctx->server->log_source, QD_LOG_ERROR, "No remote address for connection to %s", local);
+        return -1;
+    }
+    char rport[NI_MAXSERV] = "";
+    /* NOTE(review): sizeof(*addr) is the full sockaddr_storage size; some platforms may
+     * require the exact address length here - confirm. */
+    int err = getnameinfo((struct sockaddr*)addr, sizeof(*addr),
+                          ctx->rhost, sizeof(ctx->rhost), rport, sizeof(rport),
+                          NI_NUMERICHOST | NI_NUMERICSERV);
+    if (err) {
+        qd_log(ctx->server->log_source, QD_LOG_ERROR, "No remote address for connection to %s", local);
+        return err; /* propagate the failure instead of reporting success */
+    }
+    snprintf(ctx->rhost_port, sizeof(ctx->rhost_port), "%s:%s", ctx->rhost, rport);
+    return 0;
+}
+
+
+/* Configure the transport once it is bound to the connection */
+static void on_connection_bound(qd_server_t *server, pn_event_t *e) {
+ pn_connection_t *pn_conn = pn_event_connection(e);
+ qd_connection_t *ctx = pn_connection_get_context(pn_conn);
+ pn_transport_t *tport = pn_connection_transport(pn_conn);
+ pn_transport_set_context(tport, ctx); /* for transport_tracer */
+
+ //
+ // Proton pushes out its trace to transport_tracer() which in turn writes a trace
+ // message to the qdrouter log If trace level logging is enabled on the router set
+ // PN_TRACE_DRV | PN_TRACE_FRM | PN_TRACE_RAW on the proton transport
+ //
+ if (qd_log_enabled(ctx->server->log_source, QD_LOG_TRACE)) {
+ pn_transport_trace(tport, PN_TRACE_FRM);
+ pn_transport_set_tracer(tport, transport_tracer);
+ }
+
+ const qd_server_config_t *config = NULL;
+ if (ctx->listener) { /* Accepting an incoming connection */
+ config = &ctx->listener->config;
+ const char *name = config->host_port;
pn_transport_set_server(tport);
- pn_transport_set_max_frame(tport, config->max_frame_size);
- pn_transport_set_channel_max(tport, config->max_sessions - 1);
- pn_transport_set_idle_timeout(tport, config->idle_timeout_seconds * 1000);
- //
- // Proton pushes out its trace to transport_tracer() which in turn writes a trace message to the qdrouter log
- // If trace level logging is enabled on the router set PN_TRACE_DRV | PN_TRACE_FRM | PN_TRACE_RAW on the proton transport
- //
- pn_transport_set_context(tport, ctx);
- if (qd_log_enabled(qd_server->log_source, QD_LOG_TRACE)) {
- pn_transport_trace(tport, PN_TRACE_FRM);
- pn_transport_set_tracer(tport, transport_tracer);
+ if (set_remote_host_port(ctx) == 0 &&
+ qd_policy_socket_accept(server->qd->policy, ctx->rhost))
+ {
+ ctx->policy_counted = true;
+ } else {
+ pn_transport_close_tail(tport);
+ pn_transport_close_head(tport);
+ return;
}
- if (li->http) {
- // Set up HTTP if configured, HTTP provides its own SSL.
- qd_log(qd_server->log_source, QD_LOG_TRACE, "Configuring HTTP%s on %s",
- config->ssl_profile ? "S" : "",
- log_incoming(logbuf, sizeof(logbuf), cxtr));
- qd_http_listener_accept(li->http, cxtr);
- } else if (config->ssl_profile) {
- // Set up SSL if configured and HTTP is not providing SSL.
- qd_log(qd_server->log_source, QD_LOG_TRACE, "Configuring SSL on %s",
- log_incoming(logbuf, sizeof(logbuf), cxtr));
+ // Set up SSL
+ if (config->ssl_profile) {
+ qd_log(ctx->server->log_source, QD_LOG_TRACE, "Configuring SSL on %s", name);
if (listener_setup_ssl(ctx, config, tport) != QD_ERROR_NONE) {
- qd_log(qd_server->log_source, QD_LOG_ERROR, "%s on %s",
- qd_error_message(), log_incoming(logbuf, sizeof(logbuf), cxtr));
- qdpn_connector_close(cxtr);
- continue;
+ connect_fail(ctx, QD_AMQP_COND_INTERNAL_ERROR, "%s on %s", qd_error_message(), name);
+ return;
}
}
-
//
// Set up SASL
//
+ sys_mutex_lock(ctx->server->lock);
pn_sasl_t *sasl = pn_sasl(tport);
- if (qd_server->sasl_config_path)
- pn_sasl_config_path(sasl, qd_server->sasl_config_path);
- pn_sasl_config_name(sasl, qd_server->sasl_config_name);
+ if (ctx->server->sasl_config_path)
+ pn_sasl_config_path(sasl, ctx->server->sasl_config_path);
+ pn_sasl_config_name(sasl, ctx->server->sasl_config_name);
if (config->sasl_mechanisms)
pn_sasl_allowed_mechs(sasl, config->sasl_mechanisms);
pn_transport_require_auth(tport, config->requireAuthentication);
pn_transport_require_encryption(tport, config->requireEncryption);
pn_sasl_set_allow_insecure_mechs(sasl, config->allowInsecureAuthentication);
+ sys_mutex_unlock(ctx->server->lock);
+
+ qd_log(ctx->server->log_source, QD_LOG_INFO, "Accepted connection to %s from %s",
+ name, ctx->rhost_port);
+ } else if (ctx->connector) { /* Establishing an outgoing connection */
+ config = &ctx->connector->config;
+ setup_ssl_sasl_and_open(ctx);
+ } else { /* No connector and no listener */
+ connect_fail(ctx, QD_AMQP_COND_INTERNAL_ERROR, "unknown Connection");
+ return;
}
-}
-
-
-static void handle_signals_LH(qd_server_t *qd_server)
-{
- int signum = qd_server->pending_signal;
-
- if (signum && !qd_server->signal_handler_running) {
- qd_server->signal_handler_running = true;
- qd_server->pending_signal = 0;
- if (qd_server->signal_handler) {
- sys_mutex_unlock(qd_server->lock);
- qd_server->signal_handler(qd_server->signal_context, signum);
- sys_mutex_lock(qd_server->lock);
- }
- qd_server->signal_handler_running = false;
- }
-}
-
-static void block_if_paused_LH(qd_server_t *qd_server)
-{
- if (qd_server->pause_requests > 0) {
- qd_server->threads_paused++;
- sys_cond_signal_all(qd_server->cond);
- while (qd_server->pause_requests > 0)
- sys_cond_wait(qd_server->cond, qd_server->lock);
- qd_server->threads_paused--;
- }
+ //
+ // Common transport configuration.
+ //
+ pn_transport_set_max_frame(tport, config->max_frame_size);
+ pn_transport_set_channel_max(tport, config->max_sessions - 1);
+ pn_transport_set_idle_timeout(tport, config->idle_timeout_seconds * 1000);
}
static void invoke_deferred_calls(qd_connection_t *conn, bool discard)
{
- qd_deferred_call_list_t calls;
- qd_deferred_call_t *dc;
-
- //
- // Copy the deferred calls out of the connection under lock.
+ // Lock access to deferred_calls, other threads may concurrently add to it. Invoke
+ // the calls outside of the critical section.
//
- DEQ_INIT(calls);
sys_mutex_lock(conn->deferred_call_lock);
- dc = DEQ_HEAD(conn->deferred_calls);
- while (dc) {
+ qd_deferred_call_t *dc;
+ while ((dc = DEQ_HEAD(conn->deferred_calls))) {
DEQ_REMOVE_HEAD(conn->deferred_calls);
- DEQ_INSERT_TAIL(calls, dc);
- dc = DEQ_HEAD(conn->deferred_calls);
- }
- sys_mutex_unlock(conn->deferred_call_lock);
-
- //
- // Invoke the calls outside of the critical section.
- //
- dc = DEQ_HEAD(calls);
- while (dc) {
- DEQ_REMOVE_HEAD(calls);
+ sys_mutex_unlock(conn->deferred_call_lock);
dc->call(dc->context, discard);
free_qd_deferred_call_t(dc);
- dc = DEQ_HEAD(calls);
+ sys_mutex_lock(conn->deferred_call_lock);
}
+ sys_mutex_unlock(conn->deferred_call_lock);
}
-
-static int process_connector(qd_server_t *qd_server, qdpn_connector_t *cxtr)
-{
- qd_connection_t *ctx = qdpn_connector_context(cxtr);
- int events = 0;
- int passes = 0;
-
- if (ctx->closed)
- return 0;
-
- do {
- passes++;
-
- //
- // If this connection is outbound and is just now opening, do the initial SSL/SASL setup
- //
- if (!ctx->opened && !!ctx->connector && !qdpn_connector_closed(cxtr))
- setup_ssl_sasl_and_open(ctx);
-
- //
- // Step the engine for pre-handler processing
- //
- qdpn_connector_process(cxtr);
-
- //
- // If the connector has closed, notify the client via callback.
- //
- if (qdpn_connector_closed(cxtr)) {
- if (ctx->opened)
- qd_server->conn_handler(qd_server->conn_handler_context, ctx->context,
- QD_CONN_EVENT_CLOSE,
- (qd_connection_t*) qdpn_connector_context(cxtr));
- ctx->closed = true;
- events = 0;
- break;
- }
-
- invoke_deferred_calls(ctx, false);
-
- qd_connection_t *qd_conn = (qd_connection_t*) qdpn_connector_context(cxtr);
- pn_collector_t *collector = qd_connection_collector(qd_conn);
- pn_event_t *event;
-
- events = 0;
- if (!ctx->event_stall) {
- event = pn_collector_peek(collector);
- while (event) {
- //
- // If we are transitioning to the open state, notify the client via callback.
- //
- if (!ctx->opened && pn_event_type(event) == PN_CONNECTION_REMOTE_OPEN) {
- ctx->opened = true;
- if (ctx->connector) {
- ctx->connector->delay = 2000; // Delay on re-connect in case there is a recurring error
- } else
- assert(ctx->listener);
- events = 1;
- } else if (pn_event_type(event) == PN_TRANSPORT_ERROR) {
- if (ctx->connector) {
- const qd_server_config_t *config = ctx->connector->config;
- qd_log(qd_server->log_source, QD_LOG_TRACE, "Connection to %s:%s failed", config->host, config->port);
- }
- }
-
- events += qd_server->pn_event_handler(qd_server->conn_handler_context, ctx->context, event, qd_conn);
- pn_collector_pop(collector);
-
- event = ctx->event_stall ? 0 : pn_collector_peek(collector);
+void qd_container_handle_event(qd_container_t *container, pn_event_t *event);
+
+static void handle_listener(pn_event_t *e, qd_server_t *qd_server) {
+ qd_log_source_t *log = qd_server->log_source;
+
+ /* FIXME aconway 2017-02-20: HTTP support */
+ qd_listener_t *li = (qd_listener_t*) pn_listener_get_context(pn_event_listener(e));
+ const char *host_port = li->config.host_port;
+ switch (pn_event_type(e)) {
+
+ case PN_LISTENER_OPEN:
+ qd_log(log, QD_LOG_NOTICE, "Listening on %s", host_port);
+ break;
+
+ case PN_LISTENER_ACCEPT:
+ qd_log(log, QD_LOG_INFO, "Accepting connection on %s", host_port);
+ on_accept(e);
+ break;
+
+ case PN_LISTENER_CLOSE: {
+ pn_condition_t *cond = pn_listener_condition(li->pn_listener);
+ if (pn_condition_is_set(cond)) {
+ qd_log(log, QD_LOG_ERROR, "Listener error on %s: %s (%s)", host_port,
+ pn_condition_get_description(cond),
+ pn_condition_get_name(cond));
+ if (li->exit_on_error) {
+ qd_log(log, QD_LOG_CRITICAL, "Shutting down, required listener failed %s",
+ host_port);
+ exit(1);
}
-
- //
- // Free up any links and sessions that need to be freed since all the events have been popped from the collector.
- //
- qd_server->pn_event_complete_handler(qd_server->conn_handler_context, qd_conn);
- events += qd_server->conn_handler(qd_server->conn_handler_context, ctx->context, QD_CONN_EVENT_WRITABLE, qd_conn);
+ } else {
+ qd_log(log, QD_LOG_TRACE, "Listener closed on %s", host_port);
}
- } while (events > 0);
-
- return passes > 1;
+ qd_listener_decref(li);
+ break;
+ }
+ default:
+ break;
+ }
}
-//
-// TEMPORARY FUNCTION PROTOTYPES
-//
-void qdpn_driver_wait_1(qdpn_driver_t *d);
-int qdpn_driver_wait_2(qdpn_driver_t *d, int timeout);
-void qdpn_driver_wait_3(qdpn_driver_t *d);
-//
-// END TEMPORARY
-//
-
-static void *thread_run(void *arg)
+static void qd_connection_free(qd_connection_t *ctx)
{
- qd_thread_t *thread = (qd_thread_t*) arg;
- qd_work_item_t *work;
- qdpn_connector_t *cxtr;
- qd_connection_t *ctx;
- int error;
- int poll_result;
-
- if (!thread)
- return 0;
+ qd_server_t *qd_server = ctx->server;
- qd_server_t *qd_server = thread->qd_server;
- thread_server = qd_server;
- thread->running = 1;
+ qd_entity_cache_remove(QD_CONNECTION_TYPE, ctx); /* Removed management entity */
- if (thread->canceled)
- return 0;
-
- //
- // Main Loop
- //
- while (thread->running) {
- sys_mutex_lock(qd_server->lock);
-
- //
- // Check for pending signals to process
- //
- handle_signals_LH(qd_server);
- if (!thread->running) {
- sys_mutex_unlock(qd_server->lock);
- break;
- }
-
- //
- // Check to see if the server is pausing. If so, block here.
- //
- block_if_paused_LH(qd_server);
- if (!thread->running) {
- sys_mutex_unlock(qd_server->lock);
- break;
- }
-
- //
- // Service pending timers.
- //
- qd_timer_t *timer = DEQ_HEAD(qd_server->pending_timers);
- if (timer) {
- DEQ_REMOVE_HEAD(qd_server->pending_timers);
-
- //
- // Mark the timer as idle in case it reschedules itself.
- //
- qd_timer_idle_LH(timer);
+ // If this is a dispatch connector, schedule the re-connect timer
+ if (ctx->connector) {
+ sys_mutex_lock(ctx->connector->lock);
+ ctx->connector->ctx = 0;
+ ctx->connector->state = CXTR_STATE_CONNECTING;
+ sys_mutex_unlock(ctx->connector->lock);
+ qd_timer_schedule(ctx->connector->timer, ctx->connector->delay);
+ }
- //
- // Release the lock and invoke the connection handler.
- //
- sys_mutex_unlock(qd_server->lock);
- timer->handler(timer->context);
- qdpn_driver_wakeup(qd_server->driver);
- continue;
- }
+ // If counted for policy enforcement, notify it has closed
+ sys_mutex_lock(qd_server->lock);
+ if (ctx->policy_counted) {
+ qd_policy_socket_close(qd_server->qd->policy, ctx);
+ }
+ sys_mutex_unlock(qd_server->lock);
- //
- // Check the work queue for connectors scheduled for processing.
- //
- work = DEQ_HEAD(qd_server->work_queue);
- if (!work) {
- //
- // There is no pending work to do
- //
- if (qd_server->a_thread_is_waiting) {
- //
- // Another thread is waiting on the proton driver, this thread must
- // wait on the condition variable until signaled.
- //
- sys_cond_wait(qd_server->cond, qd_server->lock);
- } else {
- //
- // This thread elects itself to wait on the proton driver. Set the
- // thread-is-waiting flag so other idle threads will not interfere.
- //
- qd_server->a_thread_is_waiting = true;
-
- //
- // Ask the timer module when its next timer is scheduled to fire. We'll
- // use this value in driver_wait as the timeout. If there are no scheduled
- // timers, the returned value will be -1.
- //
- qd_timestamp_t duration = qd_timer_next_duration_LH();
-
- //
- // Invoke the proton driver's wait sequence. This is a bit of a hack for now
- // and will be improved in the future. The wait process is divided into three parts,
- // the first and third of which need to be non-reentrant, and the second of which
- // must be reentrant (and blocks).
- //
- qdpn_driver_wait_1(qd_server->driver);
- sys_mutex_unlock(qd_server->lock);
-
- do {
- error = 0;
- poll_result = qdpn_driver_wait_2(qd_server->driver, duration);
- if (poll_result == -1)
- error = errno;
- } while (error == EINTR);
- if (error) {
- exit(-1);
- }
+ invoke_deferred_calls(ctx, true); // Discard any pending deferred calls
+ if (ctx->deferred_call_lock)
+ sys_mutex_free(ctx->deferred_call_lock);
- sys_mutex_lock(qd_server->lock);
- qdpn_driver_wait_3(qd_server->driver);
+ if (ctx->policy_settings) {
+ if (ctx->policy_settings->sources)
+ free(ctx->policy_settings->sources);
+ if (ctx->policy_settings->targets)
+ free(ctx->policy_settings->targets);
+ free (ctx->policy_settings);
+ ctx->policy_settings = 0;
+ }
- if (!thread->running) {
- sys_mutex_unlock(qd_server->lock);
- break;
- }
+ if (ctx->free_user_id) free((char*)ctx->user_id);
+ free(ctx->role);
+ free_qd_connection_t(ctx);
- //
- // Visit the timer module.
- //
- struct timespec tv;
- clock_gettime(CLOCK_REALTIME, &tv);
- qd_timestamp_t milliseconds = ((qd_timestamp_t)tv.tv_sec) * 1000 + tv.tv_nsec / 1000000;
- qd_timer_visit_LH(milliseconds);
-
- //
- // Process listeners (incoming connections).
- //
- thread_process_listeners_LH(qd_server);
-
- //
- // Traverse the list of connectors-needing-service from the proton driver.
- // If the connector is not already in the work queue and it is not currently
- // being processed by another thread, put it in the work queue and signal the
- // condition variable.
- //
- cxtr = qdpn_driver_connector(qd_server->driver);
- while (cxtr) {
- ctx = qdpn_connector_context(cxtr);
- if (!ctx->enqueued && ctx->owner_thread == CONTEXT_NO_OWNER) {
- ctx->enqueued = 1;
- qd_work_item_t *workitem = new_qd_work_item_t();
- DEQ_ITEM_INIT(workitem);
- workitem->cxtr = cxtr;
- DEQ_INSERT_TAIL(qd_server->work_queue, workitem);
- sys_cond_signal(qd_server->cond);
- }
- cxtr = qdpn_driver_connector(qd_server->driver);
- }
+ /* Note: pn_conn is freed by the proactor */
+}
- //
- // Release our exclusive claim on qdpn_driver_wait.
- //
- qd_server->a_thread_is_waiting = false;
- }
- }
- //
- // If we were given a connector to work on from the work queue, mark it as
- // owned by this thread and as no longer enqueued.
- //
- cxtr = 0;
- if (work) {
- DEQ_REMOVE_HEAD(qd_server->work_queue);
- ctx = qdpn_connector_context(work->cxtr);
- if (ctx->owner_thread == CONTEXT_NO_OWNER) {
- ctx->owner_thread = thread->thread_id;
- ctx->enqueued = 0;
- cxtr = work->cxtr;
- free_qd_work_item_t(work);
- } else {
- //
- // This connector is being processed by another thread, re-queue it.
- //
- DEQ_INSERT_TAIL(qd_server->work_queue, work);
+/* Events involving a connection or listener are serialized by the proactor so
+ * only one event per connection / listener will be processed at a time.
+ */
+static bool handle(pn_event_t *e, qd_server_t *qd_server) {
+ pn_connection_t *pn_conn = pn_event_connection(e);
+ qd_connection_t *ctx = pn_conn ? (qd_connection_t*) pn_connection_get_context(pn_conn) : NULL;
+
+ switch (pn_event_type(e)) {
+
+ case PN_PROACTOR_INTERRUPT:
+ /* Stop the current thread */
+ return false;
+
+ case PN_PROACTOR_TIMEOUT:
+ qd_timer_visit();
+ break;
+
+ case PN_LISTENER_OPEN:
+ case PN_LISTENER_ACCEPT:
+ case PN_LISTENER_CLOSE:
+ handle_listener(e, qd_server);
+ break;
+
+ case PN_CONNECTION_BOUND:
+ on_connection_bound(qd_server, e);
+ break;
+
+ case PN_CONNECTION_REMOTE_OPEN:
+ // If we are transitioning to the open state, notify the client via callback.
+ if (!ctx->opened) {
+ ctx->opened = true;
+ if (ctx->connector) {
+ ctx->connector->delay = 2000; // Delay re-connect in case there is a recurring error
}
}
- sys_mutex_unlock(qd_server->lock);
-
- //
- // Process the connector that we now have exclusive access to.
- //
- if (cxtr) {
- int work_done = 1;
-
- if (qdpn_connector_failed(cxtr))
- qdpn_connector_close(cxtr);
-
- //
- // Even if the connector has failed there are still events that
- // must be processed so that associated links will be cleaned up.
- //
- work_done = process_connector(qd_server, cxtr);
+ break;
- //
- // Check to see if the connector was closed during processing
- //
- if (qdpn_connector_closed(cxtr)) {
- qd_entity_cache_remove(QD_CONNECTION_TYPE, ctx);
- //
- // Connector is closed. Free the context and the connector.
- // If this is a dispatch connector, schedule the re-connect timer
- //
- if (ctx->connector) {
- ctx->connector->ctx = 0;
- ctx->connector->state = CXTR_STATE_CONNECTING;
- qd_timer_schedule(ctx->connector->timer, ctx->connector->delay);
- }
-
- sys_mutex_lock(qd_server->lock);
-
- if (ctx->policy_counted) {
- qd_policy_socket_close(qd_server->qd->policy, ctx);
- }
-
- invoke_deferred_calls(ctx, true); // Discard any pending deferred calls
- sys_mutex_free(ctx->deferred_call_lock);
- qdpn_connector_free(cxtr);
- free_qd_connection(ctx);
- sys_mutex_unlock(qd_server->lock);
- } else {
- //
- // The connector lives on. Mark it as no longer owned by this thread.
- //
- sys_mutex_lock(qd_server->lock);
- ctx->owner_thread = CONTEXT_NO_OWNER;
- sys_mutex_unlock(qd_server->lock);
- }
+ case PN_CONNECTION_WAKE:
+ invoke_deferred_calls(ctx, false);
+ break;
- //
- // Wake up the proton driver to force it to reconsider its set of FDs
- // in light of the processing that just occurred.
- //
- if (work_done)
- qdpn_driver_wakeup(qd_server->driver);
+ case PN_TRANSPORT_ERROR:
+ if (ctx && ctx->connector) { /* Outgoing connection */
+ const qd_server_config_t *config = &ctx->connector->config;
+ qd_log(qd_server->log_source, QD_LOG_TRACE, "Connection to %s failed", config->host_port);
}
- }
-
- return 0;
-}
-
-
-static void thread_start(qd_thread_t *thread)
-{
- if (!thread)
- return;
-
- thread->using_thread = 1;
- thread->thread = sys_thread(thread_run, (void*) thread);
-}
-
-
-static void thread_cancel(qd_thread_t *thread)
-{
- if (!thread)
- return;
-
- thread->running = 0;
- thread->canceled = 1;
-}
+ break;
+ default:
+ break;
+ } // Switch event type
-static void thread_join(qd_thread_t *thread)
-{
- if (!thread)
- return;
+ /* TODO aconway 2017-04-18: fold the container handler into the server */
+ qd_container_handle_event(qd_server->container, e);
- if (thread->using_thread) {
- sys_thread_join(thread->thread);
- sys_thread_free(thread->thread);
+ /* Free the connection after all other processing */
+ if (ctx && pn_event_type(e) == PN_TRANSPORT_CLOSED) {
+ pn_connection_set_context(pn_conn, NULL);
+ qd_connection_free(ctx);
}
+ return true;
}
-
-static void thread_free(qd_thread_t *thread)
+/*
+ * Worker thread body: `arg` is the qd_server_t. Repeatedly waits on the
+ * proactor for a batch of events and passes each one to handle(). When
+ * handle() returns false, the remaining events in the batch are left
+ * unprocessed, the batch is handed back to the proactor, and the thread
+ * returns NULL.
+ */
+static void *thread_run(void *arg)
{
- if (!thread)
- return;
-
- free(thread);
+    qd_server_t *qd_server = (qd_server_t*)arg;
+    for (bool keep_going = true; keep_going; ) {
+        pn_event_batch_t *batch = pn_proactor_wait(qd_server->proactor);
+        pn_event_t *ev;
+        while ((ev = pn_event_batch_next(batch)) != NULL) {
+            if (!handle(ev, qd_server)) {
+                keep_going = false;
+                break;
+            }
+        }
+        /* Always return the batch, even when stopping part-way through it. */
+        pn_proactor_done(qd_server->proactor, batch);
+    }
+    return NULL;
}
-static void cxtr_try_open(void *context)
+/* Timer callback to try/retry connection open */
+static void try_open_lh(qd_connector_t *ct)
{
- qd_connector_t *ct = (qd_connector_t*) context;
- if (ct->state != CXTR_STATE_CONNECTING)
+ if (ct->state != CXTR_STATE_CONNECTING) {
+ /* No longer referenced by pn_connection or timer */
+ qd_connector_decref(ct);
return;
+ }
- qd_connection_t *ctx = connection_allocate();
- ctx->server = ct->server;
- ctx->owner_thread = CONTEXT_UNSPECIFIED_OWNER;
- ctx->pn_conn = pn_connection();
- ctx->collector = pn_collector();
- ctx->connector = ct;
- ctx->context = ct->context;
-
- // Copy the role from the connector config
- int role_length = strlen(ctx->connector->config->role) + 1;
- ctx->role = (char*) malloc(role_length);
- strcpy(ctx->role, ctx->connector->config->role);
-
- qd_log(ct->server->log_source, QD_LOG_INFO, "Connecting to %s:%s", ct->config->host, ct->config->port);
-
- pn_connection_collect(ctx->pn_conn, ctx->collector);
- decorate_connection(ctx->server, ctx->pn_conn, ct->config);
-
- //
- // qdpn_connector is not thread safe
- //
- sys_mutex_lock(ct->server->lock);
- // Increment the connection id so the next connection can use it
- ctx->connection_id = ct->server->next_connection_id++;
- ctx->pn_cxtr = qdpn_connector(ct->server->driver, ct->config->host, ct->config->port, ct->config->protocol_family, (void*) ctx);
- sys_mutex_unlock(ct->server->lock);
-
- const qd_server_config_t *config = ct->config;
-
- if (ctx->pn_cxtr == 0) {
- sys_mutex_free(ctx->deferred_call_lock);
- free_qd_connection(ctx);
+ qd_connection_t *ctx = qd_connection(ct->server, ct->config.role);
+ if (!ctx) { /* Try again later */
+ qd_log(ct->server->log_source, QD_LOG_CRITICAL, "Allocation failure connecting to %s",
+ ct->config.host_port);
ct->delay = 10000;
qd_timer_schedule(ct->timer, ct->delay);
return;
}
+ ctx->connector = ct;
+ decorate_connection(ctx->server, ctx->pn_conn, &ct->config);
+ const qd_server_config_t *config = &ct->config;
//
- // Set the hostname on the pn_connection. This hostname will be used by proton as the hostname in the open frame.
+ // Set the hostname on the pn_connection. This hostname will be used by proton as the
+ // hostname in the open frame.
//
pn_connection_set_hostname(ctx->pn_conn, config->host);
- // Set the sasl user name and password on the proton connection object. This has to be done before the call to qdpn_connector_transport() which
- // binds the transport to the connection
+ // Set the sasl user name and password on the proton connection object. This has to be
+ // done before pn_proactor_connect which will bind a transport to the connection
if(config->sasl_username)
pn_connection_set_user(ctx->pn_conn, config->sasl_username);
if (config->sasl_password)
pn_connection_set_password(ctx->pn_conn, config->sasl_password);
- qdpn_connector_set_connection(ctx->pn_cxtr, ctx->pn_conn);
pn_connection_set_context(ctx->pn_conn, ctx);
ctx->connector->state = CXTR_STATE_OPEN;
-
ct->ctx = ctx;
ct->delay = 5000;
- //
- // Set up the transport, SASL, and SSL for the connection.
- //
- pn_transport_t *tport = qdpn_connector_transport(ctx->pn_cxtr);
-
- //
- // Configure the transport
- //
- pn_transport_set_max_frame(tport, config->max_frame_size);
- pn_transport_set_channel_max(tport, config->max_sessions - 1);
- pn_transport_set_idle_timeout(tport, config->idle_timeout_seconds * 1000);
-
- //
- // Proton pushes out its trace to transport_tracer() which in turn writes a trace message to the qdrouter log
- //
- // If trace level logging is enabled on the router set PN_TRACE_DRV | PN_TRACE_FRM | PN_TRACE_RAW on the proton transport
- pn_transport_set_context(tport, ctx);
- if (qd_log_enabled(ct->server->log_source, QD_LOG_TRACE)) {
- pn_transport_trace(tport, PN_TRACE_FRM);
- pn_transport_set_tracer(tport, transport_tracer);
- }
-
- ctx->owner_thread = CONTEXT_NO_OWNER;
+ qd_log(ct->server->log_source, QD_LOG_TRACE,
+ "[%"PRIu64"] Connecting to %s", ctx->connection_id, config->host_port);
+ /* Note: the transport is configured in the PN_CONNECTION_BOUND event */
+ pn_proactor_connect(ct->server->proactor, ctx->pn_conn, config->host_port);
}
-
static void setup_ssl_sasl_and_open(qd_connection_t *ctx)
{
qd_connector_t *ct = ctx->connector;
- const qd_server_config_t *config = ct->config;
- pn_transport_t *tport = qdpn_connector_transport(ctx->pn_cxtr);
+ const qd_server_config_t *config = &ct->config;
+ pn_transport_t *tport = pn_connection_transport(ctx->pn_conn);
//
// Set up SSL if appropriate
@@ -1242,19 +886,16 @@ static void setup_ssl_sasl_and_open(qd_connection_t *ctx)
if (!domain) {
qd_error(QD_ERROR_RUNTIME, "SSL domain failed for connection to %s:%s",
- ct->config->host, ct->config->port);
- /* TODO aconway 2014-07-15: Close the connection, clean up. */
+ ct->config.host, ct->config.port);
return;
}
- /* TODO aconway 2014-07-15: error handling on all SSL calls. */
-
// set our trusted database for checking the peer's cert:
if (config->ssl_trusted_certificate_db) {
if (pn_ssl_domain_set_trusted_ca_db(domain, config->ssl_trusted_certificate_db)) {
qd_log(ct->server->log_source, QD_LOG_ERROR,
"SSL CA configuration failed for %s:%s",
- ct->config->host, ct->config->port);
+ ct->config.host, ct->config.port);
}
}
// should we force the peer to provide a cert?
@@ -1267,7 +908,7 @@ static void setup_ssl_sasl_and_open(qd_connection_t *ctx)
trusted)) {
qd_log(ct->server->log_source, QD_LOG_ERROR,
"SSL peer auth configuration failed for %s:%s",
- ct->config->host, ct->config->port);
+ config->host, config->port);
}
}
@@ -1279,7 +920,7 @@ static void setup_ssl_sasl_and_open(qd_connection_t *ctx)
config->ssl_password)) {
qd_log(ct->server->log_source, QD_LOG_ERROR,
"SSL local configuration failed for %s:%s",
- ct->config->host, ct->config->port);
+ config->host, config->port);
}
}
@@ -1288,7 +929,7 @@ static void setup_ssl_sasl_and_open(qd_connection_t *ctx)
if (pn_ssl_domain_set_peer_authentication(domain, PN_SSL_VERIFY_PEER_NAME, NULL)) {
qd_log(ct->server->log_source, QD_LOG_ERROR,
"SSL peer host name verification failed for %s:%s",
- ct->config->host, ct->config->port);
+ config->host, config->port);
}
}
@@ -1310,58 +951,46 @@ static void setup_ssl_sasl_and_open(qd_connection_t *ctx)
pn_connection_open(ctx->pn_conn);
}
-
-static void heartbeat_cb(void *context)
-{
- qd_server_t *qd_server = (qd_server_t*) context;
- qdpn_activate_all(qd_server->driver);
- qd_timer_schedule(qd_server->heartbeat_timer, HEARTBEAT_INTERVAL);
+static void try_open_cb(void *context) {
+ qd_connector_t *ct = (qd_connector_t*) context;
+ sys_mutex_lock(ct->lock);
+ try_open_lh(ct);
+ sys_mutex_unlock(ct->lock);
}
qd_server_t *qd_server(qd_dispatch_t *qd, int thread_count, const char *container_name,
const char *sasl_config_path, const char *sasl_config_name)
{
- int i;
-
+ /* Initialize const members, 0 initialize all others. */
+ qd_server_t tmp = { .thread_count = thread_count };
qd_server_t *qd_server = NEW(qd_server_t);
if (qd_server == 0)
return 0;
+ memcpy(qd_server, &tmp, sizeof(tmp));
qd_server->qd = qd;
qd_server->log_source = qd_log_source("SERVER");
- qd_server->thread_count = thread_count;
qd_server->container_name = container_name;
qd_server->sasl_config_path = sasl_config_path;
qd_server->sasl_config_name = sasl_config_name;
- qd_server->driver = qdpn_driver(qd_server->log_source);
- qd_server->conn_handler = 0;
- qd_server->pn_event_handler = 0;
- qd_server->signal_handler = 0;
+ qd_server->proactor = pn_proactor();
+ qd_server->container = 0;
qd_server->start_context = 0;
- qd_server->signal_context = 0;
qd_server->lock = sys_mutex();
qd_server->cond = sys_cond();
qd_timer_initialize(qd_server->lock);
- qd_server->threads = NEW_PTR_ARRAY(qd_thread_t, thread_count);
- for (i = 0; i < thread_count; i++)
- qd_server->threads[i] = thread(qd_server, i);
-
- DEQ_INIT(qd_server->work_queue);
- DEQ_INIT(qd_server->pending_timers);
- qd_server->a_thread_is_waiting = false;
qd_server->pause_requests = 0;
qd_server->threads_paused = 0;
qd_server->pause_next_sequence = 0;
qd_server->pause_now_serving = 0;
- qd_server->pending_signal = 0;
- qd_server->signal_handler_running = false;
- qd_server->heartbeat_timer = 0;
qd_server->next_connection_id = 1;
qd_server->py_displayname_obj = 0;
- qd_server->http = qd_http_server(qd, qd_server->log_source);
+
+ /* FIXME aconway 2017-01-20: restore HTTP support */
+
qd_log(qd_server->log_source, QD_LOG_INFO, "Container Name: %s", qd_server->container_name);
return qd_server;
@@ -1371,191 +1000,64 @@ qd_server_t *qd_server(qd_dispatch_t *qd, int thread_count, const char *containe
void qd_server_free(qd_server_t *qd_server)
{
if (!qd_server) return;
- for (int i = 0; i < qd_server->thread_count; i++)
- thread_free(qd_server->threads[i]);
qd_http_server_free(qd_server->http);
qd_timer_finalize();
- qdpn_driver_free(qd_server->driver);
+ pn_proactor_free(qd_server->proactor);
sys_mutex_free(qd_server->lock);
sys_cond_free(qd_server->cond);
- free(qd_server->threads);
Py_XDECREF((PyObject *)qd_server->py_displayname_obj);
free(qd_server);
}
-
-void qd_server_set_conn_handler(qd_dispatch_t *qd,
- qd_conn_handler_cb_t handler,
- qd_pn_event_handler_cb_t pn_event_handler,
- qd_pn_event_complete_cb_t pn_event_complete_handler,
- void *handler_context)
-{
- qd->server->conn_handler = handler;
- qd->server->pn_event_handler = pn_event_handler;
- qd->server->pn_event_complete_handler = pn_event_complete_handler;
- qd->server->conn_handler_context = handler_context;
-}
-
-
-void qd_server_set_signal_handler(qd_dispatch_t *qd, qd_signal_handler_cb_t handler, void *context)
+void qd_server_set_container(qd_dispatch_t *qd, qd_container_t *container)
{
- qd->server->signal_handler = handler;
- qd->server->signal_context = context;
-}
-
-
-static void qd_server_announce(qd_server_t* qd_server)
-{
- qd_log(qd_server->log_source, QD_LOG_INFO, "Operational, %d Threads Running", qd_server->thread_count);
-#ifndef NDEBUG
- qd_log(qd_server->log_source, QD_LOG_INFO, "Running in DEBUG Mode");
-#endif
+ qd->server->container = container;
}
void qd_server_run(qd_dispatch_t *qd)
{
qd_server_t *qd_server = qd->server;
-
- int i;
- if (!qd_server)
- return;
-
- assert(qd_server->conn_handler); // Server can't run without a connection handler.
-
- for (i = 1; i < qd_server->thread_count; i++)
- thread_start(qd_server->threads[i]);
-
- qd_server->heartbeat_timer = qd_timer(qd, heartbeat_cb, qd_server);
- qd_timer_schedule(qd_server->heartbeat_timer, HEARTBEAT_INTERVAL);
-
- qd_server_announce(qd_server);
-
- thread_run((void*) qd_server->threads[0]);
-
- for (i = 1; i < qd_server->thread_count; i++)
- thread_join(qd_server->threads[i]);
-
- for (i = 0; i < qd_server->thread_count; i++)
- qd_server->threads[i]->canceled = 0;
-
- qd_log(qd_server->log_source, QD_LOG_INFO, "Shut Down");
-}
-
-
-void qd_server_start(qd_dispatch_t *qd)
-{
- qd_server_t *qd_server = qd->server;
int i;
+ assert(qd_server);
+ assert(qd_server->container); // Server can't run without a container
+ qd_log(qd_server->log_source, QD_LOG_NOTICE, "Operational, %d Threads Running",
+ qd_server->thread_count);
+#ifndef NDEBUG
+ qd_log(qd_server->log_source, QD_LOG_INFO, "Running in DEBUG Mode");
+#endif
+ int n = qd_server->thread_count - 1; /* Start count-1 threads + use current thread */
+ sys_thread_t **threads = (sys_thread_t **)calloc(n, sizeof(sys_thread_t*));
+ for (i = 0; i < n; i++) {
+ threads[i] = sys_thread(thread_run, qd_server);
+ }
+ thread_run(qd_server); /* Use the current thread */
+ for (i = 0; i < n; i++) {
+ sys_thread_join(threads[i]);
+ sys_thread_free(threads[i]);
+ }
+ free(threads);
- if (!qd_server)
- return;
-
- assert(qd_server->conn_handler); // Server can't run without a connection handler.
-
- for (i = 0; i < qd_server->thread_count; i++)
- thread_start(qd_server->threads[i]);
-
- qd_server->heartbeat_timer = qd_timer(qd, heartbeat_cb, qd_server);
- qd_timer_schedule(qd_server->heartbeat_timer, HEARTBEAT_INTERVAL);
-
- qd_server_announce(qd_server);
+ qd_log(qd_server->log_source, QD_LOG_NOTICE, "Shut Down");
}
void qd_server_stop(qd_dispatch_t *qd)
{
- qd_server_t *qd_server = qd->server;
- int idx;
-
- sys_mutex_lock(qd_server->lock);
- for (idx = 0; idx < qd_server->thread_count; idx++)
- thread_cancel(qd_server->threads[idx]);
- sys_cond_signal_all(qd_server->cond);
- qdpn_driver_wakeup(qd_server->driver);
- sys_mutex_unlock(qd_server->lock);
-
- if (thread_server != qd_server) {
- for (idx = 0; idx < qd_server->thread_count; idx++)
- thread_join(qd_server->threads[idx]);
- qd_log(qd_server->log_source, QD_LOG_INFO, "Shut Down");
+ /* Disconnect everything, interrupt threads */
+ pn_proactor_disconnect(qd->server->proactor, NULL);
+ for (int i = 0; i < qd->server->thread_count; i++) {
+ pn_proactor_interrupt(qd->server->proactor);
}
}
-
-void qd_server_signal(qd_dispatch_t *qd, int signum)
+void qd_server_activate(qd_connection_t *ctx)
{
- if (!qd)
+ if (!ctx || !ctx->pn_conn)
return;
-
- qd_server_t *qd_server = qd->server;
-
- qd_server->pending_signal = signum;
- sys_cond_signal_all(qd_server->cond);
- qdpn_driver_wakeup(qd_server->driver);
-}
-
-
-void qd_server_pause(qd_dispatch_t *qd)
-{
- qd_server_t *qd_server = qd->server;
-
- sys_mutex_lock(qd_server->lock);
-
- //
- // Bump the request count to stop all the threads.
- //
- qd_server->pause_requests++;
- int my_sequence = qd_server->pause_next_sequence++;
-
- //
- // Awaken all threads that are currently blocking.
- //
- sys_cond_signal_all(qd_server->cond);
- qdpn_driver_wakeup(qd_server->driver);
-
- //
- // Wait for the paused thread count plus the number of threads requesting a pause to equal
- // the total thread count. Also, don't exit the blocking loop until now_serving equals our
- // sequence number. This ensures that concurrent pausers don't run at the same time.
- //
- while ((qd_server->threads_paused + qd_server->pause_requests < qd_server->thread_count) ||
- (my_sequence != qd_server->pause_now_serving))
- sys_cond_wait(qd_server->cond, qd_server->lock);
-
- sys_mutex_unlock(qd_server->lock);
-}
-
-
-void qd_server_resume(qd_dispatch_t *qd)
-{
- qd_server_t *qd_server = qd->server;
-
- sys_mutex_lock(qd_server->lock);
- qd_server->pause_requests--;
- qd_server->pause_now_serving++;
- sys_cond_signal_all(qd_server->cond);
- sys_mutex_unlock(qd_server->lock);
+ pn_connection_wake(ctx->pn_conn);
}
-
-void qd_server_activate(qd_connection_t *ctx, bool awaken)
-{
- if (!ctx)
- return;
-
- qdpn_connector_t *ctor = ctx->pn_cxtr;
- if (!ctor)
- return;
-
- if (!qdpn_connector_closed(ctor)) {
- qdpn_connector_activate(ctor, QDPN_CONNECTOR_WRITABLE);
- if (awaken)
- qdpn_driver_wakeup(ctx->server->driver);
- }
-}
-
-
void qd_connection_set_context(qd_connection_t *conn, void *context)
{
conn->user_context = context;
@@ -1598,11 +1100,6 @@ bool qd_connection_inbound(qd_connection_t *conn)
}
-pn_collector_t *qd_connection_collector(qd_connection_t *conn)
-{
- return conn->collector;
-}
-
uint64_t qd_connection_connection_id(qd_connection_t *conn)
{
return conn->connection_id;
@@ -1612,8 +1109,8 @@ uint64_t qd_connection_connection_id(qd_connection_t *conn)
const qd_server_config_t *qd_connection_config(const qd_connection_t *conn)
{
if (conn->listener)
- return conn->listener->config;
- return conn->connector->config;
+ return &conn->listener->config;
+ return &conn->connector->config;
}
@@ -1628,131 +1125,106 @@ void qd_connection_invoke_deferred(qd_connection_t *conn, qd_deferred_t call, vo
DEQ_INSERT_TAIL(conn->deferred_calls, dc);
sys_mutex_unlock(conn->deferred_call_lock);
- qd_server_activate(conn, true);
+ qd_server_activate(conn);
}
-void qd_connection_set_event_stall(qd_connection_t *conn, bool stall)
+qd_listener_t *qd_server_listener(qd_server_t *server)
{
- conn->event_stall = stall;
- if (!stall)
- qd_server_activate(conn, true);
-}
+ qd_listener_t *li = new_qd_listener_t();
+ if (!li) return 0;
+ ZERO(li);
-qd_listener_t *qd_server_listen(qd_dispatch_t *qd, const qd_server_config_t *config, void *context)
-{
- qd_server_t *qd_server = qd->server;
- qd_listener_t *li = new_qd_listener_t();
-
- if (!li)
- return 0;
-
- li->server = qd_server;
- li->config = config;
- li->context = context;
+ sys_atomic_init(&li->ref_count, 1);
+ li->server = server;
li->http = NULL;
-
- if (config->http) {
- li->http = qd_http_listener(qd_server->http, config);
- if (!li->http) {
- free_qd_listener_t(li);
- qd_log(qd_server->log_source, QD_LOG_ERROR, "Cannot start HTTP listener on %s:%s",
- config->host, config->port);
- return NULL;
- }
- }
-
- li->pn_listener = qdpn_listener(
- qd_server->driver, config->host, config->port, config->protocol_family, li);
-
- if (!li->pn_listener) {
- free_qd_listener_t(li);
- qd_log(qd_server->log_source, QD_LOG_ERROR, "Cannot start listener on %s:%s",
- config->host, config->port);
- return NULL;
- }
- qd_log(qd_server->log_source, QD_LOG_TRACE, "Listening on %s:%s%s", config->host, config->port,
- config->http ? (config->ssl_profile ? "(HTTPS)":"(HTTP)") : "");
-
return li;
}
-void qd_server_listener_free(qd_listener_t* li)
-{
- if (!li)
- return;
- if (li->http) qd_http_listener_free(li->http);
- qdpn_listener_free(li->pn_listener);
- free_qd_listener_t(li);
+bool qd_listener_listen(qd_listener_t *li) {
+ if (!li->pn_listener) { /* Not already listening */
+ li->pn_listener = pn_listener();
+ if (!li->pn_listener) {
+ qd_log(li->server->log_source, QD_LOG_ERROR, "No memory listening on %s",
+ li->config.host_port);
+ return false;
+ }
+ pn_listener_set_context(li->pn_listener, li);
+ /* Listen is asynchronous, log listening on PN_LISTENER_OPEN */
+ sys_atomic_inc(&li->ref_count);
+ pn_proactor_listen(li->server->proactor, li->pn_listener, li->config.host_port, BACKLOG);
+ }
+ return true;
}
-void qd_server_listener_close(qd_listener_t* li)
+void qd_listener_decref(qd_listener_t* li)
{
- if (li)
- qdpn_listener_close(li->pn_listener);
+ if (li && sys_atomic_dec(&li->ref_count) == 1) {
+ qd_server_config_free(&li->config);
+ if (li->http) qd_http_listener_free(li->http);
+ free_qd_listener_t(li);
+ }
}
-qd_connector_t *qd_server_connect(qd_dispatch_t *qd, const qd_server_config_t *config, void *context)
+qd_connector_t *qd_server_connector(qd_server_t *server)
{
- qd_server_t *qd_server = qd->server;
qd_connector_t *ct = new_qd_connector_t();
-
- if (!ct)
+ if (!ct) return 0;
+ sys_atomic_init(&ct->ref_count, 1);
+ ct->server = server;
+ ct->lock = sys_mutex();
+ ct->timer = qd_timer(ct->server->qd, try_open_cb, ct);
+ if (!ct->lock || !ct->timer) {
+ qd_connector_decref(ct);
return 0;
+ }
+ return ct;
+}
- ct->server = qd_server;
+
+bool qd_connector_connect(qd_connector_t *ct)
+{
+ sys_mutex_lock(ct->lock);
ct->state = CXTR_STATE_CONNECTING;
- ct->config = config;
- ct->context = context;
ct->ctx = 0;
- ct->timer = qd_timer(qd, cxtr_try_open, (void*) ct);
ct->delay = 0;
-
+ /* Referenced by timer */
+ sys_atomic_inc(&ct->ref_count);
qd_timer_schedule(ct->timer, ct->delay);
- return ct;
+ sys_mutex_unlock(ct->lock);
+ return true;
}
-void qd_server_connector_free(qd_connector_t* ct)
+void qd_connector_decref(qd_connector_t* ct)
{
- // Don't free the proton connector. This will be done by the connector
- // processing/cleanup.
-
- if (!ct)
- return;
-
- if (ct->ctx) {
- qdpn_connector_close(ct->ctx->pn_cxtr);
- ct->ctx->connector = 0;
+ if (ct && sys_atomic_dec(&ct->ref_count) == 1) {
+ sys_mutex_lock(ct->lock);
+ if (ct->ctx) {
+ ct->ctx->connector = 0;
+ }
+ sys_mutex_unlock(ct->lock);
+ qd_server_config_free(&ct->config);
+ qd_timer_free(ct->timer);
+ free_qd_connector_t(ct);
}
-
- qd_timer_free(ct->timer);
- free_qd_connector_t(ct);
}
-void qd_server_timer_pending_LH(qd_timer_t *timer)
-{
- DEQ_INSERT_TAIL(timer->server->pending_timers, timer);
- qdpn_driver_wakeup(timer->server->driver);
-}
-
-
-void qd_server_timer_cancel_LH(qd_timer_t *timer)
-{
- DEQ_REMOVE(timer->server->pending_timers, timer);
+void qd_server_timeout(qd_server_t *server, qd_duration_t duration) {
+ pn_proactor_set_timeout(server->proactor, duration);
}
qd_dispatch_t* qd_server_dispatch(qd_server_t *server) { return server->qd; }
const char* qd_connection_name(const qd_connection_t *c) {
- return qdpn_connector_name(c->pn_cxtr);
-}
-
-const char* qd_connection_hostip(const qd_connection_t *c) {
- return qdpn_connector_hostip(c->pn_cxtr);
+ if (c->connector) {
+ return c->connector->config.host_port;
+ } else {
+ return c->rhost_port;
+ }
}
qd_connector_t* qd_connection_connector(const qd_connection_t *c) {
@@ -1760,9 +1232,13 @@ qd_connector_t* qd_connection_connector(const qd_connection_t *c) {
}
const qd_server_config_t *qd_connector_config(const qd_connector_t *c) {
- return c->config;
+ return &c->config;
}
-qd_http_listener_t *qd_listener_http(qd_listener_t *l) {
- return l->http;
+qd_http_listener_t *qd_listener_http(qd_listener_t *li) {
+ return li->http;
+}
+
+const char* qd_connection_hostip(const qd_connection_t *c) {
+ return c->rhost;
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/src/server_private.h
----------------------------------------------------------------------
diff --git a/src/server_private.h b/src/server_private.h
index 9581196..a6543fa 100644
--- a/src/server_private.h
+++ b/src/server_private.h
@@ -19,34 +19,39 @@
* under the License.
*/
+#include <qpid/dispatch/atomic.h>
#include <qpid/dispatch/enum.h>
#include <qpid/dispatch/server.h>
+#include <qpid/dispatch/threading.h>
#include "alloc.h"
#include <qpid/dispatch/ctools.h>
#include <qpid/dispatch/log.h>
-#include <qpid/dispatch/driver.h>
#include <proton/engine.h>
#include <proton/event.h>
+#include <proton/ssl.h>
#include "dispatch_private.h"
#include "timer_private.h"
#include "http.h"
-void qd_server_timer_pending_LH(qd_timer_t *timer);
-void qd_server_timer_cancel_LH(qd_timer_t *timer);
+#include <netdb.h> /* For NI_MAXHOST/NI_MAXSERV */
-struct qd_dispatch_t* qd_server_dispatch(qd_server_t *server);
+qd_dispatch_t* qd_server_dispatch(qd_server_t *server);
+void qd_server_timeout(qd_server_t *server, qd_duration_t delay);
const char* qd_connection_name(const qd_connection_t *c);
-const char* qd_connection_hostip(const qd_connection_t *c);
qd_connector_t* qd_connection_connector(const qd_connection_t *c);
+const char* qd_connection_hostip(const qd_connection_t *c);
const qd_server_config_t *qd_connector_config(const qd_connector_t *c);
qd_http_listener_t *qd_listener_http(qd_listener_t *l);
-#define CONTEXT_NO_OWNER -1
-#define CONTEXT_UNSPECIFIED_OWNER -2
+qd_listener_t *qd_server_listener(qd_server_t *server);
+qd_connector_t *qd_server_connector(qd_server_t *server);
+void qd_connector_decref(qd_connector_t* ct);
+void qd_listener_decref(qd_listener_t* ct);
+void qd_server_config_free(qd_server_config_t *cf);
typedef enum {
CXTR_STATE_CONNECTING = 0,
@@ -55,8 +60,6 @@ typedef enum {
} cxtr_state_t;
-
-
typedef struct qd_deferred_call_t {
DEQ_LINKS(struct qd_deferred_call_t);
qd_deferred_t call;
@@ -73,19 +76,63 @@ typedef struct qd_pn_free_link_session_t {
DEQ_DECLARE(qd_pn_free_link_session_t, qd_pn_free_link_session_list_t);
+#ifndef NI_MAXHOST
+# define NI_MAXHOST 1025
+#endif
+
+#ifndef NI_MAXSERV
+# define NI_MAXSERV 32
+#endif
+
+/**
+ * Listener objects represent the desire to accept incoming transport connections.
+ */
+struct qd_listener_t {
+ /* May be referenced by connection_manager and pn_listener_t */
+ sys_atomic_t ref_count;
+ qd_server_t *server;
+ qd_server_config_t config;
+ pn_listener_t *pn_listener;
+ qd_http_listener_t *http;
+ DEQ_LINKS(qd_listener_t);
+ bool exit_on_error;
+};
+
+DEQ_DECLARE(qd_listener_t, qd_listener_list_t);
+
+
+/**
+ * Connector objects represent the desire to create and maintain an outgoing transport connection.
+ */
+struct qd_connector_t {
+ /* May be referenced by connection_manager, timer and pn_connection_t */
+ sys_atomic_t ref_count;
+ qd_server_t *server;
+ qd_server_config_t config;
+ qd_timer_t *timer;
+ long delay;
+
+ /* Connector state and ctx can be modified in proactor or management threads. */
+ sys_mutex_t *lock;
+ cxtr_state_t state;
+ qd_connection_t *ctx;
+ DEQ_LINKS(qd_connector_t);
+};
+
+DEQ_DECLARE(qd_connector_t, qd_connector_list_t);
+
+
/**
* Connection objects wrap Proton connection objects.
*/
struct qd_connection_t {
DEQ_LINKS(qd_connection_t);
+ char *name;
qd_server_t *server;
bool opened; // An open callback was invoked for this connection
bool closed;
- int owner_thread;
int enqueued;
- qdpn_connector_t *pn_cxtr;
pn_connection_t *pn_conn;
- pn_collector_t *collector;
pn_ssl_t *ssl;
qd_listener_t *listener;
qd_connector_t *connector;
@@ -102,10 +149,11 @@ struct qd_connection_t {
void *open_container;
qd_deferred_call_list_t deferred_calls;
sys_mutex_t *deferred_call_lock;
- bool event_stall;
bool policy_counted;
char *role; //The specified role of the connection, e.g. "normal", "inter-router", "route-container" etc.
qd_pn_free_link_session_list_t free_link_session_list;
+ char rhost[NI_MAXHOST]; /* Remote host numeric IP for incoming connections */
+ char rhost_port[NI_MAXHOST+NI_MAXSERV]; /* Remote host:port for incoming connections */
};
DEQ_DECLARE(qd_connection_t, qd_connection_list_t);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/src/timer.c
----------------------------------------------------------------------
diff --git a/src/timer.c b/src/timer.c
index b2121ae..988c086 100644
--- a/src/timer.c
+++ b/src/timer.c
@@ -25,11 +25,15 @@
#include "alloc.h"
#include <assert.h>
#include <stdio.h>
+#include <time.h>
-static sys_mutex_t *lock;
-static qd_timer_list_t idle_timers;
-static qd_timer_list_t scheduled_timers;
-static qd_timestamp_t time_base;
+static sys_mutex_t *lock = NULL;
+static qd_timer_list_t idle_timers = {0};
+static qd_timer_list_t scheduled_timers = {0};
+/* Timers have relative delta_time measured from the previous timer.
+ * The delta_time of the first timer on the queue is measured from time_base.
+ */
+static qd_timestamp_t time_base = 0;
ALLOC_DECLARE(qd_timer_t);
ALLOC_DEFINE(qd_timer_t);
@@ -41,30 +45,35 @@ sys_mutex_t* qd_timer_lock() { return lock; }
// Private static functions
//=========================================================================
-static void qd_timer_cancel_LH(qd_timer_t *timer)
+static void timer_cancel_LH(qd_timer_t *timer)
{
- switch (timer->state) {
- case TIMER_FREE:
- assert(0);
- break;
-
- case TIMER_IDLE:
- break;
-
- case TIMER_SCHEDULED:
+ if (timer->scheduled) {
if (timer->next)
timer->next->delta_time += timer->delta_time;
DEQ_REMOVE(scheduled_timers, timer);
DEQ_INSERT_TAIL(idle_timers, timer);
- break;
-
- case TIMER_PENDING:
- qd_server_timer_cancel_LH(timer);
- DEQ_INSERT_TAIL(idle_timers, timer);
- break;
+ timer->scheduled = false;
}
+}
- timer->state = TIMER_IDLE;
+/* Adjust timer's time_base and delays for the current time. */
+static void timer_adjust_now_LH()
+{
+ qd_timestamp_t now = qd_timer_now();
+ if (time_base != 0 && now > time_base) {
+ qd_duration_t delta = now - time_base;
+ /* Adjust timer delays by removing duration delta, starting from the head of scheduled_timers. */
+ for (qd_timer_t *timer = DEQ_HEAD(scheduled_timers); delta > 0 && timer; timer = DEQ_NEXT(timer)) {
+ if (timer->delta_time >= delta) {
+ timer->delta_time -= delta;
+ delta = 0;
+ } else {
+ delta -= timer->delta_time;
+ timer->delta_time = 0; /* Ready to fire */
+ }
+ }
+ }
+ time_base = now;
}
@@ -72,6 +81,7 @@ static void qd_timer_cancel_LH(qd_timer_t *timer)
// Public Functions from timer.h
//=========================================================================
+
qd_timer_t *qd_timer(qd_dispatch_t *qd, qd_timer_cb_t cb, void* context)
{
qd_timer_t *timer = new_qd_timer_t();
@@ -84,8 +94,7 @@ qd_timer_t *qd_timer(qd_dispatch_t *qd, qd_timer_cb_t cb, void* context)
timer->handler = cb;
timer->context = context;
timer->delta_time = 0;
- timer->state = TIMER_IDLE;
-
+ timer->scheduled = false;
sys_mutex_lock(lock);
DEQ_INSERT_TAIL(idle_timers, timer);
sys_mutex_unlock(lock);
@@ -98,73 +107,55 @@ void qd_timer_free(qd_timer_t *timer)
{
if (!timer) return;
sys_mutex_lock(lock);
- qd_timer_cancel_LH(timer);
+ timer_cancel_LH(timer);
DEQ_REMOVE(idle_timers, timer);
sys_mutex_unlock(lock);
-
- timer->state = TIMER_FREE;
free_qd_timer_t(timer);
}
-void qd_timer_schedule(qd_timer_t *timer, qd_timestamp_t duration)
-{
- qd_timer_t *ptr;
- qd_timer_t *last;
- qd_timestamp_t total_time;
+qd_timestamp_t qd_timer_now() {
+ struct timespec tv;
+ clock_gettime(CLOCK_REALTIME, &tv);
+ return ((qd_timestamp_t)tv.tv_sec) * 1000 + tv.tv_nsec / 1000000;
+}
+
+void qd_timer_schedule(qd_timer_t *timer, qd_duration_t duration)
+{
sys_mutex_lock(lock);
- qd_timer_cancel_LH(timer); // Timer is now on the idle list
- assert(timer->state == TIMER_IDLE);
+ timer_cancel_LH(timer); // Timer is now on the idle list
DEQ_REMOVE(idle_timers, timer);
//
- // Handle the special case of a zero-time scheduling. In this case,
- // the timer doesn't go on the scheduled list. It goes straight to the
- // pending list in the server.
- //
- if (duration == 0) {
- timer->state = TIMER_PENDING;
- qd_server_timer_pending_LH(timer);
- sys_mutex_unlock(lock);
- return;
- }
-
- //
// Find the insert point in the schedule.
//
- total_time = 0;
- ptr = DEQ_HEAD(scheduled_timers);
- assert(!ptr || ptr->prev == 0);
- while (ptr) {
- total_time += ptr->delta_time;
- if (total_time > duration)
- break;
+ timer_adjust_now_LH(); /* Adjust the timers for current time */
+
+ /* Invariant: time_before == total time up to but not including ptr */
+ qd_timer_t *ptr = DEQ_HEAD(scheduled_timers);
+ qd_duration_t time_before = 0;
+ while (ptr && time_before + ptr->delta_time < duration) {
+ time_before += ptr->delta_time;
ptr = ptr->next;
}
-
- //
- // Insert the timer into the schedule and adjust the delta time
- // of the following timer if present.
- //
- if (total_time <= duration) {
- assert(ptr == 0);
- timer->delta_time = duration - total_time;
+ /* ptr is the first timer expiring at or after duration, or NULL if we ran out */
+ if (!ptr) {
+ timer->delta_time = duration - time_before;
DEQ_INSERT_TAIL(scheduled_timers, timer);
} else {
- total_time -= ptr->delta_time;
- timer->delta_time = duration - total_time;
- assert(ptr->delta_time > timer->delta_time);
+ timer->delta_time = duration - time_before;
ptr->delta_time -= timer->delta_time;
- last = ptr->prev;
- if (last)
- DEQ_INSERT_AFTER(scheduled_timers, timer, last);
+ ptr = ptr->prev;
+ if (ptr)
+ DEQ_INSERT_AFTER(scheduled_timers, timer, ptr);
else
DEQ_INSERT_HEAD(scheduled_timers, timer);
}
+ timer->scheduled = true;
- timer->state = TIMER_SCHEDULED;
-
+ qd_timer_t *first = DEQ_HEAD(scheduled_timers);
+ qd_server_timeout(first->server, first->delta_time);
sys_mutex_unlock(lock);
}
@@ -172,7 +163,7 @@ void qd_timer_schedule(qd_timer_t *timer, qd_timestamp_t duration)
void qd_timer_cancel(qd_timer_t *timer)
{
sys_mutex_lock(lock);
- qd_timer_cancel_LH(timer);
+ timer_cancel_LH(timer);
sys_mutex_unlock(lock);
}
@@ -181,6 +172,7 @@ void qd_timer_cancel(qd_timer_t *timer)
// Private Functions from timer_private.h
//=========================================================================
+
void qd_timer_initialize(sys_mutex_t *server_lock)
{
lock = server_lock;
@@ -196,47 +188,22 @@ void qd_timer_finalize(void)
}
-qd_timestamp_t qd_timer_next_duration_LH(void)
+/* Execute all timers that are ready and set up next timeout. */
+void qd_timer_visit()
{
+ sys_mutex_lock(lock);
+ timer_adjust_now_LH();
qd_timer_t *timer = DEQ_HEAD(scheduled_timers);
- if (timer)
- return timer->delta_time;
- return -1;
-}
-
-
-void qd_timer_visit_LH(qd_timestamp_t current_time)
-{
- qd_timestamp_t delta;
- qd_timer_t *timer = DEQ_HEAD(scheduled_timers);
-
- if (time_base == 0) {
- time_base = current_time;
- return;
- }
-
- delta = current_time - time_base;
- time_base = current_time;
-
- while (timer) {
- assert(delta >= 0);
- if (timer->delta_time > delta) {
- timer->delta_time -= delta;
- break;
- } else {
- DEQ_REMOVE_HEAD(scheduled_timers);
- delta -= timer->delta_time;
- timer->state = TIMER_PENDING;
- qd_server_timer_pending_LH(timer);
-
- }
+ while (timer && timer->delta_time == 0) {
+ timer_cancel_LH(timer); /* Removes timer from scheduled_timers */
+ sys_mutex_unlock(lock);
+ timer->handler(timer->context); /* Call the handler outside the lock, may re-schedule */
+ sys_mutex_lock(lock);
timer = DEQ_HEAD(scheduled_timers);
}
-}
-
-
-void qd_timer_idle_LH(qd_timer_t *timer)
-{
- timer->state = TIMER_IDLE;
- DEQ_INSERT_TAIL(idle_timers, timer);
+ qd_timer_t *first = DEQ_HEAD(scheduled_timers);
+ if (first) {
+ qd_server_timeout(first->server, first->delta_time);
+ }
+ sys_mutex_unlock(lock);
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/src/timer_private.h
----------------------------------------------------------------------
diff --git a/src/timer_private.h b/src/timer_private.h
index 4acd988..537eb4b 100644
--- a/src/timer_private.h
+++ b/src/timer_private.h
@@ -23,30 +23,20 @@
#include <qpid/dispatch/timer.h>
#include <qpid/dispatch/threading.h>
-typedef enum {
- TIMER_FREE,
- TIMER_IDLE,
- TIMER_SCHEDULED,
- TIMER_PENDING
-} qd_timer_state_t;
-
-
struct qd_timer_t {
DEQ_LINKS(qd_timer_t);
qd_server_t *server;
qd_timer_cb_t handler;
void *context;
qd_timestamp_t delta_time;
- qd_timer_state_t state;
+ bool scheduled; /* true means on scheduled list, false on idle list */
};
DEQ_DECLARE(qd_timer_t, qd_timer_list_t);
void qd_timer_initialize(sys_mutex_t *server_lock);
void qd_timer_finalize(void);
-qd_timestamp_t qd_timer_next_duration_LH(void);
-void qd_timer_visit_LH(qd_timestamp_t current_time);
-void qd_timer_idle_LH(qd_timer_t *timer);
+void qd_timer_visit();
/// For tests only
sys_mutex_t* qd_timer_lock();
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 314ad50..bc62232 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -29,9 +29,9 @@ set(unit_test_SOURCES
compose_test.c
policy_test.c
run_unit_tests.c
- timer_test.c
tool_test.c
failoverlist_test.c
+ timer_test.c
)
if (USE_MEMORY_POOL)
list(APPEND unit_test_SOURCES alloc_test.c)
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/tests/run_unit_tests.c
----------------------------------------------------------------------
diff --git a/tests/run_unit_tests.c b/tests/run_unit_tests.c
index e481f13..c8c4ef3 100644
--- a/tests/run_unit_tests.c
+++ b/tests/run_unit_tests.c
@@ -24,7 +24,7 @@
#include <stdio.h>
int tool_tests(void);
-int timer_tests(void);
+int timer_tests(qd_dispatch_t*);
int alloc_tests(void);
int compose_tests(void);
int policy_tests(void);
@@ -52,7 +52,7 @@ int main(int argc, char** argv)
printf("Config failed: %s\n", qd_error_message());
return 1;
}
- result += timer_tests();
+ result += timer_tests(qd);
result += tool_tests();
result += compose_tests();
#if USE_MEMORY_POOL
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/tests/system_tests_management.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_management.py b/tests/system_tests_management.py
index b9535a8..c26741f 100644
--- a/tests/system_tests_management.py
+++ b/tests/system_tests_management.py
@@ -169,7 +169,7 @@ class ManagementTest(system_test.TestCase):
attributes = {'name':'foo', 'port':str(port), 'role':'normal', 'saslMechanisms': 'ANONYMOUS', 'authenticatePeer': False}
entity = self.assert_create_ok(LISTENER, 'foo', attributes)
self.assertEqual(entity['name'], 'foo')
- self.assertEqual(entity['host'], '127.0.0.1')
+ self.assertEqual(entity['host'], '')
# Connect via the new listener
node3 = self.cleanup(Node.connect(Url(port=port)))
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/tests/system_tests_policy.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_policy.py b/tests/system_tests_policy.py
index 2ce3778..120137d 100644
--- a/tests/system_tests_policy.py
+++ b/tests/system_tests_policy.py
@@ -275,66 +275,29 @@ class SenderReceiverLimits(TestCase):
def test_verify_n_receivers(self):
n = 4
addr = self.address()
-
- # connection should be ok
- denied = False
- try:
- br1 = BlockingConnection(addr)
- except ConnectionException:
- denied = True
-
- self.assertFalse(denied) # assert if connections that should open did not open
+ br1 = BlockingConnection(addr)
# n receivers OK
- try:
- r1 = br1.create_receiver(address="****YES_1of4***")
- r2 = br1.create_receiver(address="****YES_20f4****")
- r3 = br1.create_receiver(address="****YES_3of4****")
- r4 = br1.create_receiver(address="****YES_4of4****")
- except Exception:
- denied = True
-
- self.assertFalse(denied) # n receivers should have worked
+ br1.create_receiver(address="****YES_1of4***")
+ br1.create_receiver(address="****YES_20f4****")
+ br1.create_receiver(address="****YES_3of4****")
+ br1.create_receiver(address="****YES_4of4****")
# receiver n+1 should be denied
- try:
- r5 = br1.create_receiver("****NO****")
- except Exception:
- denied = True
-
- self.assertTrue(denied) # receiver n+1 should have failed
+ self.assertRaises(LinkDetached, br1.create_receiver, "****NO****")
br1.close()
def test_verify_n_senders(self):
n = 2
addr = self.address()
-
- # connection should be ok
- denied = False
- try:
- bs1 = BlockingConnection(addr)
- except ConnectionException:
- denied = True
-
- self.assertFalse(denied) # assert if connections that should open did not open
+ bs1 = BlockingConnection(addr)
# n senders OK
- try:
- s1 = bs1.create_sender(address="****YES_1of2****")
- s2 = bs1.create_sender(address="****YES_2of2****")
- except Exception:
- denied = True
-
- self.assertFalse(denied) # n senders should have worked
-
- # receiver n+1 should be denied
- try:
- s3 = bs1.create_sender("****NO****")
- except Exception:
- denied = True
-
- self.assertTrue(denied) # sender n+1 should have failed
+ bs1.create_sender(address="****YES_1of2****")
+ bs1.create_sender(address="****YES_2of2****")
+ # sender n+1 should be denied
+ self.assertRaises(LinkDetached, bs1.create_sender, "****NO****")
bs1.close()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org