You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/04/27 17:38:28 UTC

[03/10] qpid-dispatch git commit: DISPATCH-390: Convert dispatch to use pn_proactor_t

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/src/server.c
----------------------------------------------------------------------
diff --git a/src/server.c b/src/server.c
index 848cfcb..15a2aef 100644
--- a/src/server.c
+++ b/src/server.c
@@ -24,6 +24,12 @@
 #include <qpid/dispatch/amqp.h>
 #include <qpid/dispatch/server.h>
 #include <qpid/dispatch/failoverlist.h>
+
+#include <proton/event.h>
+#include <proton/listener.h>
+#include <proton/proactor.h>
+#include <proton/sasl.h>
+
 #include "qpid/dispatch/python_embedded.h"
 #include "entity.h"
 #include "entity_cache.h"
@@ -34,94 +40,33 @@
 #include "alloc.h"
 #include "config.h"
 #include <stdio.h>
-#include <time.h>
 #include <string.h>
 #include <errno.h>
 #include <inttypes.h>
 
-typedef struct qd_thread_t {
-    qd_server_t  *qd_server;
-    int           thread_id;
-    volatile int  running;
-    volatile int  canceled;
-    int           using_thread;
-    sys_thread_t *thread;
-} qd_thread_t;
-
-
-typedef struct qd_work_item_t {
-    DEQ_LINKS(struct qd_work_item_t);
-    qdpn_connector_t *cxtr;
-} qd_work_item_t;
-
-DEQ_DECLARE(qd_work_item_t, qd_work_list_t);
-
-
 struct qd_server_t {
     qd_dispatch_t            *qd;
-    int                       thread_count;
+    const int                thread_count; /* Immutable */
     const char               *container_name;
     const char               *sasl_config_path;
     const char               *sasl_config_name;
-    qdpn_driver_t            *driver;
+    pn_proactor_t            *proactor;
+    qd_container_t           *container;
     qd_log_source_t          *log_source;
-    qd_conn_handler_cb_t      conn_handler;
-    qd_pn_event_handler_cb_t  pn_event_handler;
-    qd_pn_event_complete_cb_t pn_event_complete_handler;
     void                     *start_context;
-    void                     *conn_handler_context;
     sys_cond_t               *cond;
     sys_mutex_t              *lock;
-    qd_thread_t             **threads;
-    qd_work_list_t            work_queue;
-    qd_timer_list_t           pending_timers;
-    bool                      a_thread_is_waiting;
     int                       pause_requests;
     int                       threads_paused;
     int                       pause_next_sequence;
     int                       pause_now_serving;
-    qd_signal_handler_cb_t    signal_handler;
-    bool                      signal_handler_running;
-    void                     *signal_context;
-    int                       pending_signal;
-    qd_timer_t               *heartbeat_timer;
     uint64_t                 next_connection_id;
     void                     *py_displayname_obj;
     qd_http_server_t         *http;
 };
 
-/**
- * Listener objects represent the desire to accept incoming transport connections.
- */
-struct qd_listener_t {
-    qd_server_t              *server;
-    const qd_server_config_t *config;
-    void                     *context;
-    qdpn_listener_t          *pn_listener;
-    qd_http_listener_t       *http;
-};
-
-
-/**
- * Connector objects represent the desire to create and maintain an outgoing transport connection.
- */
-struct qd_connector_t {
-    qd_server_t              *server;
-    cxtr_state_t              state;
-    const qd_server_config_t *config;
-    void                     *context;
-    qd_connection_t          *ctx;
-    qd_timer_t               *timer;
-    long                      delay;
-};
-
-
-
-static __thread qd_server_t *thread_server = 0;
-
 #define HEARTBEAT_INTERVAL 1000
 
-ALLOC_DEFINE(qd_work_item_t);
 ALLOC_DEFINE(qd_listener_t);
 ALLOC_DEFINE(qd_connector_t);
 ALLOC_DEFINE(qd_deferred_call_t);
@@ -142,49 +87,32 @@ const char CERT_FINGERPRINT_SHA256 = '2';
 const char CERT_FINGERPRINT_SHA512 = '5';
 char *COMPONENT_SEPARATOR = ";";
 
-static void setup_ssl_sasl_and_open(qd_connection_t *ctx);
+static const int BACKLOG = 50;  /* Listening backlog */
 
-static qd_thread_t *thread(qd_server_t *qd_server, int id)
-{
-    qd_thread_t *thread = NEW(qd_thread_t);
-    if (!thread)
-        return 0;
-
-    thread->qd_server    = qd_server;
-    thread->thread_id    = id;
-    thread->running      = 0;
-    thread->canceled     = 0;
-    thread->using_thread = 0;
-
-    return thread;
-}
+static void setup_ssl_sasl_and_open(qd_connection_t *ctx);
 
-static void free_qd_connection(qd_connection_t *ctx)
-{
-    if (ctx->policy_settings) {
-        if (ctx->policy_settings->sources)
-            free(ctx->policy_settings->sources);
-        if (ctx->policy_settings->targets)
-            free(ctx->policy_settings->targets);
-        free (ctx->policy_settings);
-        ctx->policy_settings = 0;
-    }
-    if (ctx->pn_conn) {
-        pn_connection_set_context(ctx->pn_conn, 0);
-        pn_decref(ctx->pn_conn);
-        ctx->pn_conn = NULL;
-    }
-    if (ctx->collector) {
-        pn_collector_free(ctx->collector);
-        ctx->collector = NULL;
+/* Construct a new qd_connectoin. */
+static qd_connection_t *qd_connection(qd_server_t *server, const char *role) {
+    qd_connection_t *ctx = new_qd_connection_t();
+    if (!ctx) return NULL;
+    ZERO(ctx);
+    ctx->pn_conn       = pn_connection();
+    ctx->deferred_call_lock = sys_mutex();
+    ctx->role = strdup(role);
+    if (!ctx->pn_conn || !ctx->deferred_call_lock || !role) {
+        if (ctx->pn_conn) pn_connection_free(ctx->pn_conn);
+        if (ctx->deferred_call_lock) sys_mutex_free(ctx->deferred_call_lock);
+        free(ctx->role);
+        return NULL;
     }
-
-    if (ctx->free_user_id)
-        free((char*)ctx->user_id);
-
-    free(ctx->role);
-
-    free_qd_connection_t(ctx);
+    ctx->server        = server;
+    DEQ_ITEM_INIT(ctx);
+    DEQ_INIT(ctx->deferred_calls);
+    DEQ_INIT(ctx->free_link_session_list);
+    sys_mutex_lock(server->lock);
+    ctx->connection_id = server->next_connection_id++;
+    sys_mutex_unlock(server->lock);
+    return ctx;
 }
 
 /**
@@ -222,7 +150,7 @@ qd_error_t qd_register_display_name_service(qd_dispatch_t *qd, void *displayname
 static const char *transport_get_user(qd_connection_t *conn, pn_transport_t *tport)
 {
     const qd_server_config_t *config =
-            conn->connector ? conn->connector->config : conn->listener->config;
+            conn->connector ? &conn->connector->config : &conn->listener->config;
 
     if (config->ssl_uid_format) {
         // The ssl_uid_format length cannot be greater that 7
@@ -418,22 +346,6 @@ static const char *transport_get_user(qd_connection_t *conn, pn_transport_t *tpo
 }
 
 
-/**
- * Allocate a new qd_connection
- *  with DEQ items initialized, call lock allocated, and all other fields cleared.
- */
-static qd_connection_t *connection_allocate()
-{
-    qd_connection_t *ctx = new_qd_connection_t();
-    ZERO(ctx);
-    DEQ_ITEM_INIT(ctx);
-    DEQ_INIT(ctx->deferred_calls);
-    ctx->deferred_call_lock = sys_mutex();
-    DEQ_INIT(ctx->free_link_session_list);
-    return ctx;
-}
-
-
 void qd_connection_set_user(qd_connection_t *conn)
 {
     pn_transport_t *tport = pn_connection_transport(conn->pn_conn);
@@ -507,20 +419,6 @@ static qd_error_t listener_setup_ssl(qd_connection_t *ctx, const qd_server_confi
     return QD_ERROR_NONE;
 }
 
-// Format the identity of an incoming connection to buf for logging
-static const char *log_incoming(char *buf, size_t size, qdpn_connector_t *cxtr)
-{
-    qd_listener_t *qd_listener = qdpn_listener_context(qdpn_connector_listener(cxtr));
-    assert(qd_listener);
-    const char *cname = qdpn_connector_name(cxtr);
-    const char *host = qd_listener->config->host;
-    const char *port = qd_listener->config->port;
-    const char *protocol = qd_listener->config->http ? "HTTP" : "AMQP";
-    snprintf(buf, size, "incoming %s connection from %s to %s:%s",
-             protocol, cname, host, port);
-    return buf;
-}
-
 
 static void decorate_connection(qd_server_t *qd_server, pn_connection_t *conn, const qd_server_config_t *config)
 {
@@ -602,637 +500,383 @@ static void decorate_connection(qd_server_t *qd_server, pn_connection_t *conn, c
 }
 
 
-static void thread_process_listeners_LH(qd_server_t *qd_server)
+static void on_accept(pn_event_t *e)
 {
-    qdpn_driver_t    *driver = qd_server->driver;
-    qdpn_listener_t  *listener;
-    qdpn_connector_t *cxtr;
-    qd_connection_t  *ctx;
-
-    for (listener = qdpn_driver_listener(driver); listener; listener = qdpn_driver_listener(driver)) {
-        qd_listener_t *li = qdpn_listener_context(listener);
-        bool policy_counted = false;
-        cxtr = qdpn_listener_accept(listener, qd_server->qd->policy, &qd_policy_socket_accept, &policy_counted);
-        if (!cxtr)
-            continue;
-
-        char logbuf[qd_log_max_len()];
-
-        ctx = connection_allocate();
-        ctx->server        = qd_server;
-        ctx->owner_thread  = CONTEXT_UNSPECIFIED_OWNER;
-        ctx->pn_cxtr       = cxtr;
-        ctx->listener      = qdpn_listener_context(listener);
-        ctx->context       = ctx->listener->context;
-        ctx->connection_id = qd_server->next_connection_id++; // Increment the connection id so the next connection can use it
-        ctx->policy_counted = policy_counted;
-
-        // Copy the role from the listener config
-        int role_length    = strlen(ctx->listener->config->role) + 1;
-        ctx->role          = (char*) malloc(role_length);
-        strcpy(ctx->role, ctx->listener->config->role);
-
-        pn_connection_t *conn = pn_connection();
-        ctx->collector = pn_collector();
-        pn_connection_collect(conn, ctx->collector);
-        decorate_connection(qd_server, conn, ctx->listener->config);
-        qdpn_connector_set_connection(cxtr, conn);
-        pn_connection_set_context(conn, ctx);
-        ctx->pn_conn = conn;
-        ctx->owner_thread = CONTEXT_NO_OWNER;
-        qdpn_connector_set_context(cxtr, ctx);
-
-        qd_log(qd_server->log_source, QD_LOG_INFO, "Accepting %s with connection id [%"PRIu64"]",
-           log_incoming(logbuf, sizeof(logbuf), cxtr), ctx->connection_id);
+    assert(pn_event_type(e) == PN_LISTENER_ACCEPT);
+    pn_listener_t *pn_listener = pn_event_listener(e);
+    qd_listener_t *listener = pn_listener_get_context(pn_listener);
+    qd_connection_t *ctx = qd_connection(listener->server, listener->config.role);
+    if (!ctx) {
+        qd_log(listener->server->log_source, QD_LOG_CRITICAL,
+               "Allocation failure during accept to %s", listener->config.host_port);
+        return;
+    }
+    pn_connection_set_context(ctx->pn_conn, ctx);
+    ctx->listener = listener;
+    decorate_connection(listener->server, ctx->pn_conn, &ctx->listener->config);
+    qd_log(listener->server->log_source, QD_LOG_TRACE,
+           "[%"PRIu64"] Accepting incoming connection from %s to %s",
+           ctx->connection_id, qd_connection_name(ctx), ctx->listener->config.host_port);
+    /* Asynchronous accept, configure the transport on PN_CONNECTION_BOUND */
+    pn_listener_accept(pn_listener, ctx->pn_conn);
+ }
+
+
+/* Log the description, set the transport condition (name, description) close the transport tail. */
+void connect_fail(qd_connection_t *ctx, const char *name, const char *description, ...)
+{
+    va_list ap;
+    va_start(ap, description);
+    qd_verror(QD_ERROR_RUNTIME, description, ap);
+    va_end(ap);
+    if (ctx->pn_conn) {
+        pn_transport_t *t = pn_connection_transport(ctx->pn_conn);
+        /* Normally this is closing the transport but if not bound close the connection. */
+        pn_condition_t *cond = t ? pn_transport_condition(t) : pn_connection_condition(ctx->pn_conn);
+        if (cond && !pn_condition_is_set(cond)) {
+            va_start(ap, description);
+            pn_condition_vformat(cond, name, description, ap);
+            va_end(ap);
+        }
+        if (t) {
+            pn_transport_close_tail(t);
+        } else {
+            pn_connection_close(ctx->pn_conn);
+        }
+    }
+}
 
-        //
-        // Get a pointer to the transport so we can insert security components into it
-        //
-        pn_transport_t           *tport  = qdpn_connector_transport(cxtr);
-        const qd_server_config_t *config = ctx->listener->config;
 
-        //
-        // Configure the transport.
-        //
+/* Get the host IP address for the remote end */
+static int set_remote_host_port(qd_connection_t *ctx) {
+    pn_transport_t *tport  = pn_connection_transport(ctx->pn_conn);
+    const struct sockaddr_storage* addr = pn_proactor_addr_sockaddr(pn_proactor_addr_remote(tport));
+    int err = 0;
+    if (!addr) {
+        err = -1;
+        qd_log(ctx->server->log_source, QD_LOG_ERROR, "No remote address for connection to %s");
+    } else {
+        char rport[NI_MAXSERV] = "";
+        int err = getnameinfo((struct sockaddr*)addr, sizeof(*addr),
+                              ctx->rhost, sizeof(ctx->rhost), rport, sizeof(rport),
+                              NI_NUMERICHOST | NI_NUMERICSERV);
+        if (!err) {
+            snprintf(ctx->rhost_port, sizeof(ctx->rhost_port), "%s:%s", ctx->rhost, rport);
+        } else {
+            qd_log(ctx->server->log_source, QD_LOG_ERROR, "No remote address for connection to %s");
+        }
+    }
+    return err;
+}
+
+
+/* Configure the transport once it is bound to the connection */
+static void on_connection_bound(qd_server_t *server, pn_event_t *e) {
+    pn_connection_t *pn_conn = pn_event_connection(e);
+    qd_connection_t *ctx = pn_connection_get_context(pn_conn);
+    pn_transport_t *tport  = pn_connection_transport(pn_conn);
+    pn_transport_set_context(tport, ctx); /* for transport_tracer */
+
+    //
+    // Proton pushes out its trace to transport_tracer() which in turn writes a trace
+    // message to the qdrouter log If trace level logging is enabled on the router set
+    // PN_TRACE_DRV | PN_TRACE_FRM | PN_TRACE_RAW on the proton transport
+    //
+    if (qd_log_enabled(ctx->server->log_source, QD_LOG_TRACE)) {
+        pn_transport_trace(tport, PN_TRACE_FRM);
+        pn_transport_set_tracer(tport, transport_tracer);
+    }
+
+    const qd_server_config_t *config = NULL;
+    if (ctx->listener) {        /* Accepting an incoming connection */
+        config = &ctx->listener->config;
+        const char *name = config->host_port;
         pn_transport_set_server(tport);
-        pn_transport_set_max_frame(tport, config->max_frame_size);
-        pn_transport_set_channel_max(tport, config->max_sessions - 1);
-        pn_transport_set_idle_timeout(tport, config->idle_timeout_seconds * 1000);
 
-        //
-        // Proton pushes out its trace to transport_tracer() which in turn writes a trace message to the qdrouter log
-        // If trace level logging is enabled on the router set PN_TRACE_DRV | PN_TRACE_FRM | PN_TRACE_RAW on the proton transport
-        //
-        pn_transport_set_context(tport, ctx);
-        if (qd_log_enabled(qd_server->log_source, QD_LOG_TRACE)) {
-            pn_transport_trace(tport, PN_TRACE_FRM);
-            pn_transport_set_tracer(tport, transport_tracer);
+        if (set_remote_host_port(ctx) == 0 &&
+            qd_policy_socket_accept(server->qd->policy, ctx->rhost))
+        {
+            ctx->policy_counted = true;
+        } else {
+            pn_transport_close_tail(tport);
+            pn_transport_close_head(tport);
+            return;
         }
 
-        if (li->http) {
-            // Set up HTTP if configured, HTTP provides its own SSL.
-            qd_log(qd_server->log_source, QD_LOG_TRACE, "Configuring HTTP%s on %s",
-                   config->ssl_profile ? "S" : "",
-                   log_incoming(logbuf, sizeof(logbuf), cxtr));
-            qd_http_listener_accept(li->http, cxtr);
-        } else if (config->ssl_profile)  {
-            // Set up SSL if configured and HTTP is not providing SSL.
-            qd_log(qd_server->log_source, QD_LOG_TRACE, "Configuring SSL on %s",
-                   log_incoming(logbuf, sizeof(logbuf), cxtr));
+        // Set up SSL
+        if (config->ssl_profile)  {
+            qd_log(ctx->server->log_source, QD_LOG_TRACE, "Configuring SSL on %s", name);
             if (listener_setup_ssl(ctx, config, tport) != QD_ERROR_NONE) {
-                qd_log(qd_server->log_source, QD_LOG_ERROR, "%s on %s",
-                       qd_error_message(), log_incoming(logbuf, sizeof(logbuf), cxtr));
-                qdpn_connector_close(cxtr);
-                continue;
+                connect_fail(ctx, QD_AMQP_COND_INTERNAL_ERROR, "%s on %s", qd_error_message(), name);
+                return;
             }
         }
-
         //
         // Set up SASL
         //
+        sys_mutex_lock(ctx->server->lock);
         pn_sasl_t *sasl = pn_sasl(tport);
-        if (qd_server->sasl_config_path)
-            pn_sasl_config_path(sasl, qd_server->sasl_config_path);
-        pn_sasl_config_name(sasl, qd_server->sasl_config_name);
+        if (ctx->server->sasl_config_path)
+            pn_sasl_config_path(sasl, ctx->server->sasl_config_path);
+        pn_sasl_config_name(sasl, ctx->server->sasl_config_name);
         if (config->sasl_mechanisms)
             pn_sasl_allowed_mechs(sasl, config->sasl_mechanisms);
         pn_transport_require_auth(tport, config->requireAuthentication);
         pn_transport_require_encryption(tport, config->requireEncryption);
         pn_sasl_set_allow_insecure_mechs(sasl, config->allowInsecureAuthentication);
+        sys_mutex_unlock(ctx->server->lock);
+
+        qd_log(ctx->server->log_source, QD_LOG_INFO, "Accepted connection to %s from %s",
+               name, ctx->rhost_port);
+    } else if (ctx->connector) { /* Establishing an outgoing connection */
+        config = &ctx->connector->config;
+        setup_ssl_sasl_and_open(ctx);
+    } else {                    /* No connector and no listener */
+        connect_fail(ctx, QD_AMQP_COND_INTERNAL_ERROR, "unknown Connection");
+        return;
     }
-}
-
-
-static void handle_signals_LH(qd_server_t *qd_server)
-{
-    int signum = qd_server->pending_signal;
-
-    if (signum && !qd_server->signal_handler_running) {
-        qd_server->signal_handler_running = true;
-        qd_server->pending_signal = 0;
-        if (qd_server->signal_handler) {
-            sys_mutex_unlock(qd_server->lock);
-            qd_server->signal_handler(qd_server->signal_context, signum);
-            sys_mutex_lock(qd_server->lock);
-        }
-        qd_server->signal_handler_running = false;
-    }
-}
-
 
-static void block_if_paused_LH(qd_server_t *qd_server)
-{
-    if (qd_server->pause_requests > 0) {
-        qd_server->threads_paused++;
-        sys_cond_signal_all(qd_server->cond);
-        while (qd_server->pause_requests > 0)
-            sys_cond_wait(qd_server->cond, qd_server->lock);
-        qd_server->threads_paused--;
-    }
+    //
+    // Common transport configuration.
+    //
+    pn_transport_set_max_frame(tport, config->max_frame_size);
+    pn_transport_set_channel_max(tport, config->max_sessions - 1);
+    pn_transport_set_idle_timeout(tport, config->idle_timeout_seconds * 1000);
 }
 
 
 static void invoke_deferred_calls(qd_connection_t *conn, bool discard)
 {
-    qd_deferred_call_list_t  calls;
-    qd_deferred_call_t      *dc;
-
-    //
-    // Copy the deferred calls out of the connection under lock.
+    // Lock access to deferred_calls, other threads may concurrently add to it.  Invoke
+    // the calls outside of the critical section.
     //
-    DEQ_INIT(calls);
     sys_mutex_lock(conn->deferred_call_lock);
-    dc = DEQ_HEAD(conn->deferred_calls);
-    while (dc) {
+    qd_deferred_call_t *dc;
+    while ((dc = DEQ_HEAD(conn->deferred_calls))) {
         DEQ_REMOVE_HEAD(conn->deferred_calls);
-        DEQ_INSERT_TAIL(calls, dc);
-        dc = DEQ_HEAD(conn->deferred_calls);
-    }
-    sys_mutex_unlock(conn->deferred_call_lock);
-
-    //
-    // Invoke the calls outside of the critical section.
-    //
-    dc = DEQ_HEAD(calls);
-    while (dc) {
-        DEQ_REMOVE_HEAD(calls);
+        sys_mutex_unlock(conn->deferred_call_lock);
         dc->call(dc->context, discard);
         free_qd_deferred_call_t(dc);
-        dc = DEQ_HEAD(calls);
+        sys_mutex_lock(conn->deferred_call_lock);
     }
+    sys_mutex_unlock(conn->deferred_call_lock);
 }
 
-
-static int process_connector(qd_server_t *qd_server, qdpn_connector_t *cxtr)
-{
-    qd_connection_t *ctx = qdpn_connector_context(cxtr);
-    int events = 0;
-    int passes = 0;
-
-    if (ctx->closed)
-        return 0;
-
-    do {
-        passes++;
-
-        //
-        // If this connection is outbound and is just now opening, do the initial SSL/SASL setup
-        //
-        if (!ctx->opened && !!ctx->connector && !qdpn_connector_closed(cxtr))
-            setup_ssl_sasl_and_open(ctx);
-
-        //
-        // Step the engine for pre-handler processing
-        //
-        qdpn_connector_process(cxtr);
-
-        //
-        // If the connector has closed, notify the client via callback.
-        //
-        if (qdpn_connector_closed(cxtr)) {
-            if (ctx->opened)
-                qd_server->conn_handler(qd_server->conn_handler_context, ctx->context,
-                                        QD_CONN_EVENT_CLOSE,
-                                        (qd_connection_t*) qdpn_connector_context(cxtr));
-            ctx->closed = true;
-            events = 0;
-            break;
-        }
-
-        invoke_deferred_calls(ctx, false);
-
-        qd_connection_t *qd_conn   = (qd_connection_t*) qdpn_connector_context(cxtr);
-        pn_collector_t  *collector = qd_connection_collector(qd_conn);
-        pn_event_t      *event;
-
-        events = 0;
-        if (!ctx->event_stall) {
-            event = pn_collector_peek(collector);
-            while (event) {
-                //
-                // If we are transitioning to the open state, notify the client via callback.
-                //
-                if (!ctx->opened && pn_event_type(event) == PN_CONNECTION_REMOTE_OPEN) {
-                    ctx->opened = true;
-                    if (ctx->connector) {
-                        ctx->connector->delay = 2000;  // Delay on re-connect in case there is a recurring error
-                    } else
-                        assert(ctx->listener);
-                    events = 1;
-                } else if (pn_event_type(event) == PN_TRANSPORT_ERROR) {
-                    if (ctx->connector) {
-                        const qd_server_config_t *config = ctx->connector->config;
-                        qd_log(qd_server->log_source, QD_LOG_TRACE, "Connection to %s:%s failed", config->host, config->port);
-                    }
-                }
-
-                events += qd_server->pn_event_handler(qd_server->conn_handler_context, ctx->context, event, qd_conn);
-                pn_collector_pop(collector);
-
-                event = ctx->event_stall ? 0 : pn_collector_peek(collector);
+void qd_container_handle_event(qd_container_t *container, pn_event_t *event);
+
+static void handle_listener(pn_event_t *e, qd_server_t *qd_server) {
+    qd_log_source_t *log = qd_server->log_source;
+
+    /* FIXME aconway 2017-02-20: HTTP support */
+    qd_listener_t *li = (qd_listener_t*) pn_listener_get_context(pn_event_listener(e));
+    const char *host_port = li->config.host_port;
+    switch (pn_event_type(e)) {
+
+    case PN_LISTENER_OPEN:
+        qd_log(log, QD_LOG_NOTICE, "Listening on %s", host_port);
+        break;
+
+    case PN_LISTENER_ACCEPT:
+        qd_log(log, QD_LOG_INFO, "Accepting connection on %s", host_port);
+        on_accept(e);
+        break;
+
+    case PN_LISTENER_CLOSE: {
+        pn_condition_t *cond = pn_listener_condition(li->pn_listener);
+        if (pn_condition_is_set(cond)) {
+            qd_log(log, QD_LOG_ERROR, "Listener error on %s: %s (%s)", host_port,
+                   pn_condition_get_description(cond),
+                   pn_condition_get_name(cond));
+            if (li->exit_on_error) {
+                qd_log(log, QD_LOG_CRITICAL, "Shutting down, required listener failed %s",
+                       host_port);
+                exit(1);
             }
-
-            //
-            // Free up any links and sessions that need to be freed since all the events have been popped from the collector.
-            //
-            qd_server->pn_event_complete_handler(qd_server->conn_handler_context, qd_conn);
-            events += qd_server->conn_handler(qd_server->conn_handler_context, ctx->context, QD_CONN_EVENT_WRITABLE, qd_conn);
+        } else {
+            qd_log(log, QD_LOG_TRACE, "Listener closed on %s", host_port);
         }
-    } while (events > 0);
-
-    return passes > 1;
+        qd_listener_decref(li);
+        break;
+    }
+    default:
+        break;
+    }
 }
 
 
-//
-// TEMPORARY FUNCTION PROTOTYPES
-//
-void qdpn_driver_wait_1(qdpn_driver_t *d);
-int  qdpn_driver_wait_2(qdpn_driver_t *d, int timeout);
-void qdpn_driver_wait_3(qdpn_driver_t *d);
-//
-// END TEMPORARY
-//
-
-static void *thread_run(void *arg)
+static void qd_connection_free(qd_connection_t *ctx)
 {
-    qd_thread_t      *thread    = (qd_thread_t*) arg;
-    qd_work_item_t   *work;
-    qdpn_connector_t *cxtr;
-    qd_connection_t  *ctx;
-    int               error;
-    int               poll_result;
-
-    if (!thread)
-        return 0;
+    qd_server_t *qd_server = ctx->server;
 
-    qd_server_t      *qd_server = thread->qd_server;
-    thread_server   = qd_server;
-    thread->running = 1;
+    qd_entity_cache_remove(QD_CONNECTION_TYPE, ctx); /* Removed management entity */
 
-    if (thread->canceled)
-        return 0;
-
-    //
-    // Main Loop
-    //
-    while (thread->running) {
-        sys_mutex_lock(qd_server->lock);
-
-        //
-        // Check for pending signals to process
-        //
-        handle_signals_LH(qd_server);
-        if (!thread->running) {
-            sys_mutex_unlock(qd_server->lock);
-            break;
-        }
-
-        //
-        // Check to see if the server is pausing.  If so, block here.
-        //
-        block_if_paused_LH(qd_server);
-        if (!thread->running) {
-            sys_mutex_unlock(qd_server->lock);
-            break;
-        }
-
-        //
-        // Service pending timers.
-        //
-        qd_timer_t *timer = DEQ_HEAD(qd_server->pending_timers);
-        if (timer) {
-            DEQ_REMOVE_HEAD(qd_server->pending_timers);
-
-            //
-            // Mark the timer as idle in case it reschedules itself.
-            //
-            qd_timer_idle_LH(timer);
+    // If this is a dispatch connector, schedule the re-connect timer
+    if (ctx->connector) {
+        sys_mutex_lock(ctx->connector->lock);
+        ctx->connector->ctx = 0;
+        ctx->connector->state = CXTR_STATE_CONNECTING;
+        sys_mutex_unlock(ctx->connector->lock);
+        qd_timer_schedule(ctx->connector->timer, ctx->connector->delay);
+    }
 
-            //
-            // Release the lock and invoke the connection handler.
-            //
-            sys_mutex_unlock(qd_server->lock);
-            timer->handler(timer->context);
-            qdpn_driver_wakeup(qd_server->driver);
-            continue;
-        }
+    // If counted for policy enforcement, notify it has closed
+    sys_mutex_lock(qd_server->lock);
+    if (ctx->policy_counted) {
+        qd_policy_socket_close(qd_server->qd->policy, ctx);
+    }
+    sys_mutex_unlock(qd_server->lock);
 
-        //
-        // Check the work queue for connectors scheduled for processing.
-        //
-        work = DEQ_HEAD(qd_server->work_queue);
-        if (!work) {
-            //
-            // There is no pending work to do
-            //
-            if (qd_server->a_thread_is_waiting) {
-                //
-                // Another thread is waiting on the proton driver, this thread must
-                // wait on the condition variable until signaled.
-                //
-                sys_cond_wait(qd_server->cond, qd_server->lock);
-            } else {
-                //
-                // This thread elects itself to wait on the proton driver.  Set the
-                // thread-is-waiting flag so other idle threads will not interfere.
-                //
-                qd_server->a_thread_is_waiting = true;
-
-                //
-                // Ask the timer module when its next timer is scheduled to fire.  We'll
-                // use this value in driver_wait as the timeout.  If there are no scheduled
-                // timers, the returned value will be -1.
-                //
-                qd_timestamp_t duration = qd_timer_next_duration_LH();
-
-                //
-                // Invoke the proton driver's wait sequence.  This is a bit of a hack for now
-                // and will be improved in the future.  The wait process is divided into three parts,
-                // the first and third of which need to be non-reentrant, and the second of which
-                // must be reentrant (and blocks).
-                //
-                qdpn_driver_wait_1(qd_server->driver);
-                sys_mutex_unlock(qd_server->lock);
-
-                do {
-                    error = 0;
-                    poll_result = qdpn_driver_wait_2(qd_server->driver, duration);
-                    if (poll_result == -1)
-                        error = errno;
-                } while (error == EINTR);
-                if (error) {
-                    exit(-1);
-                }
+    invoke_deferred_calls(ctx, true);  // Discard any pending deferred calls
+    if (ctx->deferred_call_lock)
+        sys_mutex_free(ctx->deferred_call_lock);
 
-                sys_mutex_lock(qd_server->lock);
-                qdpn_driver_wait_3(qd_server->driver);
+    if (ctx->policy_settings) {
+        if (ctx->policy_settings->sources)
+            free(ctx->policy_settings->sources);
+        if (ctx->policy_settings->targets)
+            free(ctx->policy_settings->targets);
+        free (ctx->policy_settings);
+        ctx->policy_settings = 0;
+    }
 
-                if (!thread->running) {
-                    sys_mutex_unlock(qd_server->lock);
-                    break;
-                }
+    if (ctx->free_user_id) free((char*)ctx->user_id);
+    free(ctx->role);
+    free_qd_connection_t(ctx);
 
-                //
-                // Visit the timer module.
-                //
-                struct timespec tv;
-                clock_gettime(CLOCK_REALTIME, &tv);
-                qd_timestamp_t milliseconds = ((qd_timestamp_t)tv.tv_sec) * 1000 + tv.tv_nsec / 1000000;
-                qd_timer_visit_LH(milliseconds);
-
-                //
-                // Process listeners (incoming connections).
-                //
-                thread_process_listeners_LH(qd_server);
-
-                //
-                // Traverse the list of connectors-needing-service from the proton driver.
-                // If the connector is not already in the work queue and it is not currently
-                // being processed by another thread, put it in the work queue and signal the
-                // condition variable.
-                //
-                cxtr = qdpn_driver_connector(qd_server->driver);
-                while (cxtr) {
-                    ctx = qdpn_connector_context(cxtr);
-                    if (!ctx->enqueued && ctx->owner_thread == CONTEXT_NO_OWNER) {
-                        ctx->enqueued = 1;
-                        qd_work_item_t *workitem = new_qd_work_item_t();
-                        DEQ_ITEM_INIT(workitem);
-                        workitem->cxtr = cxtr;
-                        DEQ_INSERT_TAIL(qd_server->work_queue, workitem);
-                        sys_cond_signal(qd_server->cond);
-                    }
-                    cxtr = qdpn_driver_connector(qd_server->driver);
-                }
+    /* Note: pn_conn is freed by the proactor */
+}
 
-                //
-                // Release our exclusive claim on qdpn_driver_wait.
-                //
-                qd_server->a_thread_is_waiting = false;
-            }
-        }
 
-        //
-        // If we were given a connector to work on from the work queue, mark it as
-        // owned by this thread and as no longer enqueued.
-        //
-        cxtr = 0;
-        if (work) {
-            DEQ_REMOVE_HEAD(qd_server->work_queue);
-            ctx = qdpn_connector_context(work->cxtr);
-            if (ctx->owner_thread == CONTEXT_NO_OWNER) {
-                ctx->owner_thread = thread->thread_id;
-                ctx->enqueued = 0;
-                cxtr = work->cxtr;
-                free_qd_work_item_t(work);
-            } else {
-                //
-                // This connector is being processed by another thread, re-queue it.
-                //
-                DEQ_INSERT_TAIL(qd_server->work_queue, work);
+/* Events involving a connection or listener are serialized by the proactor so
+ * only one event per connection / listener will be processed at a time.
+ */
+static bool handle(pn_event_t *e, qd_server_t *qd_server) {
+    pn_connection_t *pn_conn = pn_event_connection(e);
+    qd_connection_t *ctx  = pn_conn ? (qd_connection_t*) pn_connection_get_context(pn_conn) : NULL;
+
+    switch (pn_event_type(e)) {
+
+    case PN_PROACTOR_INTERRUPT:
+        /* Stop the current thread */
+        return false;
+
+    case PN_PROACTOR_TIMEOUT:
+        qd_timer_visit();
+        break;
+
+    case PN_LISTENER_OPEN:
+    case PN_LISTENER_ACCEPT:
+    case PN_LISTENER_CLOSE:
+        handle_listener(e, qd_server);
+        break;
+
+    case PN_CONNECTION_BOUND:
+        on_connection_bound(qd_server, e);
+        break;
+
+    case PN_CONNECTION_REMOTE_OPEN:
+        // If we are transitioning to the open state, notify the client via callback.
+        if (!ctx->opened) {
+            ctx->opened = true;
+            if (ctx->connector) {
+                ctx->connector->delay = 2000;  // Delay re-connect in case there is a recurring error
             }
         }
-        sys_mutex_unlock(qd_server->lock);
-
-        //
-        // Process the connector that we now have exclusive access to.
-        //
-        if (cxtr) {
-            int work_done = 1;
-
-            if (qdpn_connector_failed(cxtr))
-                qdpn_connector_close(cxtr);
-
-            //
-            // Even if the connector has failed there are still events that 
-            // must be processed so that associated links will be cleaned up.
-            //
-            work_done = process_connector(qd_server, cxtr);
+        break;
 
-            //
-            // Check to see if the connector was closed during processing
-            //
-            if (qdpn_connector_closed(cxtr)) {
-                qd_entity_cache_remove(QD_CONNECTION_TYPE, ctx);
-                //
-                // Connector is closed.  Free the context and the connector.
-                // If this is a dispatch connector, schedule the re-connect timer
-                //
-                if (ctx->connector) {
-                    ctx->connector->ctx = 0;
-                    ctx->connector->state = CXTR_STATE_CONNECTING;
-                    qd_timer_schedule(ctx->connector->timer, ctx->connector->delay);
-                }
-
-                sys_mutex_lock(qd_server->lock);
-
-                if (ctx->policy_counted) {
-                    qd_policy_socket_close(qd_server->qd->policy, ctx);
-                }
-
-                invoke_deferred_calls(ctx, true);  // Discard any pending deferred calls
-                sys_mutex_free(ctx->deferred_call_lock);
-                qdpn_connector_free(cxtr);
-                free_qd_connection(ctx);
-                sys_mutex_unlock(qd_server->lock);
-            } else {
-                //
-                // The connector lives on.  Mark it as no longer owned by this thread.
-                //
-                sys_mutex_lock(qd_server->lock);
-                ctx->owner_thread = CONTEXT_NO_OWNER;
-                sys_mutex_unlock(qd_server->lock);
-            }
+    case PN_CONNECTION_WAKE:
+        invoke_deferred_calls(ctx, false);
+        break;
 
-            //
-            // Wake up the proton driver to force it to reconsider its set of FDs
-            // in light of the processing that just occurred.
-            //
-            if (work_done)
-                qdpn_driver_wakeup(qd_server->driver);
+    case PN_TRANSPORT_ERROR:
+        if (ctx && ctx->connector) { /* Outgoing connection */
+            const qd_server_config_t *config = &ctx->connector->config;
+            qd_log(qd_server->log_source, QD_LOG_TRACE, "Connection to %s failed", config->host_port);
         }
-    }
-
-    return 0;
-}
-
-
-static void thread_start(qd_thread_t *thread)
-{
-    if (!thread)
-        return;
-
-    thread->using_thread = 1;
-    thread->thread = sys_thread(thread_run, (void*) thread);
-}
-
-
-static void thread_cancel(qd_thread_t *thread)
-{
-    if (!thread)
-        return;
-
-    thread->running  = 0;
-    thread->canceled = 1;
-}
+        break;
 
+    default:
+        break;
+    } // Switch event type
 
-static void thread_join(qd_thread_t *thread)
-{
-    if (!thread)
-        return;
+    /* TODO aconway 2017-04-18: fold the container handler into the server */
+    qd_container_handle_event(qd_server->container, e);
 
-    if (thread->using_thread) {
-        sys_thread_join(thread->thread);
-        sys_thread_free(thread->thread);
+    /* Free the connection after all other processing */
+    if (ctx && pn_event_type(e) == PN_TRANSPORT_CLOSED) {
+        pn_connection_set_context(pn_conn, NULL);
+        qd_connection_free(ctx);
     }
+    return true;
 }
 
-
-static void thread_free(qd_thread_t *thread)
+static void *thread_run(void *arg)
 {
-    if (!thread)
-        return;
-
-    free(thread);
+    qd_server_t      *qd_server = (qd_server_t*)arg;
+    bool running = true;
+    while (running) {
+        pn_event_batch_t *events = pn_proactor_wait(qd_server->proactor);
+        pn_event_t * e;
+        while (running && (e = pn_event_batch_next(events))) {
+            running = handle(e, qd_server);
+        }
+        pn_proactor_done(qd_server->proactor, events);
+    }
+    return NULL;
 }
 
 
-static void cxtr_try_open(void *context)
+/* Timer callback to try/retry connection open */
+static void try_open_lh(qd_connector_t *ct)
 {
-    qd_connector_t *ct = (qd_connector_t*) context;
-    if (ct->state != CXTR_STATE_CONNECTING)
+    if (ct->state != CXTR_STATE_CONNECTING) {
+        /* No longer referenced by pn_connection or timer */
+        qd_connector_decref(ct);
         return;
+    }
 
-    qd_connection_t *ctx = connection_allocate();
-    ctx->server       = ct->server;
-    ctx->owner_thread = CONTEXT_UNSPECIFIED_OWNER;
-    ctx->pn_conn      = pn_connection();
-    ctx->collector    = pn_collector();
-    ctx->connector    = ct;
-    ctx->context      = ct->context;
-
-    // Copy the role from the connector config
-    int role_length    = strlen(ctx->connector->config->role) + 1;
-    ctx->role          = (char*) malloc(role_length);
-    strcpy(ctx->role, ctx->connector->config->role);
-
-    qd_log(ct->server->log_source, QD_LOG_INFO, "Connecting to %s:%s", ct->config->host, ct->config->port);
-
-    pn_connection_collect(ctx->pn_conn, ctx->collector);
-    decorate_connection(ctx->server, ctx->pn_conn, ct->config);
-
-    //
-    // qdpn_connector is not thread safe
-    //
-    sys_mutex_lock(ct->server->lock);
-    // Increment the connection id so the next connection can use it
-    ctx->connection_id = ct->server->next_connection_id++;
-    ctx->pn_cxtr = qdpn_connector(ct->server->driver, ct->config->host, ct->config->port, ct->config->protocol_family, (void*) ctx);
-    sys_mutex_unlock(ct->server->lock);
-
-    const qd_server_config_t *config = ct->config;
-
-    if (ctx->pn_cxtr == 0) {
-        sys_mutex_free(ctx->deferred_call_lock);
-        free_qd_connection(ctx);
+    qd_connection_t *ctx = qd_connection(ct->server, ct->config.role);
+    if (!ctx) {                 /* Try again later */
+        qd_log(ct->server->log_source, QD_LOG_CRITICAL, "Allocation failure connecting to %s",
+               ct->config.host_port);
         ct->delay = 10000;
         qd_timer_schedule(ct->timer, ct->delay);
         return;
     }
+    ctx->connector    = ct;
+    decorate_connection(ctx->server, ctx->pn_conn, &ct->config);
+    const qd_server_config_t *config = &ct->config;
 
     //
-    // Set the hostname on the pn_connection. This hostname will be used by proton as the hostname in the open frame.
+    // Set the hostname on the pn_connection. This hostname will be used by proton as the
+    // hostname in the open frame.
     //
     pn_connection_set_hostname(ctx->pn_conn, config->host);
 
-    // Set the sasl user name and password on the proton connection object. This has to be done before the call to qdpn_connector_transport() which
-    // binds the transport to the connection
+    // Set the sasl user name and password on the proton connection object. This has to be
+    // done before pn_proactor_connect which will bind a transport to the connection
     if(config->sasl_username)
         pn_connection_set_user(ctx->pn_conn, config->sasl_username);
     if (config->sasl_password)
         pn_connection_set_password(ctx->pn_conn, config->sasl_password);
 
-    qdpn_connector_set_connection(ctx->pn_cxtr, ctx->pn_conn);
     pn_connection_set_context(ctx->pn_conn, ctx);
 
     ctx->connector->state = CXTR_STATE_OPEN;
-
     ct->ctx   = ctx;
     ct->delay = 5000;
 
-    //
-    // Set up the transport, SASL, and SSL for the connection.
-    //
-    pn_transport_t *tport  = qdpn_connector_transport(ctx->pn_cxtr);
-
-    //
-    // Configure the transport
-    //
-    pn_transport_set_max_frame(tport, config->max_frame_size);
-    pn_transport_set_channel_max(tport, config->max_sessions - 1);
-    pn_transport_set_idle_timeout(tport, config->idle_timeout_seconds * 1000);
-
-    //
-    // Proton pushes out its trace to transport_tracer() which in turn writes a trace message to the qdrouter log
-    //
-    // If trace level logging is enabled on the router set PN_TRACE_DRV | PN_TRACE_FRM | PN_TRACE_RAW on the proton transport
-    pn_transport_set_context(tport, ctx);
-    if (qd_log_enabled(ct->server->log_source, QD_LOG_TRACE)) {
-        pn_transport_trace(tport, PN_TRACE_FRM);
-        pn_transport_set_tracer(tport, transport_tracer);
-    }
-
-    ctx->owner_thread = CONTEXT_NO_OWNER;
+    qd_log(ct->server->log_source, QD_LOG_TRACE,
+           "[%"PRIu64"] Connecting to %s", ctx->connection_id, config->host_port);
+    /* Note: the transport is configured in the PN_CONNECTION_BOUND event */
+    pn_proactor_connect(ct->server->proactor, ctx->pn_conn, config->host_port);
 }
 
-
 static void setup_ssl_sasl_and_open(qd_connection_t *ctx)
 {
     qd_connector_t *ct = ctx->connector;
-    const qd_server_config_t *config = ct->config;
-    pn_transport_t *tport  = qdpn_connector_transport(ctx->pn_cxtr);
+    const qd_server_config_t *config = &ct->config;
+    pn_transport_t *tport  = pn_connection_transport(ctx->pn_conn);
 
     //
     // Set up SSL if appropriate
@@ -1242,19 +886,16 @@ static void setup_ssl_sasl_and_open(qd_connection_t *ctx)
 
         if (!domain) {
             qd_error(QD_ERROR_RUNTIME, "SSL domain failed for connection to %s:%s",
-                     ct->config->host, ct->config->port);
-            /* TODO aconway 2014-07-15: Close the connection, clean up. */
+                     ct->config.host, ct->config.port);
             return;
         }
 
-        /* TODO aconway 2014-07-15: error handling on all SSL calls. */
-
         // set our trusted database for checking the peer's cert:
         if (config->ssl_trusted_certificate_db) {
             if (pn_ssl_domain_set_trusted_ca_db(domain, config->ssl_trusted_certificate_db)) {
                 qd_log(ct->server->log_source, QD_LOG_ERROR,
                        "SSL CA configuration failed for %s:%s",
-                       ct->config->host, ct->config->port);
+                       ct->config.host, ct->config.port);
             }
         }
         // should we force the peer to provide a cert?
@@ -1267,7 +908,7 @@ static void setup_ssl_sasl_and_open(qd_connection_t *ctx)
                                                       trusted)) {
                 qd_log(ct->server->log_source, QD_LOG_ERROR,
                        "SSL peer auth configuration failed for %s:%s",
-                       ct->config->host, ct->config->port);
+                       config->host, config->port);
             }
         }
 
@@ -1279,7 +920,7 @@ static void setup_ssl_sasl_and_open(qd_connection_t *ctx)
                                               config->ssl_password)) {
                 qd_log(ct->server->log_source, QD_LOG_ERROR,
                        "SSL local configuration failed for %s:%s",
-                       ct->config->host, ct->config->port);
+                       config->host, config->port);
             }
         }
 
@@ -1288,7 +929,7 @@ static void setup_ssl_sasl_and_open(qd_connection_t *ctx)
             if (pn_ssl_domain_set_peer_authentication(domain, PN_SSL_VERIFY_PEER_NAME, NULL)) {
                     qd_log(ct->server->log_source, QD_LOG_ERROR,
                            "SSL peer host name verification failed for %s:%s",
-                           ct->config->host, ct->config->port);
+                           config->host, config->port);
             }
         }
 
@@ -1310,58 +951,46 @@ static void setup_ssl_sasl_and_open(qd_connection_t *ctx)
     pn_connection_open(ctx->pn_conn);
 }
 
-
-static void heartbeat_cb(void *context)
-{
-    qd_server_t *qd_server = (qd_server_t*) context;
-    qdpn_activate_all(qd_server->driver);
-    qd_timer_schedule(qd_server->heartbeat_timer, HEARTBEAT_INTERVAL);
+static void try_open_cb(void *context) {
+    qd_connector_t *ct = (qd_connector_t*) context;
+    sys_mutex_lock(ct->lock);
+    try_open_lh(ct);
+    sys_mutex_unlock(ct->lock);
 }
 
 
 qd_server_t *qd_server(qd_dispatch_t *qd, int thread_count, const char *container_name,
                        const char *sasl_config_path, const char *sasl_config_name)
 {
-    int i;
-
+    /* Initialize const members, 0 initialize all others. */
+    qd_server_t tmp = { .thread_count = thread_count };
     qd_server_t *qd_server = NEW(qd_server_t);
     if (qd_server == 0)
         return 0;
+    memcpy(qd_server, &tmp, sizeof(tmp));
 
     qd_server->qd               = qd;
     qd_server->log_source       = qd_log_source("SERVER");
-    qd_server->thread_count     = thread_count;
     qd_server->container_name   = container_name;
     qd_server->sasl_config_path = sasl_config_path;
     qd_server->sasl_config_name = sasl_config_name;
-    qd_server->driver           = qdpn_driver(qd_server->log_source);
-    qd_server->conn_handler     = 0;
-    qd_server->pn_event_handler = 0;
-    qd_server->signal_handler   = 0;
+    qd_server->proactor         = pn_proactor();
+    qd_server->container        = 0;
     qd_server->start_context    = 0;
-    qd_server->signal_context   = 0;
     qd_server->lock             = sys_mutex();
     qd_server->cond             = sys_cond();
 
     qd_timer_initialize(qd_server->lock);
 
-    qd_server->threads = NEW_PTR_ARRAY(qd_thread_t, thread_count);
-    for (i = 0; i < thread_count; i++)
-        qd_server->threads[i] = thread(qd_server, i);
-
-    DEQ_INIT(qd_server->work_queue);
-    DEQ_INIT(qd_server->pending_timers);
-    qd_server->a_thread_is_waiting    = false;
     qd_server->pause_requests         = 0;
     qd_server->threads_paused         = 0;
     qd_server->pause_next_sequence    = 0;
     qd_server->pause_now_serving      = 0;
-    qd_server->pending_signal         = 0;
-    qd_server->signal_handler_running = false;
-    qd_server->heartbeat_timer        = 0;
     qd_server->next_connection_id     = 1;
     qd_server->py_displayname_obj     = 0;
-    qd_server->http                   = qd_http_server(qd, qd_server->log_source);
+
+    /* FIXME aconway 2017-01-20: restore HTTP support */
+
     qd_log(qd_server->log_source, QD_LOG_INFO, "Container Name: %s", qd_server->container_name);
 
     return qd_server;
@@ -1371,191 +1000,64 @@ qd_server_t *qd_server(qd_dispatch_t *qd, int thread_count, const char *containe
 void qd_server_free(qd_server_t *qd_server)
 {
     if (!qd_server) return;
-    for (int i = 0; i < qd_server->thread_count; i++)
-        thread_free(qd_server->threads[i]);
     qd_http_server_free(qd_server->http);
     qd_timer_finalize();
-    qdpn_driver_free(qd_server->driver);
+    pn_proactor_free(qd_server->proactor);
     sys_mutex_free(qd_server->lock);
     sys_cond_free(qd_server->cond);
-    free(qd_server->threads);
     Py_XDECREF((PyObject *)qd_server->py_displayname_obj);
     free(qd_server);
 }
 
-
-void qd_server_set_conn_handler(qd_dispatch_t            *qd,
-                                qd_conn_handler_cb_t      handler,
-                                qd_pn_event_handler_cb_t  pn_event_handler,
-                                qd_pn_event_complete_cb_t pn_event_complete_handler,
-                                void                     *handler_context)
-{
-    qd->server->conn_handler              = handler;
-    qd->server->pn_event_handler          = pn_event_handler;
-    qd->server->pn_event_complete_handler = pn_event_complete_handler;
-    qd->server->conn_handler_context      = handler_context;
-}
-
-
-void qd_server_set_signal_handler(qd_dispatch_t *qd, qd_signal_handler_cb_t handler, void *context)
+void qd_server_set_container(qd_dispatch_t *qd, qd_container_t *container)
 {
-    qd->server->signal_handler = handler;
-    qd->server->signal_context = context;
-}
-
-
-static void qd_server_announce(qd_server_t* qd_server)
-{
-    qd_log(qd_server->log_source, QD_LOG_INFO, "Operational, %d Threads Running", qd_server->thread_count);
-#ifndef NDEBUG
-    qd_log(qd_server->log_source, QD_LOG_INFO, "Running in DEBUG Mode");
-#endif
+    qd->server->container              = container;
 }
 
 
 void qd_server_run(qd_dispatch_t *qd)
 {
     qd_server_t *qd_server = qd->server;
-
-    int i;
-    if (!qd_server)
-        return;
-
-    assert(qd_server->conn_handler); // Server can't run without a connection handler.
-
-    for (i = 1; i < qd_server->thread_count; i++)
-        thread_start(qd_server->threads[i]);
-
-    qd_server->heartbeat_timer = qd_timer(qd, heartbeat_cb, qd_server);
-    qd_timer_schedule(qd_server->heartbeat_timer, HEARTBEAT_INTERVAL);
-
-    qd_server_announce(qd_server);
-
-    thread_run((void*) qd_server->threads[0]);
-
-    for (i = 1; i < qd_server->thread_count; i++)
-        thread_join(qd_server->threads[i]);
-
-    for (i = 0; i < qd_server->thread_count; i++)
-        qd_server->threads[i]->canceled = 0;
-
-    qd_log(qd_server->log_source, QD_LOG_INFO, "Shut Down");
-}
-
-
-void qd_server_start(qd_dispatch_t *qd)
-{
-    qd_server_t *qd_server = qd->server;
     int i;
+    assert(qd_server);
+    assert(qd_server->container); // Server can't run without a container
+    qd_log(qd_server->log_source, QD_LOG_NOTICE, "Operational, %d Threads Running",
+           qd_server->thread_count);
+#ifndef NDEBUG
+    qd_log(qd_server->log_source, QD_LOG_INFO, "Running in DEBUG Mode");
+#endif
+    int n = qd_server->thread_count - 1; /* Start count-1 threads + use current thread */
+    sys_thread_t **threads = (sys_thread_t **)calloc(n, sizeof(sys_thread_t*));
+    for (i = 0; i < n; i++) {
+        threads[i] = sys_thread(thread_run, qd_server);
+    }
+    thread_run(qd_server);      /* Use the current thread */
+    for (i = 0; i < n; i++) {
+        sys_thread_join(threads[i]);
+        sys_thread_free(threads[i]);
+    }
+    free(threads);
 
-    if (!qd_server)
-        return;
-
-    assert(qd_server->conn_handler); // Server can't run without a connection handler.
-
-    for (i = 0; i < qd_server->thread_count; i++)
-        thread_start(qd_server->threads[i]);
-
-    qd_server->heartbeat_timer = qd_timer(qd, heartbeat_cb, qd_server);
-    qd_timer_schedule(qd_server->heartbeat_timer, HEARTBEAT_INTERVAL);
-
-    qd_server_announce(qd_server);
+    qd_log(qd_server->log_source, QD_LOG_NOTICE, "Shut Down");
 }
 
 
 void qd_server_stop(qd_dispatch_t *qd)
 {
-    qd_server_t *qd_server = qd->server;
-    int idx;
-
-    sys_mutex_lock(qd_server->lock);
-    for (idx = 0; idx < qd_server->thread_count; idx++)
-        thread_cancel(qd_server->threads[idx]);
-    sys_cond_signal_all(qd_server->cond);
-    qdpn_driver_wakeup(qd_server->driver);
-    sys_mutex_unlock(qd_server->lock);
-
-    if (thread_server != qd_server) {
-        for (idx = 0; idx < qd_server->thread_count; idx++)
-            thread_join(qd_server->threads[idx]);
-        qd_log(qd_server->log_source, QD_LOG_INFO, "Shut Down");
+    /* Disconnect everything, interrupt threads */
+    pn_proactor_disconnect(qd->server->proactor, NULL);
+    for (int i = 0; i < qd->server->thread_count; i++) {
+        pn_proactor_interrupt(qd->server->proactor);
     }
 }
 
-
-void qd_server_signal(qd_dispatch_t *qd, int signum)
+void qd_server_activate(qd_connection_t *ctx)
 {
-    if (!qd)
+    if (!ctx || !ctx->pn_conn)
         return;
-
-    qd_server_t *qd_server = qd->server;
-
-    qd_server->pending_signal = signum;
-    sys_cond_signal_all(qd_server->cond);
-    qdpn_driver_wakeup(qd_server->driver);
-}
-
-
-void qd_server_pause(qd_dispatch_t *qd)
-{
-    qd_server_t *qd_server = qd->server;
-
-    sys_mutex_lock(qd_server->lock);
-
-    //
-    // Bump the request count to stop all the threads.
-    //
-    qd_server->pause_requests++;
-    int my_sequence = qd_server->pause_next_sequence++;
-
-    //
-    // Awaken all threads that are currently blocking.
-    //
-    sys_cond_signal_all(qd_server->cond);
-    qdpn_driver_wakeup(qd_server->driver);
-
-    //
-    // Wait for the paused thread count plus the number of threads requesting a pause to equal
-    // the total thread count.  Also, don't exit the blocking loop until now_serving equals our
-    // sequence number.  This ensures that concurrent pausers don't run at the same time.
-    //
-    while ((qd_server->threads_paused + qd_server->pause_requests < qd_server->thread_count) ||
-           (my_sequence != qd_server->pause_now_serving))
-        sys_cond_wait(qd_server->cond, qd_server->lock);
-
-    sys_mutex_unlock(qd_server->lock);
-}
-
-
-void qd_server_resume(qd_dispatch_t *qd)
-{
-    qd_server_t *qd_server = qd->server;
-
-    sys_mutex_lock(qd_server->lock);
-    qd_server->pause_requests--;
-    qd_server->pause_now_serving++;
-    sys_cond_signal_all(qd_server->cond);
-    sys_mutex_unlock(qd_server->lock);
+    pn_connection_wake(ctx->pn_conn);
 }
 
-
-void qd_server_activate(qd_connection_t *ctx, bool awaken)
-{
-    if (!ctx)
-        return;
-
-    qdpn_connector_t *ctor = ctx->pn_cxtr;
-    if (!ctor)
-        return;
-
-    if (!qdpn_connector_closed(ctor)) {
-        qdpn_connector_activate(ctor, QDPN_CONNECTOR_WRITABLE);
-        if (awaken)
-            qdpn_driver_wakeup(ctx->server->driver);
-    }
-}
-
-
 void qd_connection_set_context(qd_connection_t *conn, void *context)
 {
     conn->user_context = context;
@@ -1598,11 +1100,6 @@ bool qd_connection_inbound(qd_connection_t *conn)
 }
 
 
-pn_collector_t *qd_connection_collector(qd_connection_t *conn)
-{
-    return conn->collector;
-}
-
 uint64_t qd_connection_connection_id(qd_connection_t *conn)
 {
     return conn->connection_id;
@@ -1612,8 +1109,8 @@ uint64_t qd_connection_connection_id(qd_connection_t *conn)
 const qd_server_config_t *qd_connection_config(const qd_connection_t *conn)
 {
     if (conn->listener)
-        return conn->listener->config;
-    return conn->connector->config;
+        return &conn->listener->config;
+    return &conn->connector->config;
 }
 
 
@@ -1628,131 +1125,106 @@ void qd_connection_invoke_deferred(qd_connection_t *conn, qd_deferred_t call, vo
     DEQ_INSERT_TAIL(conn->deferred_calls, dc);
     sys_mutex_unlock(conn->deferred_call_lock);
 
-    qd_server_activate(conn, true);
+    qd_server_activate(conn);
 }
 
 
-void qd_connection_set_event_stall(qd_connection_t *conn, bool stall)
+qd_listener_t *qd_server_listener(qd_server_t *server)
 {
-    conn->event_stall = stall;
-     if (!stall)
-         qd_server_activate(conn, true);
-}
+    qd_listener_t *li = new_qd_listener_t();
+    if (!li) return 0;
+    ZERO(li);
 
-qd_listener_t *qd_server_listen(qd_dispatch_t *qd, const qd_server_config_t *config, void *context)
-{
-    qd_server_t   *qd_server = qd->server;
-    qd_listener_t *li        = new_qd_listener_t();
-
-    if (!li)
-        return 0;
-
-    li->server      = qd_server;
-    li->config      = config;
-    li->context     = context;
+    sys_atomic_init(&li->ref_count, 1);
+    li->server      = server;
     li->http = NULL;
-
-    if (config->http) {
-        li->http = qd_http_listener(qd_server->http, config);
-        if (!li->http) {
-            free_qd_listener_t(li);
-            qd_log(qd_server->log_source, QD_LOG_ERROR, "Cannot start HTTP listener on %s:%s",
-                   config->host, config->port);
-            return NULL;
-        }
-    }
-
-    li->pn_listener = qdpn_listener(
-        qd_server->driver, config->host, config->port, config->protocol_family, li);
-
-    if (!li->pn_listener) {
-        free_qd_listener_t(li);
-        qd_log(qd_server->log_source, QD_LOG_ERROR, "Cannot start listener on %s:%s",
-               config->host, config->port);
-        return NULL;
-    }
-    qd_log(qd_server->log_source, QD_LOG_TRACE, "Listening on %s:%s%s", config->host, config->port,
-           config->http ? (config->ssl_profile ? "(HTTPS)":"(HTTP)") : "");
-
     return li;
 }
 
 
-void qd_server_listener_free(qd_listener_t* li)
-{
-    if (!li)
-        return;
-    if (li->http) qd_http_listener_free(li->http);
-    qdpn_listener_free(li->pn_listener);
-    free_qd_listener_t(li);
+bool qd_listener_listen(qd_listener_t *li) {
+    if (!li->pn_listener) {      /* Not already listening */
+        li->pn_listener = pn_listener();
+        if (!li->pn_listener) {
+            qd_log(li->server->log_source, QD_LOG_ERROR, "No memory listening on %s",
+                   li->config.host_port);
+            return false;
+        }
+        pn_listener_set_context(li->pn_listener, li);
+        /* Listen is asynchronous, log listening on PN_LISTENER_OPEN */
+        sys_atomic_inc(&li->ref_count);
+        pn_proactor_listen(li->server->proactor, li->pn_listener, li->config.host_port, BACKLOG);
+    }
+    return true;
 }
 
 
-void qd_server_listener_close(qd_listener_t* li)
+void qd_listener_decref(qd_listener_t* li)
 {
-    if (li)
-        qdpn_listener_close(li->pn_listener);
+    if (li && sys_atomic_dec(&li->ref_count) == 1) {
+        qd_server_config_free(&li->config);
+        if (li->http) qd_http_listener_free(li->http);
+        free_qd_listener_t(li);
+    }
 }
 
 
-qd_connector_t *qd_server_connect(qd_dispatch_t *qd, const qd_server_config_t *config, void *context)
+qd_connector_t *qd_server_connector(qd_server_t *server)
 {
-    qd_server_t    *qd_server = qd->server;
     qd_connector_t *ct        = new_qd_connector_t();
-
-    if (!ct)
+    if (!ct) return 0;
+    sys_atomic_init(&ct->ref_count, 1);
+    ct->server  = server;
+    ct->lock = sys_mutex();
+    ct->timer = qd_timer(ct->server->qd, try_open_cb, ct);
+    if (!ct->lock || !ct->timer) {
+        qd_connector_decref(ct);
         return 0;
+    }
+    return ct;
+}
 
-    ct->server  = qd_server;
+
+bool qd_connector_connect(qd_connector_t *ct)
+{
+    sys_mutex_lock(ct->lock);
     ct->state   = CXTR_STATE_CONNECTING;
-    ct->config  = config;
-    ct->context = context;
     ct->ctx     = 0;
-    ct->timer   = qd_timer(qd, cxtr_try_open, (void*) ct);
     ct->delay   = 0;
-
+    /* Referenced by timer */
+    sys_atomic_inc(&ct->ref_count);
     qd_timer_schedule(ct->timer, ct->delay);
-    return ct;
+    sys_mutex_unlock(ct->lock);
+    return true;
 }
 
 
-void qd_server_connector_free(qd_connector_t* ct)
+void qd_connector_decref(qd_connector_t* ct)
 {
-    // Don't free the proton connector.  This will be done by the connector
-    // processing/cleanup.
-
-    if (!ct)
-        return;
-
-    if (ct->ctx) {
-        qdpn_connector_close(ct->ctx->pn_cxtr);
-        ct->ctx->connector = 0;
+    if (ct && sys_atomic_dec(&ct->ref_count) == 1) {
+        sys_mutex_lock(ct->lock);
+        if (ct->ctx) {
+            ct->ctx->connector = 0;
+        }
+        sys_mutex_unlock(ct->lock);
+        qd_server_config_free(&ct->config);
+        qd_timer_free(ct->timer);
+        free_qd_connector_t(ct);
     }
-
-    qd_timer_free(ct->timer);
-    free_qd_connector_t(ct);
 }
 
-void qd_server_timer_pending_LH(qd_timer_t *timer)
-{
-    DEQ_INSERT_TAIL(timer->server->pending_timers, timer);
-    qdpn_driver_wakeup(timer->server->driver);
-}
-
-
-void qd_server_timer_cancel_LH(qd_timer_t *timer)
-{
-    DEQ_REMOVE(timer->server->pending_timers, timer);
+void qd_server_timeout(qd_server_t *server, qd_duration_t duration) {
+    pn_proactor_set_timeout(server->proactor, duration);
 }
 
 qd_dispatch_t* qd_server_dispatch(qd_server_t *server) { return server->qd; }
 
 const char* qd_connection_name(const qd_connection_t *c) {
-    return qdpn_connector_name(c->pn_cxtr);
-}
-
-const char* qd_connection_hostip(const qd_connection_t *c) {
-    return qdpn_connector_hostip(c->pn_cxtr);
+    if (c->connector) {
+        return c->connector->config.host_port;
+    } else {
+        return c->rhost_port;
+    }
 }
 
 qd_connector_t* qd_connection_connector(const qd_connection_t *c) {
@@ -1760,9 +1232,13 @@ qd_connector_t* qd_connection_connector(const qd_connection_t *c) {
 }
 
 const qd_server_config_t *qd_connector_config(const qd_connector_t *c) {
-    return c->config;
+    return &c->config;
 }
 
-qd_http_listener_t *qd_listener_http(qd_listener_t *l) {
-    return l->http;
+qd_http_listener_t *qd_listener_http(qd_listener_t *li) {
+    return li->http;
+}
+
+const char* qd_connection_hostip(const qd_connection_t *c) {
+    return c->rhost;
 }

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/src/server_private.h
----------------------------------------------------------------------
diff --git a/src/server_private.h b/src/server_private.h
index 9581196..a6543fa 100644
--- a/src/server_private.h
+++ b/src/server_private.h
@@ -19,34 +19,39 @@
  * under the License.
  */
 
+#include <qpid/dispatch/atomic.h>
 #include <qpid/dispatch/enum.h>
 #include <qpid/dispatch/server.h>
+#include <qpid/dispatch/threading.h>
 #include "alloc.h"
 #include <qpid/dispatch/ctools.h>
 #include <qpid/dispatch/log.h>
-#include <qpid/dispatch/driver.h>
 #include <proton/engine.h>
 #include <proton/event.h>
+#include <proton/ssl.h>
 
 #include "dispatch_private.h"
 #include "timer_private.h"
 #include "http.h"
 
-void qd_server_timer_pending_LH(qd_timer_t *timer);
-void qd_server_timer_cancel_LH(qd_timer_t *timer);
+#include <netdb.h>              /* For NI_MAXHOST/NI_MAXSERV */
 
-struct qd_dispatch_t* qd_server_dispatch(qd_server_t *server);
+qd_dispatch_t* qd_server_dispatch(qd_server_t *server);
+void qd_server_timeout(qd_server_t *server, qd_duration_t delay);
 
 const char* qd_connection_name(const qd_connection_t *c);
-const char* qd_connection_hostip(const qd_connection_t *c);
 qd_connector_t* qd_connection_connector(const qd_connection_t *c);
+const char* qd_connection_hostip(const qd_connection_t *c);
 
 const qd_server_config_t *qd_connector_config(const qd_connector_t *c);
 
 qd_http_listener_t *qd_listener_http(qd_listener_t *l);
 
-#define CONTEXT_NO_OWNER -1
-#define CONTEXT_UNSPECIFIED_OWNER -2
+qd_listener_t *qd_server_listener(qd_server_t *server);
+qd_connector_t *qd_server_connector(qd_server_t *server);
+void qd_connector_decref(qd_connector_t* ct);
+void qd_listener_decref(qd_listener_t* ct);
+void qd_server_config_free(qd_server_config_t *cf);
 
 typedef enum {
     CXTR_STATE_CONNECTING = 0,
@@ -55,8 +60,6 @@ typedef enum {
 } cxtr_state_t;
 
 
-
-
 typedef struct qd_deferred_call_t {
     DEQ_LINKS(struct qd_deferred_call_t);
     qd_deferred_t  call;
@@ -73,19 +76,63 @@ typedef struct qd_pn_free_link_session_t {
 
 DEQ_DECLARE(qd_pn_free_link_session_t, qd_pn_free_link_session_list_t);
 
+#ifndef NI_MAXHOST
+# define NI_MAXHOST 1025
+#endif
+
+#ifndef NI_MAXSERV
+# define NI_MAXSERV 32
+#endif
+
+/**
+ * Listener objects represent the desire to accept incoming transport connections.
+ */
+struct qd_listener_t {
+    /* May be referenced by connection_manager and pn_listener_t */
+    sys_atomic_t              ref_count;
+    qd_server_t              *server;
+    qd_server_config_t        config;
+    pn_listener_t            *pn_listener;
+    qd_http_listener_t       *http;
+    DEQ_LINKS(qd_listener_t);
+    bool                      exit_on_error;
+};
+
+DEQ_DECLARE(qd_listener_t, qd_listener_list_t);
+
+
+/**
+ * Connector objects represent the desire to create and maintain an outgoing transport connection.
+ */
+struct qd_connector_t {
+    /* May be referenced by connection_manager, timer and pn_connection_t */
+    sys_atomic_t              ref_count;
+    qd_server_t              *server;
+    qd_server_config_t        config;
+    qd_timer_t               *timer;
+    long                      delay;
+
+    /* Connector state and ctx can be modified in proactor or management threads. */
+    sys_mutex_t              *lock;
+    cxtr_state_t              state;
+    qd_connection_t          *ctx;
+    DEQ_LINKS(qd_connector_t);
+};
+
+DEQ_DECLARE(qd_connector_t, qd_connector_list_t);
+
+
 /**
  * Connection objects wrap Proton connection objects.
  */
 struct qd_connection_t {
     DEQ_LINKS(qd_connection_t);
+    char                     *name;
     qd_server_t              *server;
     bool                      opened; // An open callback was invoked for this connection
     bool                      closed;
-    int                       owner_thread;
     int                       enqueued;
-    qdpn_connector_t         *pn_cxtr;
     pn_connection_t          *pn_conn;
-    pn_collector_t           *collector;
     pn_ssl_t                 *ssl;
     qd_listener_t            *listener;
     qd_connector_t           *connector;
@@ -102,10 +149,11 @@ struct qd_connection_t {
     void                     *open_container;
     qd_deferred_call_list_t   deferred_calls;
     sys_mutex_t              *deferred_call_lock;
-    bool                      event_stall;
     bool                      policy_counted;
     char                     *role;  //The specified role of the connection, e.g. "normal", "inter-router", "route-container" etc.
     qd_pn_free_link_session_list_t  free_link_session_list;
+    char rhost[NI_MAXHOST];     /* Remote host numeric IP for incoming connections */
+    char rhost_port[NI_MAXHOST+NI_MAXSERV]; /* Remote host:port for incoming connections */
 };
 
 DEQ_DECLARE(qd_connection_t, qd_connection_list_t);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/src/timer.c
----------------------------------------------------------------------
diff --git a/src/timer.c b/src/timer.c
index b2121ae..988c086 100644
--- a/src/timer.c
+++ b/src/timer.c
@@ -25,11 +25,15 @@
 #include "alloc.h"
 #include <assert.h>
 #include <stdio.h>
+#include <time.h>
 
-static sys_mutex_t     *lock;
-static qd_timer_list_t  idle_timers;
-static qd_timer_list_t  scheduled_timers;
-static qd_timestamp_t   time_base;
+static sys_mutex_t     *lock = NULL;
+static qd_timer_list_t  idle_timers = {0};
+static qd_timer_list_t  scheduled_timers = {0};
+/* Timers have relative delta_time measured from the previous timer.
+ * The delta_time of the first timer on the queue is measured from timer_base.
+ */
+static qd_timestamp_t   time_base = 0;
 
 ALLOC_DECLARE(qd_timer_t);
 ALLOC_DEFINE(qd_timer_t);
@@ -41,30 +45,35 @@ sys_mutex_t* qd_timer_lock() { return lock; }
 // Private static functions
 //=========================================================================
 
-static void qd_timer_cancel_LH(qd_timer_t *timer)
+static void timer_cancel_LH(qd_timer_t *timer)
 {
-    switch (timer->state) {
-    case TIMER_FREE:
-        assert(0);
-        break;
-
-    case TIMER_IDLE:
-        break;
-
-    case TIMER_SCHEDULED:
+    if (timer->scheduled) {
         if (timer->next)
             timer->next->delta_time += timer->delta_time;
         DEQ_REMOVE(scheduled_timers, timer);
         DEQ_INSERT_TAIL(idle_timers, timer);
-        break;
-
-    case TIMER_PENDING:
-        qd_server_timer_cancel_LH(timer);
-        DEQ_INSERT_TAIL(idle_timers, timer);
-        break;
+        timer->scheduled = false;
     }
+}
 
-    timer->state = TIMER_IDLE;
+/* Adjust timer's time_base and delays for the current time. */
+static void timer_adjust_now_LH()
+{
+    qd_timestamp_t now = qd_timer_now();
+    if (time_base != 0 && now > time_base) {
+        qd_duration_t delta = now - time_base;
+        /* Adjust timer delays by removing duration delta, starting from timer. */
+        for (qd_timer_t *timer = DEQ_HEAD(scheduled_timers); delta > 0 && timer; timer = DEQ_NEXT(timer)) {
+            if (timer->delta_time >= delta) {
+                timer->delta_time -= delta;
+                delta = 0;
+            } else {
+                delta -= timer->delta_time;
+                timer->delta_time = 0; /* Ready to fire */
+            }
+        }
+    }
+    time_base = now;
 }
 
 
@@ -72,6 +81,7 @@ static void qd_timer_cancel_LH(qd_timer_t *timer)
 // Public Functions from timer.h
 //=========================================================================
 
+
 qd_timer_t *qd_timer(qd_dispatch_t *qd, qd_timer_cb_t cb, void* context)
 {
     qd_timer_t *timer = new_qd_timer_t();
@@ -84,8 +94,7 @@ qd_timer_t *qd_timer(qd_dispatch_t *qd, qd_timer_cb_t cb, void* context)
     timer->handler    = cb;
     timer->context    = context;
     timer->delta_time = 0;
-    timer->state      = TIMER_IDLE;
-
+    timer->scheduled  = false;
     sys_mutex_lock(lock);
     DEQ_INSERT_TAIL(idle_timers, timer);
     sys_mutex_unlock(lock);
@@ -98,73 +107,55 @@ void qd_timer_free(qd_timer_t *timer)
 {
     if (!timer) return;
     sys_mutex_lock(lock);
-    qd_timer_cancel_LH(timer);
+    timer_cancel_LH(timer);
     DEQ_REMOVE(idle_timers, timer);
     sys_mutex_unlock(lock);
-
-    timer->state = TIMER_FREE;
     free_qd_timer_t(timer);
 }
 
 
-void qd_timer_schedule(qd_timer_t *timer, qd_timestamp_t duration)
-{
-    qd_timer_t     *ptr;
-    qd_timer_t     *last;
-    qd_timestamp_t  total_time;
+qd_timestamp_t qd_timer_now() {
+    struct timespec tv;
+    clock_gettime(CLOCK_REALTIME, &tv);
+    return ((qd_timestamp_t)tv.tv_sec) * 1000 + tv.tv_nsec / 1000000;
+}
+
 
+void qd_timer_schedule(qd_timer_t *timer, qd_duration_t duration)
+{
     sys_mutex_lock(lock);
-    qd_timer_cancel_LH(timer);  // Timer is now on the idle list
-    assert(timer->state == TIMER_IDLE);
+    timer_cancel_LH(timer);  // Timer is now on the idle list
     DEQ_REMOVE(idle_timers, timer);
 
     //
-    // Handle the special case of a zero-time scheduling.  In this case,
-    // the timer doesn't go on the scheduled list.  It goes straight to the
-    // pending list in the server.
-    //
-    if (duration == 0) {
-        timer->state = TIMER_PENDING;
-        qd_server_timer_pending_LH(timer);
-        sys_mutex_unlock(lock);
-        return;
-    }
-
-    //
     // Find the insert point in the schedule.
     //
-    total_time = 0;
-    ptr        = DEQ_HEAD(scheduled_timers);
-    assert(!ptr || ptr->prev == 0);
-    while (ptr) {
-        total_time += ptr->delta_time;
-        if (total_time > duration)
-            break;
+    timer_adjust_now_LH();   /* Adjust the timers for current time */
+
+    /* Invariant: time_before == total time up to but not including ptr */
+    qd_timer_t *ptr = DEQ_HEAD(scheduled_timers);
+    qd_duration_t time_before = 0;
+    while (ptr && time_before + ptr->delta_time < duration) {
+        time_before += ptr->delta_time;
         ptr = ptr->next;
     }
-
-    //
-    // Insert the timer into the schedule and adjust the delta time
-    // of the following timer if present.
-    //
-    if (total_time <= duration) {
-        assert(ptr == 0);
-        timer->delta_time = duration - total_time;
+    /* ptr is the first timer to exceed duration or NULL if we ran out */
+    if (!ptr) {
+        timer->delta_time = duration - time_before;
         DEQ_INSERT_TAIL(scheduled_timers, timer);
     } else {
-        total_time -= ptr->delta_time;
-        timer->delta_time = duration - total_time;
-        assert(ptr->delta_time > timer->delta_time);
+        timer->delta_time = duration - time_before;
         ptr->delta_time -= timer->delta_time;
-        last = ptr->prev;
-        if (last)
-            DEQ_INSERT_AFTER(scheduled_timers, timer, last);
+        ptr = ptr->prev;
+        if (ptr)
+            DEQ_INSERT_AFTER(scheduled_timers, timer, ptr);
         else
             DEQ_INSERT_HEAD(scheduled_timers, timer);
     }
+    timer->scheduled = true;
 
-    timer->state = TIMER_SCHEDULED;
-
+    qd_timer_t *first = DEQ_HEAD(scheduled_timers);
+    qd_server_timeout(first->server, first->delta_time);
     sys_mutex_unlock(lock);
 }
 
@@ -172,7 +163,7 @@ void qd_timer_schedule(qd_timer_t *timer, qd_timestamp_t duration)
 void qd_timer_cancel(qd_timer_t *timer)
 {
     sys_mutex_lock(lock);
-    qd_timer_cancel_LH(timer);
+    timer_cancel_LH(timer);
     sys_mutex_unlock(lock);
 }
 
@@ -181,6 +172,7 @@ void qd_timer_cancel(qd_timer_t *timer)
 // Private Functions from timer_private.h
 //=========================================================================
 
+
 void qd_timer_initialize(sys_mutex_t *server_lock)
 {
     lock = server_lock;
@@ -196,47 +188,22 @@ void qd_timer_finalize(void)
 }
 
 
-qd_timestamp_t qd_timer_next_duration_LH(void)
+/* Execute all timers that are ready and set up next timeout. */
+void qd_timer_visit()
 {
+    sys_mutex_lock(lock);
+    timer_adjust_now_LH();
     qd_timer_t *timer = DEQ_HEAD(scheduled_timers);
-    if (timer)
-        return timer->delta_time;
-    return -1;
-}
-
-
-void qd_timer_visit_LH(qd_timestamp_t current_time)
-{
-    qd_timestamp_t  delta;
-    qd_timer_t     *timer = DEQ_HEAD(scheduled_timers);
-
-    if (time_base == 0) {
-        time_base = current_time;
-        return;
-    }
-
-    delta     = current_time - time_base;
-    time_base = current_time;
-
-    while (timer) {
-        assert(delta >= 0);
-        if (timer->delta_time > delta) {
-            timer->delta_time -= delta;
-            break;
-        } else {
-            DEQ_REMOVE_HEAD(scheduled_timers);
-            delta -= timer->delta_time;
-            timer->state = TIMER_PENDING;
-            qd_server_timer_pending_LH(timer);
-
-        }
+    while (timer && timer->delta_time == 0) {
+        timer_cancel_LH(timer); /* Removes timer from scheduled_timers */
+        sys_mutex_unlock(lock);
+        timer->handler(timer->context); /* Call the handler outside the lock, may re-schedule */
+        sys_mutex_lock(lock);
         timer = DEQ_HEAD(scheduled_timers);
     }
-}
-
-
-void qd_timer_idle_LH(qd_timer_t *timer)
-{
-    timer->state = TIMER_IDLE;
-    DEQ_INSERT_TAIL(idle_timers, timer);
+    qd_timer_t *first = DEQ_HEAD(scheduled_timers);
+    if (first) {
+        qd_server_timeout(first->server, first->delta_time);
+    }
+    sys_mutex_unlock(lock);
 }

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/src/timer_private.h
----------------------------------------------------------------------
diff --git a/src/timer_private.h b/src/timer_private.h
index 4acd988..537eb4b 100644
--- a/src/timer_private.h
+++ b/src/timer_private.h
@@ -23,30 +23,20 @@
 #include <qpid/dispatch/timer.h>
 #include <qpid/dispatch/threading.h>
 
-typedef enum {
-    TIMER_FREE,
-    TIMER_IDLE,
-    TIMER_SCHEDULED,
-    TIMER_PENDING
-} qd_timer_state_t;
-
-
 struct qd_timer_t {
     DEQ_LINKS(qd_timer_t);
     qd_server_t      *server;
     qd_timer_cb_t     handler;
     void             *context;
     qd_timestamp_t    delta_time;
-    qd_timer_state_t  state;
+    bool              scheduled; /* true means on scheduled list, false on idle list */
 };
 
 DEQ_DECLARE(qd_timer_t, qd_timer_list_t);
 
 void qd_timer_initialize(sys_mutex_t *server_lock);
 void qd_timer_finalize(void);
-qd_timestamp_t qd_timer_next_duration_LH(void);
-void qd_timer_visit_LH(qd_timestamp_t current_time);
-void qd_timer_idle_LH(qd_timer_t *timer);
+void qd_timer_visit();
 
 /// For tests only
 sys_mutex_t* qd_timer_lock();

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 314ad50..bc62232 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -29,9 +29,9 @@ set(unit_test_SOURCES
     compose_test.c
     policy_test.c
     run_unit_tests.c
-    timer_test.c
     tool_test.c
     failoverlist_test.c
+    timer_test.c
     )
 if (USE_MEMORY_POOL)
   list(APPEND unit_test_SOURCES alloc_test.c)

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/tests/run_unit_tests.c
----------------------------------------------------------------------
diff --git a/tests/run_unit_tests.c b/tests/run_unit_tests.c
index e481f13..c8c4ef3 100644
--- a/tests/run_unit_tests.c
+++ b/tests/run_unit_tests.c
@@ -24,7 +24,7 @@
 #include <stdio.h>
 
 int tool_tests(void);
-int timer_tests(void);
+int timer_tests(qd_dispatch_t*);
 int alloc_tests(void);
 int compose_tests(void);
 int policy_tests(void);
@@ -52,7 +52,7 @@ int main(int argc, char** argv)
         printf("Config failed: %s\n", qd_error_message());
         return 1;
     }
-    result += timer_tests();
+    result += timer_tests(qd);
     result += tool_tests();
     result += compose_tests();
 #if USE_MEMORY_POOL

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/tests/system_tests_management.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_management.py b/tests/system_tests_management.py
index b9535a8..c26741f 100644
--- a/tests/system_tests_management.py
+++ b/tests/system_tests_management.py
@@ -169,7 +169,7 @@ class ManagementTest(system_test.TestCase):
         attributes = {'name':'foo', 'port':str(port), 'role':'normal', 'saslMechanisms': 'ANONYMOUS', 'authenticatePeer': False}
         entity = self.assert_create_ok(LISTENER, 'foo', attributes)
         self.assertEqual(entity['name'], 'foo')
-        self.assertEqual(entity['host'], '127.0.0.1')
+        self.assertEqual(entity['host'], '')
 
         # Connect via the new listener
         node3 = self.cleanup(Node.connect(Url(port=port)))

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6f56e289/tests/system_tests_policy.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_policy.py b/tests/system_tests_policy.py
index 2ce3778..120137d 100644
--- a/tests/system_tests_policy.py
+++ b/tests/system_tests_policy.py
@@ -275,66 +275,29 @@ class SenderReceiverLimits(TestCase):
     def test_verify_n_receivers(self):
         n = 4
         addr = self.address()
-
-        # connection should be ok
-        denied = False
-        try:
-            br1 = BlockingConnection(addr)
-        except ConnectionException:
-            denied = True
-
-        self.assertFalse(denied) # assert if connections that should open did not open
+        br1 = BlockingConnection(addr)
 
         # n receivers OK
-        try:
-            r1 = br1.create_receiver(address="****YES_1of4***")
-            r2 = br1.create_receiver(address="****YES_20f4****")
-            r3 = br1.create_receiver(address="****YES_3of4****")
-            r4 = br1.create_receiver(address="****YES_4of4****")
-        except Exception:
-            denied = True
-
-        self.assertFalse(denied) # n receivers should have worked
+        br1.create_receiver(address="****YES_1of4***")
+        br1.create_receiver(address="****YES_20f4****")
+        br1.create_receiver(address="****YES_3of4****")
+        br1.create_receiver(address="****YES_4of4****")
 
         # receiver n+1 should be denied
-        try:
-            r5 = br1.create_receiver("****NO****")
-        except Exception:
-            denied = True
-
-        self.assertTrue(denied) # receiver n+1 should have failed
+        self.assertRaises(LinkDetached, br1.create_receiver, "****NO****")
 
         br1.close()
 
     def test_verify_n_senders(self):
         n = 2
         addr = self.address()
-
-        # connection should be ok
-        denied = False
-        try:
-            bs1 = BlockingConnection(addr)
-        except ConnectionException:
-            denied = True
-
-        self.assertFalse(denied) # assert if connections that should open did not open
+        bs1 = BlockingConnection(addr)
 
         # n senders OK
-        try:
-            s1 = bs1.create_sender(address="****YES_1of2****")
-            s2 = bs1.create_sender(address="****YES_2of2****")
-        except Exception:
-            denied = True
-
-        self.assertFalse(denied) # n senders should have worked
-
-        # receiver n+1 should be denied
-        try:
-            s3 = bs1.create_sender("****NO****")
-        except Exception:
-            denied = True
-
-        self.assertTrue(denied) # sender n+1 should have failed
+        bs1.create_sender(address="****YES_1of2****")
+        bs1.create_sender(address="****YES_2of2****")
+        # sender n+1 should be denied
+        self.assertRaises(LinkDetached, bs1.create_sender, "****NO****")
 
         bs1.close()
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org