You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2014/12/05 21:59:48 UTC

svn commit: r1643428 - in /qpid/dispatch/trunk: include/qpid/dispatch/driver.h src/posix/driver.c

Author: tross
Date: Fri Dec  5 20:59:48 2014
New Revision: 1643428

URL: http://svn.apache.org/r1643428
Log:
DISPATCH-78 - Added files missed from previous commit.

Added:
    qpid/dispatch/trunk/include/qpid/dispatch/driver.h   (with props)
    qpid/dispatch/trunk/src/posix/driver.c   (with props)

Added: qpid/dispatch/trunk/include/qpid/dispatch/driver.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/driver.h?rev=1643428&view=auto
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/driver.h (added)
+++ qpid/dispatch/trunk/include/qpid/dispatch/driver.h Fri Dec  5 20:59:48 2014
@@ -0,0 +1,393 @@
+#ifndef __dispatch_posix_driver_h__
+#define __dispatch_posix_driver_h__ 1
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/error.h>
+#include <proton/sasl.h>
+#include <proton/selectable.h>
+#include <proton/ssl.h>
+#include <proton/transport.h>
+#include <proton/types.h>
+
+/** @file
+ * API for the Driver Layer.
+ *
+ * The driver library provides a simple implementation of a driver for
+ * the proton engine. A driver is responsible for providing input,
+ * output, and tick events to the bottom half of the engine API. See
+ * ::qdpn_transport_input, ::qdpn_transport_output, and
+ * ::qdpn_transport_tick. The driver also provides an interface for the
+ * application to access the top half of the API when the state of the
+ * engine may have changed due to I/O or timing events. Additionally
+ * the driver incorporates the SASL engine as well in order to provide
+ * a complete network stack: AMQP over SASL over TCP.
+ *
+ */
+
+typedef struct qdpn_driver_t qdpn_driver_t;
+typedef struct qdpn_listener_t qdpn_listener_t;
+typedef struct qdpn_connector_t qdpn_connector_t;
+
+typedef enum {
+    QDPN_CONNECTOR_WRITABLE,
+    QDPN_CONNECTOR_READABLE
+} qdpn_activate_criteria_t;
+
+/** Construct a driver
+ *
+ *  Call qdpn_driver_free() to release the driver object.
+ *  @return new driver object, NULL if error
+ */
+qdpn_driver_t *qdpn_driver(void);
+
+/** Return the most recent error code.
+ *
+ * @param[in] d the driver
+ *
+ * @return the most recent error code for d
+ */
+int qdpn_driver_errno(qdpn_driver_t *d);
+
+/** Get additional error information associated with the driver.
+ *
+ * Whenever a driver operation fails, additional error information can
+ * be obtained using this function. The error object that is returned
+ * may also be used to clear the error condition.
+ *
+ * The pointer returned by this operation is valid until the
+ * driver object is freed.
+ *
+ * @param[in] d the driver
+ *
+ * @return the driver's error object
+ */
+pn_error_t *qdpn_driver_error(qdpn_driver_t *d);
+
+/** Set the tracing level for the given driver.
+ *
+ * @param[in] driver the driver to trace
+ * @param[in] trace the trace level to use.
+ * @todo pn_trace_t needs documentation
+ */
+void qdpn_driver_trace(qdpn_driver_t *driver, pn_trace_t trace);
+
+/** Force qdpn_driver_wait() to return
+ *
+ * @param[in] driver the driver to wake up
+ *
+ * @return zero on success, an error code on failure
+ */
+int qdpn_driver_wakeup(qdpn_driver_t *driver);
+
+/** Wait for an active connector or listener
+ *
+ * @param[in] driver the driver to wait on
+ * @param[in] timeout maximum time in milliseconds to wait, -1 means
+ *                    infinite wait
+ *
+ * @return zero on success, an error code on failure
+ */
+int qdpn_driver_wait(qdpn_driver_t *driver, int timeout);
+
+/** Get the next listener with pending data in the driver.
+ *
+ * @param[in] driver the driver
+ * @return NULL if no active listener available
+ */
+qdpn_listener_t *qdpn_driver_listener(qdpn_driver_t *driver);
+
+/** Get the next active connector in the driver.
+ *
+ * Returns the next connector with pending inbound data, available
+ * capacity for outbound data, or pending tick.
+ *
+ * @param[in] driver the driver
+ * @return NULL if no active connector available
+ */
+qdpn_connector_t *qdpn_driver_connector(qdpn_driver_t *driver);
+
+/** Free the driver allocated via qdpn_driver, and all associated
+ *  listeners and connectors.
+ *
+ * @param[in] driver the driver to free, no longer valid on
+ *                   return
+ */
+void qdpn_driver_free(qdpn_driver_t *driver);
+
+
+/** qdpn_listener - the server API **/
+
+/** Construct a listener for the given address.
+ *
+ * @param[in] driver driver that will 'own' this listener
+ * @param[in] host local host address to listen on
+ * @param[in] port local port to listen on
+ * @param[in] context application-supplied, can be accessed via
+ *                    qdpn_listener_context()
+ * @return a new listener on the given host:port, NULL if error
+ */
+qdpn_listener_t *qdpn_listener(qdpn_driver_t *driver, const char *host,
+                               const char *port, void* context);
+
+/** Access the head listener for a driver.
+ *
+ * @param[in] driver the driver whose head listener will be returned
+ *
+ * @return the head listener for driver or NULL if there is none
+ */
+qdpn_listener_t *qdpn_listener_head(qdpn_driver_t *driver);
+
+/** Access the next listener.
+ *
+ * @param[in] listener the listener whose next listener will be
+ *            returned
+ *
+ * @return the next listener
+ */
+qdpn_listener_t *qdpn_listener_next(qdpn_listener_t *listener);
+
+/**
+ * @todo qdpn_listener_trace needs documentation
+ */
+void qdpn_listener_trace(qdpn_listener_t *listener, pn_trace_t trace);
+
+/** Accept a connection that is pending on the listener.
+ *
+ * @param[in] listener the listener to accept the connection on
+ * @return a new connector for the remote, or NULL on error
+ */
+qdpn_connector_t *qdpn_listener_accept(qdpn_listener_t *listener);
+
+/** Access the application context that is associated with the listener.
+ *
+ * @param[in] listener the listener whose context is to be returned
+ * @return the application context that was passed to qdpn_listener() or
+ *         qdpn_listener_fd()
+ */
+void *qdpn_listener_context(qdpn_listener_t *listener);
+
+void qdpn_listener_set_context(qdpn_listener_t *listener, void *context);
+
+/** Close the socket used by the listener.
+ *
+ * @param[in] listener the listener whose socket will be closed.
+ */
+void qdpn_listener_close(qdpn_listener_t *listener);
+
+/** Frees the given listener.
+ *
+ * Assumes the listener's socket has been closed prior to call.
+ *
+ * @param[in] listener the listener object to free, no longer valid
+ *            on return
+ */
+void qdpn_listener_free(qdpn_listener_t *listener);
+
+
+
+
+/** qdpn_connector - the client API **/
+
+/** Construct a connector to the given remote address.
+ *
+ * @param[in] driver owner of this connection.
+ * @param[in] host remote host to connect to.
+ * @param[in] port remote port to connect to.
+ * @param[in] context application supplied, can be accessed via
+ *                    qdpn_connector_context()
+ * @return a new connector to the given remote, or NULL on error.
+ */
+qdpn_connector_t *qdpn_connector(qdpn_driver_t *driver, const char *host,
+                                 const char *port, void* context);
+
+/** Access the head connector for a driver.
+ *
+ * @param[in] driver the driver whose head connector will be returned
+ *
+ * @return the head connector for driver or NULL if there is none
+ */
+qdpn_connector_t *qdpn_connector_head(qdpn_driver_t *driver);
+
+/** Access the next connector.
+ *
+ * @param[in] connector the connector whose next connector will be
+ *            returned
+ *
+ * @return the next connector
+ */
+qdpn_connector_t *qdpn_connector_next(qdpn_connector_t *connector);
+
+/** Set the tracing level for the given connector.
+ *
+ * @param[in] connector the connector to trace
+ * @param[in] trace the trace level to use.
+ */
+void qdpn_connector_trace(qdpn_connector_t *connector, pn_trace_t trace);
+
+/** Service the given connector.
+ *
+ * Handle any inbound data, outbound data, or timing events pending on
+ * the connector.
+ *
+ * @param[in] connector the connector to process.
+ */
+void qdpn_connector_process(qdpn_connector_t *connector);
+
+/** Access the listener which opened this connector.
+ *
+ * @param[in] connector connector whose listener will be returned.
+ * @return the listener which created this connector, or NULL if the
+ *         connector has no listener (e.g. an outbound client
+ *         connection)
+ */
+qdpn_listener_t *qdpn_connector_listener(qdpn_connector_t *connector);
+
+/** Access the Authentication and Security context of the connector.
+ *
+ * @param[in] connector connector whose security context will be
+ *                      returned
+ * @return the Authentication and Security context for the connector,
+ *         or NULL if none
+ */
+pn_sasl_t *qdpn_connector_sasl(qdpn_connector_t *connector);
+
+/** Access the AMQP Connection associated with the connector.
+ *
+ * @param[in] connector the connector whose connection will be
+ *                      returned
+ * @return the connection context for the connector, or NULL if none
+ */
+pn_connection_t *qdpn_connector_connection(qdpn_connector_t *connector);
+
+/** Assign the AMQP Connection associated with the connector.
+ *
+ * @param[in] connector the connector whose connection will be set.
+ * @param[in] connection the connection to associate with the
+ *                       connector
+ */
+void qdpn_connector_set_connection(qdpn_connector_t *connector, pn_connection_t *connection);
+
+/** Access the application context that is associated with the
+ *  connector.
+ *
+ * @param[in] connector the connector whose context is to be returned.
+ * @return the application context that was passed to qdpn_connector()
+ *         or qdpn_connector_fd()
+ */
+void *qdpn_connector_context(qdpn_connector_t *connector);
+
+/** Assign a new application context to the connector.
+ *
+ * @param[in] connector the connector which will hold the context.
+ * @param[in] context new application context to associate with the
+ *                    connector
+ */
+void qdpn_connector_set_context(qdpn_connector_t *connector, void *context);
+
+/** Access the name of the connector
+ *
+ * @param[in] connector the connector whose name will be returned
+ * @return the name of the connector in the form of a null-terminated character string.
+ */
+const char *qdpn_connector_name(const qdpn_connector_t *connector);
+
+/** Access the transport used by this connector.
+ *
+ * @param[in] connector connector whose transport will be returned
+ * @return the transport, or NULL if none
+ */
+pn_transport_t *qdpn_connector_transport(qdpn_connector_t *connector);
+
+/** Close the socket used by the connector.
+ *
+ * @param[in] connector the connector whose socket will be closed
+ */
+void qdpn_connector_close(qdpn_connector_t *connector);
+
+/** Determine if the connector is closed.
+ *
+ * @param[in] connector the connector to test
+ * @return True if closed (or if connector is NULL), otherwise false
+bool qdpn_connector_closed(qdpn_connector_t *connector);
+
+/** Destructor for the given connector.
+ *
+ * Assumes the connector's socket has been closed prior to call.
+ *
+ * @param[in] connector the connector object to free. No longer
+ *                      valid on return
+ */
+void qdpn_connector_free(qdpn_connector_t *connector);
+
+/** Activate a connector when a criterion is met
+ *
+ * Set a criterion for a connector (e.g. its transport is writable) that, once met,
+ * causes the connector to be placed in the driver's work queue.
+ *
+ * @param[in] connector The connector object to activate
+ * @param[in] criteria  The criterion that must be met prior to activating the connector
+ */
+void qdpn_connector_activate(qdpn_connector_t *connector, qdpn_activate_criteria_t criteria);
+
+/** Return the activation status of the connector for a criteria
+ *
+ * Return the activation status (i.e. readable, writable) for the connector.  This function
+ * has the side-effect of canceling the activation of the criteria.
+ *
+ * Please note that this function must not be used for normal AMQP connectors.  It is only
+ * used for connectors created so the driver can track non-AMQP file descriptors.  Such
+ * connectors are never passed into qdpn_connector_process.
+ *
+ * @param[in] connector The connector object to test
+ * @param[in] criteria  The criteria to test.  "Is this the reason the connector appeared
+ *                      in the work list?"
+ * @return true iff the criteria is activated on the connector.
+ */
+bool qdpn_connector_activated(qdpn_connector_t *connector, qdpn_activate_criteria_t criteria);
+
+/** Create a listener using the existing file descriptor.
+ *
+ * @param[in] driver driver that will 'own' this listener
+ * @param[in] fd existing socket for listener to listen on
+ * @param[in] context application-supplied, can be accessed via
+ *                    qdpn_listener_context()
+ * @return a new listener using the given file descriptor, NULL if error
+ */
+qdpn_listener_t *qdpn_listener_fd(qdpn_driver_t *driver, pn_socket_t fd, void *context);
+
+pn_socket_t qdpn_listener_get_fd(qdpn_listener_t *listener);
+
+/** Create a connector using the existing file descriptor.
+ *
+ * @param[in] driver driver that will 'own' this connector.
+ * @param[in] fd existing socket to use for this connector.
+ * @param[in] context application-supplied, can be accessed via
+ *                    qdpn_connector_context()
+ * @return a new connector using the given file descriptor, NULL if error.
+ */
+qdpn_connector_t *qdpn_connector_fd(qdpn_driver_t *driver, pn_socket_t fd, void *context);
+
+pn_socket_t qdpn_connector_get_fd(qdpn_connector_t *connector);
+
+
+#endif /* driver.h */

Propchange: qpid/dispatch/trunk/include/qpid/dispatch/driver.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/dispatch/trunk/include/qpid/dispatch/driver.h
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Rev URL

Added: qpid/dispatch/trunk/src/posix/driver.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/posix/driver.c?rev=1643428&view=auto
==============================================================================
--- qpid/dispatch/trunk/src/posix/driver.c (added)
+++ qpid/dispatch/trunk/src/posix/driver.c Fri Dec  5 20:59:48 2014
@@ -0,0 +1,857 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <assert.h>
+#include <poll.h>
+#include <stdio.h>
+#include <ctype.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <netdb.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <string.h>
+
+#include <qpid/dispatch/driver.h>
+//#include <proton/driver_extras.h>
+#include <proton/error.h>
+#include <proton/io.h>
+#include <proton/sasl.h>
+#include <proton/ssl.h>
+#include <proton/object.h>
+#include <qpid/dispatch/ctools.h>
+//#include "util.h"
+//#include "platform.h"
+
+/* Decls */
+
+// Bits stored in qdpn_connector_t.status: which socket events the connector
+// is currently interested in.
+#define PN_SEL_RD (0x0001)
+#define PN_SEL_WR (0x0002)
+
+// List types (from ctools.h) used for the driver's listeners and connectors.
+DEQ_DECLARE(qdpn_listener_t, qdpn_listener_list_t);
+DEQ_DECLARE(qdpn_connector_t, qdpn_connector_list_t);
+
+/* Driver state: owns the listener and connector lists, the pollfd array and
+ * the control pipe used by qdpn_driver_wakeup(). */
+struct qdpn_driver_t {
+    pn_error_t *error;                 // created by pn_error(); exposed via qdpn_driver_error()
+    pn_io_t *io;                       // created by pn_io(); passed to pn_listen/pn_accept/pn_connect/pn_send/pn_recv
+    qdpn_listener_list_t listeners;    // every listener added by qdpn_listener_fd()
+    qdpn_connector_list_t connectors;  // every connector added by qdpn_connector_fd()
+    qdpn_listener_t *listener_next;    // advanced past a listener when that listener is removed; presumably an iteration cursor
+    qdpn_connector_t *connector_next;  // advanced past a connector when that connector is removed; presumably an iteration cursor
+    size_t closed_count;               // ++ in qdpn_connector_close(), -- when a closed connector is removed
+    size_t capacity;                   // number of pollfd entries allocated in fds
+    struct pollfd *fds;                // poll set, (re)allocated by qdpn_driver_rebuild()
+    size_t nfds;                       // number of entries of fds currently filled in
+    int ctrl[2];                       // pipe: ctrl[1] is written by qdpn_driver_wakeup(), ctrl[0] is polled for POLLIN
+    pn_timestamp_t wakeup;             // reset to 0 by qdpn_driver_rebuild(); presumably the next timer deadline - TODO confirm
+    pn_trace_t trace;                  // trace flags; copied into each new connector
+};
+
+/* A listening socket owned by a driver. */
+struct qdpn_listener_t {
+    DEQ_LINKS(qdpn_listener_t);   // linkage for qdpn_driver_t.listeners
+    qdpn_driver_t *driver;        // owning driver; set to NULL once removed from the driver
+    void *context;                // application context (qdpn_listener_context / _set_context)
+    int idx;                      // initialised to 0; presumably the slot in the driver's pollfd array - TODO confirm
+    int fd;                       // listening socket
+    bool pending;                 // qdpn_listener_accept() returns NULL unless this is set
+    bool closed;                  // set by qdpn_listener_close(); makes a second close a no-op
+};
+
+// Size in bytes of the connector name buffer, including the terminating NUL.
+#define PN_NAME_MAX (256)
+
+/* One socket connection together with its proton transport. */
+struct qdpn_connector_t {
+    DEQ_LINKS(qdpn_connector_t);   // linkage for qdpn_driver_t.connectors
+    qdpn_driver_t *driver;         // owning driver; set to NULL once removed from the driver
+    char name[PN_NAME_MAX];        // "host:port" for outbound connectors, the name reported by pn_accept() for inbound ones
+    pn_timestamp_t wakeup;         // result of the most recent pn_transport_tick()
+    pn_connection_t *connection;   // bound AMQP connection (a reference is held), or NULL
+    pn_transport_t *transport;     // created in qdpn_connector_fd(), freed in qdpn_connector_free()
+    pn_sasl_t *sasl;               // obtained from pn_sasl(transport)
+    qdpn_listener_t *listener;     // listener that accepted this connector, NULL otherwise
+    void *context;                 // application context
+    int idx;                       // initialised to 0; presumably the slot in the driver's pollfd array - TODO confirm
+    int fd;                        // connection socket
+    int status;                    // bitmask of PN_SEL_RD / PN_SEL_WR: events currently of interest
+    pn_trace_t trace;              // trace flags, mirrored into the transport
+    bool pending_tick;             // initialised to false; not otherwise used in the visible code
+    bool pending_read;             // consumed (cleared) by qdpn_connector_process() / qdpn_connector_activated()
+    bool pending_write;            // consumed (cleared) by qdpn_connector_process() / qdpn_connector_activated()
+    bool closed;                   // set by qdpn_connector_close()
+    bool input_done;               // no more input will be read from the socket
+    bool output_done;              // no more output will be written to the socket
+};
+
+/* Impls */
+
+/* Write text followed by a newline to stderr, then terminate the process
+ * with exit status 1. Does not return. */
+static void pni_fatal(const char *text)
+{
+    fputs(text, stderr);
+    fputc('\n', stderr);
+    exit(1);
+}
+
+/*
+ * pn_i_now() - current wall-clock time in milliseconds.
+ *
+ * Exactly one implementation is compiled, selected by USE_CLOCK_GETTIME or
+ * USE_WIN_FILETIME; gettimeofday() is the fallback. The POSIX variants
+ * terminate the process through pni_fatal() if the clock cannot be read.
+ */
+#ifdef USE_CLOCK_GETTIME
+#include <time.h>
+pn_timestamp_t pn_i_now(void)
+{
+    struct timespec ts;
+    if (clock_gettime(CLOCK_REALTIME, &ts) != 0)
+        pni_fatal("clock_gettime() failed");
+
+    pn_timestamp_t millis = ((pn_timestamp_t) ts.tv_sec) * 1000;
+    millis += ts.tv_nsec / 1000000;
+    return millis;
+}
+#elif defined(USE_WIN_FILETIME)
+#include <windows.h>
+pn_timestamp_t pn_i_now(void)
+{
+    FILETIME ft;
+    ULARGE_INTEGER ticks;
+
+    GetSystemTimeAsFileTime(&ft);
+    ticks.u.HighPart = ft.dwHighDateTime;
+    ticks.u.LowPart = ft.dwLowDateTime;
+    // FILETIME counts 100 ns units: divide down to milliseconds, then
+    // shift the epoch by the constant below.
+    return ticks.QuadPart / 10000 - 11644473600000;
+}
+#else
+#include <sys/time.h>
+pn_timestamp_t pn_i_now(void)
+{
+    struct timeval tv;
+    if (gettimeofday(&tv, NULL) != 0)
+        pni_fatal("gettimeofday failed");
+
+    pn_timestamp_t millis = ((pn_timestamp_t) tv.tv_sec) * 1000;
+    millis += tv.tv_usec / 1000;
+    return millis;
+}
+#endif
+
+/* Case-insensitive equality of two NUL-terminated strings.
+ *
+ * @param a first string (must not be NULL)
+ * @param b second string (must not be NULL)
+ * @return true iff a and b have the same length and match character by
+ *         character after tolower().
+ */
+static bool pni_eq_nocase(const char *a, const char *b)
+{
+    while (*b) {
+        // tolower() is only defined for values representable as unsigned
+        // char (or EOF); a plain char may be negative, so convert first.
+        // If a is shorter than b its NUL mismatches here, so a is never
+        // read past its terminator.
+        if (tolower((unsigned char) *a++) != tolower((unsigned char) *b++))
+            return false;
+    }
+    // b is exhausted: equal only if a is exhausted too.
+    return !(*a);
+}
+
+/* Interpret an environment variable as a boolean flag.
+ *
+ * @param name name of the environment variable
+ * @return true iff the variable is set and its value equals, ignoring case,
+ *         one of "true", "1", "yes" or "on".
+ */
+static bool pn_env_bool(const char *name)
+{
+    static const char *const truthy[] = { "true", "1", "yes", "on" };
+
+    const char *value = getenv(name);
+    if (!value)
+        return false;
+
+    for (size_t i = 0; i < sizeof(truthy) / sizeof(truthy[0]); i++) {
+        if (pni_eq_nocase(value, truthy[i]))
+            return true;
+    }
+    return false;
+}
+
+// NOTE: both macros evaluate their arguments more than once; do not pass
+// expressions with side effects.
+#define pn_min(X,Y) ((X) > (Y) ? (Y) : (X))
+#define pn_max(X,Y) ((X) < (Y) ? (Y) : (X))
+
+/* Return the smaller of two timestamps, where 0 means "not set": a zero
+ * argument is ignored in favour of the other one, and 0 is returned only
+ * when both are 0. */
+static pn_timestamp_t pn_timestamp_min( pn_timestamp_t a, pn_timestamp_t b )
+{
+    if (!a) return b;
+    if (!b) return a;
+    return pn_min(a, b);
+}
+
+/* Record the current errno in a proton error object.
+ *
+ * @param error error object to fill in
+ * @param msg   prefix describing the failed operation
+ * @return the value returned by pn_error_format(); the code passed to it is
+ *         PN_INTR when errno is EINTR and PN_ERR otherwise.
+ */
+static int pn_i_error_from_errno(pn_error_t *error, const char *msg)
+{
+    // Capture errno first: the library calls below are allowed to change it.
+    const int saved_errno = errno;
+    char buf[1024] = "";
+    const char *text;
+
+#if defined(__GLIBC__) && defined(_GNU_SOURCE)
+    // GNU strerror_r returns the message pointer and may leave buf untouched.
+    text = strerror_r(saved_errno, buf, sizeof(buf));
+#else
+    // XSI strerror_r fills buf and returns 0 on success.
+    text = (strerror_r(saved_errno, buf, sizeof(buf)) == 0) ? buf : NULL;
+#endif
+    if (!text || !*text) {
+        // No usable description: fall back to the numeric value.
+        snprintf(buf, sizeof(buf), "errno %d", saved_errno);
+        text = buf;
+    }
+
+    int code = (saved_errno == EINTR) ? PN_INTR : PN_ERR;
+    return pn_error_format(error, code, "%s: %s", msg, text);
+}
+
+
+// listener
+
+/* Append listener l to driver d's listener list and record d as its owner.
+ * Does nothing when l->driver is NULL on entry. */
+static void qdpn_driver_add_listener(qdpn_driver_t *d, qdpn_listener_t *l)
+{
+    if (l->driver) {
+        DEQ_INSERT_TAIL(d->listeners, l);
+        l->driver = d;
+    }
+}
+
+/* Unlink listener l from driver d and clear its owner pointer.
+ * Does nothing when l has no owner. */
+static void qdpn_driver_remove_listener(qdpn_driver_t *d, qdpn_listener_t *l)
+{
+    if (l->driver == NULL)
+        return;
+
+    // Keep the driver's cursor valid if it currently points at l.
+    if (d->listener_next == l)
+        d->listener_next = DEQ_NEXT(l);
+
+    DEQ_REMOVE(d->listeners, l);
+    l->driver = NULL;
+}
+
+/* Open a listening socket on host:port and wrap it in a new listener.
+ *
+ * @param driver  driver that will own the listener
+ * @param host    local address to listen on
+ * @param port    local port to listen on
+ * @param context application context stored in the listener
+ * @return the new listener, or NULL if driver is NULL, the socket could not
+ *         be opened, or the listener object could not be allocated.
+ */
+qdpn_listener_t *qdpn_listener(qdpn_driver_t *driver, const char *host,
+                               const char *port, void* context)
+{
+    if (!driver) return NULL;
+
+    pn_socket_t sock = pn_listen(driver->io, host, port);
+    if (sock == PN_INVALID_SOCKET)
+        return NULL;
+
+    qdpn_listener_t *l = qdpn_listener_fd(driver, sock, context);
+    if (!l) {
+        // Nobody owns the socket when the listener could not be created;
+        // close it instead of leaking the descriptor.
+        close(sock);
+        return NULL;
+    }
+
+    if (driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
+        fprintf(stderr, "Listening on %s:%s\n", host, port);
+
+    return l;
+}
+
+/* Create a listener around an existing socket and register it with driver.
+ *
+ * @param driver  driver that will own the listener
+ * @param fd      socket the listener uses
+ * @param context application context stored in the listener
+ * @return the new listener, or NULL if driver is NULL or allocation fails.
+ */
+qdpn_listener_t *qdpn_listener_fd(qdpn_driver_t *driver, int fd, void *context)
+{
+    if (driver == NULL)
+        return NULL;
+
+    qdpn_listener_t *listener = (qdpn_listener_t *) malloc(sizeof(*listener));
+    if (listener == NULL)
+        return NULL;
+
+    DEQ_ITEM_INIT(listener);
+    listener->driver  = driver;
+    listener->context = context;
+    listener->idx     = 0;
+    listener->fd      = fd;
+    listener->pending = false;
+    listener->closed  = false;
+
+    qdpn_driver_add_listener(driver, listener);
+    return listener;
+}
+
+/* Return the socket used by listener, which must not be NULL. */
+pn_socket_t qdpn_listener_get_fd(qdpn_listener_t *listener)
+{
+    assert(listener);
+    pn_socket_t sock = listener->fd;
+    return sock;
+}
+
+/* Return the first listener of driver, or NULL if driver is NULL or has no
+ * listeners. */
+qdpn_listener_t *qdpn_listener_head(qdpn_driver_t *driver)
+{
+    if (!driver)
+        return NULL;
+    return DEQ_HEAD(driver->listeners);
+}
+
+/* Return the listener following the given one in its list, or NULL if
+ * listener is NULL. */
+qdpn_listener_t *qdpn_listener_next(qdpn_listener_t *listener)
+{
+    if (!listener)
+        return NULL;
+    return DEQ_NEXT(listener);
+}
+
+/* Placeholder: listener tracing is not implemented, both arguments are
+ * ignored. */
+void qdpn_listener_trace(qdpn_listener_t *l, pn_trace_t trace)
+{
+    // XXX: not implemented
+    (void) l;
+    (void) trace;
+}
+
+/* Return the application context stored in l, or NULL if l is NULL. */
+void *qdpn_listener_context(qdpn_listener_t *l)
+{
+    if (!l)
+        return NULL;
+    return l->context;
+}
+
+/* Replace the application context of listener, which must not be NULL. */
+void qdpn_listener_set_context(qdpn_listener_t *listener, void *context)
+{
+    assert(listener != NULL);
+    listener->context = context;
+}
+
+/* Accept the connection pending on listener l.
+ *
+ * The new connector is named after the peer as reported by pn_accept() and
+ * records l as its listener; its application context is NULL.
+ *
+ * @param l listener to accept on
+ * @return the new connector, or NULL if l is NULL, nothing is pending, the
+ *         accept fails, or the connector could not be allocated.
+ */
+qdpn_connector_t *qdpn_listener_accept(qdpn_listener_t *l)
+{
+    if (!l || !l->pending) return NULL;
+    char name[PN_NAME_MAX];
+
+    pn_socket_t sock = pn_accept(l->driver->io, l->fd, name, PN_NAME_MAX);
+    if (sock == PN_INVALID_SOCKET)
+        return NULL;
+
+    if (l->driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
+        fprintf(stderr, "Accepted from %s\n", name);
+
+    qdpn_connector_t *c = qdpn_connector_fd(l->driver, sock, NULL);
+    if (!c) {
+        // Without a connector nobody owns the accepted socket: close it
+        // rather than leak it, and do not dereference the NULL connector.
+        close(sock);
+        return NULL;
+    }
+
+    snprintf(c->name, PN_NAME_MAX, "%s", name);
+    c->listener = l;
+    return c;
+}
+
+/* Close the listener's socket. A NULL or already closed listener is
+ * ignored; a failing close() is reported with perror() and the listener is
+ * still marked closed. */
+void qdpn_listener_close(qdpn_listener_t *l)
+{
+    if (l == NULL || l->closed)
+        return;
+
+    int rc = close(l->fd);
+    if (rc == -1)
+        perror("close");
+    l->closed = true;
+}
+
+/* Detach l from its driver (if any) and release its memory. The socket is
+ * not closed here. NULL is ignored. */
+void qdpn_listener_free(qdpn_listener_t *l)
+{
+    if (l == NULL)
+        return;
+
+    qdpn_driver_t *owner = l->driver;
+    if (owner != NULL)
+        qdpn_driver_remove_listener(owner, l);
+    free(l);
+}
+
+// connector
+
+/* Append connector c to driver d's connector list and record d as its
+ * owner. Does nothing when c->driver is NULL on entry. */
+static void qdpn_driver_add_connector(qdpn_driver_t *d, qdpn_connector_t *c)
+{
+    if (c->driver) {
+        DEQ_INSERT_TAIL(d->connectors, c);
+        c->driver = d;
+    }
+}
+
+/* Unlink connector c from driver d, clear its owner pointer and, if c was
+ * already closed, drop it from the driver's closed count. Does nothing when
+ * c has no owner. */
+static void qdpn_driver_remove_connector(qdpn_driver_t *d, qdpn_connector_t *c)
+{
+    if (c->driver == NULL)
+        return;
+
+    // Keep the driver's cursor valid if it currently points at c.
+    if (d->connector_next == c)
+        d->connector_next = DEQ_NEXT(c);
+
+    DEQ_REMOVE(d->connectors, c);
+    c->driver = NULL;
+
+    if (c->closed)
+        d->closed_count -= 1;
+}
+
+/* Start a connection to host:port and wrap the socket in a new connector
+ * named "host:port".
+ *
+ * The result of pn_connect() is passed on unchecked, as before; only a
+ * failure to create the connector object is handled here.
+ *
+ * @param driver  driver that will own the connector
+ * @param host    remote host
+ * @param port    remote port
+ * @param context application context stored in the connector
+ * @return the new connector, or NULL if driver is NULL or the connector
+ *         could not be allocated.
+ */
+qdpn_connector_t *qdpn_connector(qdpn_driver_t *driver, const char *host,
+                                 const char *port, void *context)
+{
+    if (!driver) return NULL;
+
+    pn_socket_t sock = pn_connect(driver->io, host, port);
+
+    qdpn_connector_t *c = qdpn_connector_fd(driver, sock, context);
+    if (!c) {
+        // Do not dereference a NULL connector, and do not leak the socket
+        // that would otherwise have no owner.
+        if (sock != PN_INVALID_SOCKET)
+            close(sock);
+        return NULL;
+    }
+
+    snprintf(c->name, PN_NAME_MAX, "%s:%s", host, port);
+    if (driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
+        fprintf(stderr, "Connected to %s\n", c->name);
+    return c;
+}
+
+/* Create a connector around an existing socket and register it with driver.
+ *
+ * The connector starts interested in both reading and writing, gets its own
+ * transport and SASL layer, and inherits the driver's trace flags. On
+ * failure fd is left open; it remains the caller's responsibility.
+ *
+ * @param driver  driver that will own the connector
+ * @param fd      socket the connector uses
+ * @param context application context stored in the connector
+ * @return the new connector, or NULL if driver is NULL or an allocation
+ *         fails.
+ */
+qdpn_connector_t *qdpn_connector_fd(qdpn_driver_t *driver, int fd, void *context)
+{
+    if (!driver) return NULL;
+
+    qdpn_connector_t *c = (qdpn_connector_t *) malloc(sizeof(qdpn_connector_t));
+    if (!c) return NULL;
+
+    // A connector without a transport is unusable (qdpn_connector_process
+    // hands the transport straight to the engine), so treat a failed
+    // transport allocation like a failed connector allocation.
+    pn_transport_t *transport = pn_transport();
+    if (!transport) {
+        free(c);
+        return NULL;
+    }
+
+    DEQ_ITEM_INIT(c);
+    c->driver = driver;
+    c->pending_tick = false;
+    c->pending_read = false;
+    c->pending_write = false;
+    c->name[0] = '\0';
+    c->idx = 0;
+    c->fd = fd;
+    c->status = PN_SEL_RD | PN_SEL_WR;
+    c->trace = driver->trace;
+    c->closed = false;
+    c->wakeup = 0;
+    c->connection = NULL;
+    c->transport = transport;
+    c->sasl = pn_sasl(c->transport);
+    c->input_done = false;
+    c->output_done = false;
+    c->context = context;
+    c->listener = NULL;
+
+    // Push the trace flags down into the transport as well.
+    qdpn_connector_trace(c, driver->trace);
+
+    qdpn_driver_add_connector(driver, c);
+    return c;
+}
+
+/* Return the socket used by connector, which must not be NULL. */
+pn_socket_t qdpn_connector_get_fd(qdpn_connector_t *connector)
+{
+    assert(connector);
+    pn_socket_t sock = connector->fd;
+    return sock;
+}
+
+/* Return the first connector of driver, or NULL if driver is NULL or has no
+ * connectors. */
+qdpn_connector_t *qdpn_connector_head(qdpn_driver_t *driver)
+{
+    if (!driver)
+        return NULL;
+    return DEQ_HEAD(driver->connectors);
+}
+
+/* Return the connector following the given one in its list, or NULL if
+ * connector is NULL. */
+qdpn_connector_t *qdpn_connector_next(qdpn_connector_t *connector)
+{
+    if (!connector)
+        return NULL;
+    return DEQ_NEXT(connector);
+}
+
+/* Store trace as the connector's trace flags and, when the connector has a
+ * transport, apply them to the transport too. NULL is ignored. */
+void qdpn_connector_trace(qdpn_connector_t *ctor, pn_trace_t trace)
+{
+    if (ctor == NULL)
+        return;
+
+    ctor->trace = trace;
+    if (ctor->transport != NULL) {
+        pn_transport_trace(ctor->transport, trace);
+    }
+}
+
+/* Return the connector's SASL object, or NULL if ctor is NULL. */
+pn_sasl_t *qdpn_connector_sasl(qdpn_connector_t *ctor)
+{
+    if (!ctor)
+        return NULL;
+    return ctor->sasl;
+}
+
+/* Return the connector's transport, or NULL if ctor is NULL. */
+pn_transport_t *qdpn_connector_transport(qdpn_connector_t *ctor)
+{
+    if (!ctor)
+        return NULL;
+    return ctor->transport;
+}
+
+/* Replace the AMQP connection associated with the connector.
+ *
+ * Any previously set connection is released and unbound from the transport;
+ * the new connection (if not NULL) is retained and bound. The connector's
+ * trace flags are then re-applied to the transport. A NULL ctor is ignored.
+ *
+ * @param ctor       connector to update
+ * @param connection new connection, or NULL to leave the connector unbound
+ */
+void qdpn_connector_set_connection(qdpn_connector_t *ctor, pn_connection_t *connection)
+{
+    if (!ctor) return;
+    if (ctor->connection) {
+        // NOTE(review): our reference is dropped before the transport is
+        // unbound; this relies on the old connection staying alive until
+        // pn_transport_unbind() returns - confirm.
+        pn_class_decref(PN_OBJECT, ctor->connection);
+        pn_transport_unbind(ctor->transport);
+    }
+    ctor->connection = connection;
+    if (ctor->connection) {
+        // Hold a reference for as long as the connector points at it.
+        pn_class_incref(PN_OBJECT, ctor->connection);
+        pn_transport_bind(ctor->transport, connection);
+    }
+    if (ctor->transport) pn_transport_trace(ctor->transport, ctor->trace);
+}
+
+/* Return the connection set on the connector, or NULL if ctor is NULL or
+ * no connection is set. */
+pn_connection_t *qdpn_connector_connection(qdpn_connector_t *ctor)
+{
+    if (!ctor)
+        return NULL;
+    return ctor->connection;
+}
+
+/* Return the application context stored in ctor, or NULL if ctor is NULL. */
+void *qdpn_connector_context(qdpn_connector_t *ctor)
+{
+    if (!ctor)
+        return NULL;
+    return ctor->context;
+}
+
+/* Replace the application context of ctor. A NULL ctor is ignored. */
+void qdpn_connector_set_context(qdpn_connector_t *ctor, void *context)
+{
+    if (ctor != NULL) {
+        ctor->context = context;
+    }
+}
+
+/* Return the connector's NUL-terminated name, or a null pointer if ctor is
+ * NULL. The string lives inside the connector. */
+const char *qdpn_connector_name(const qdpn_connector_t *ctor)
+{
+    return ctor ? ctor->name : 0;
+}
+
+/* Return the listener that accepted this connector, or NULL if ctor is NULL
+ * or the connector was not created by a listener. */
+qdpn_listener_t *qdpn_connector_listener(qdpn_connector_t *ctor)
+{
+    if (!ctor)
+        return NULL;
+    return ctor->listener;
+}
+
+/* Close the connector's socket and mark the connector closed.
+ *
+ * Clears the connector's event interest, closes the descriptor (a failing
+ * close() is reported with perror()) and bumps the driver's closed count.
+ * A NULL or already closed connector is ignored, matching
+ * qdpn_listener_close(): a second call must not close the descriptor again
+ * (it may have been reused) or count the connector twice.
+ *
+ * @param ctor connector to close
+ */
+void qdpn_connector_close(qdpn_connector_t *ctor)
+{
+    // XXX: should probably signal engine and callback here
+    if (!ctor) return;
+    if (ctor->closed) return;
+
+    ctor->status = 0;
+    if (close(ctor->fd) == -1)
+        perror("close");
+    ctor->closed = true;
+    ctor->driver->closed_count++;
+}
+
+/* Report whether the connector has been closed. A NULL connector counts as
+ * closed. */
+bool qdpn_connector_closed(qdpn_connector_t *ctor)
+{
+    if (!ctor)
+        return true;
+    return ctor->closed;
+}
+
+/* Detach the connector from its driver (if any), free its transport, drop
+ * the reference held on its connection and release the connector itself.
+ * The socket is not closed here. NULL is ignored. */
+void qdpn_connector_free(qdpn_connector_t *ctor)
+{
+    if (ctor == NULL)
+        return;
+
+    qdpn_driver_t *owner = ctor->driver;
+    if (owner != NULL)
+        qdpn_driver_remove_connector(owner, ctor);
+
+    pn_transport_free(ctor->transport);
+    ctor->transport = NULL;
+
+    if (ctor->connection != NULL) {
+        pn_class_decref(PN_OBJECT, ctor->connection);
+    }
+    ctor->connection = NULL;
+
+    free(ctor);
+}
+
+/* Add the event named by crit to the connector's interest mask: PN_SEL_WR
+ * for QDPN_CONNECTOR_WRITABLE, PN_SEL_RD for QDPN_CONNECTOR_READABLE. Any
+ * other value leaves the connector unchanged. */
+void qdpn_connector_activate(qdpn_connector_t *ctor, qdpn_activate_criteria_t crit)
+{
+    if (crit == QDPN_CONNECTOR_WRITABLE) {
+        ctor->status |= PN_SEL_WR;
+    } else if (crit == QDPN_CONNECTOR_READABLE) {
+        ctor->status |= PN_SEL_RD;
+    }
+}
+
+
+/* Test, and at the same time cancel, an activation on the connector.
+ *
+ * For the given criterion the matching pending flag is returned and then
+ * cleared, and the matching bit is removed from the interest mask.
+ *
+ * @param ctor connector to test
+ * @param crit QDPN_CONNECTOR_WRITABLE or QDPN_CONNECTOR_READABLE
+ * @return the pending flag as it was on entry; false for any other crit.
+ */
+bool qdpn_connector_activated(qdpn_connector_t *ctor, qdpn_activate_criteria_t crit)
+{
+    bool was_pending = false;
+
+    if (crit == QDPN_CONNECTOR_WRITABLE) {
+        was_pending = ctor->pending_write;
+        ctor->pending_write = false;
+        ctor->status &= ~PN_SEL_WR;
+    } else if (crit == QDPN_CONNECTOR_READABLE) {
+        was_pending = ctor->pending_read;
+        ctor->pending_read = false;
+        ctor->status &= ~PN_SEL_RD;
+    }
+
+    return was_pending;
+}
+
+/* Run the transport's timer processing for time now and return whatever
+ * pn_transport_tick() returns; 0 when the connector has no transport. */
+static pn_timestamp_t qdpn_connector_tick(qdpn_connector_t *ctor, pn_timestamp_t now)
+{
+    pn_transport_t *transport = ctor->transport;
+    if (transport == NULL)
+        return 0;
+    return pn_transport_tick(transport, now);
+}
+
+/* Service one connector: move bytes from the socket into the transport, run
+ * the transport's timers, move bytes from the transport to the socket, and
+ * close the connector once both directions are finished.
+ *
+ * Socket I/O is only attempted in a direction whose pending_read /
+ * pending_write flag is set; the flag is cleared when it is consumed. The
+ * PN_SEL_RD / PN_SEL_WR bits in c->status are updated to reflect what the
+ * connector wants next. A NULL or already closed connector is ignored.
+ *
+ * @param c connector to process
+ */
+void qdpn_connector_process(qdpn_connector_t *c)
+{
+    if (c) {
+        if (c->closed) return;
+
+        pn_transport_t *transport = c->transport;
+
+        ///
+        /// Socket read
+        ///
+        if (!c->input_done) {
+            ssize_t capacity = pn_transport_capacity(transport);
+            if (capacity > 0) {
+                // The transport has room: stay interested in readability.
+                c->status |= PN_SEL_RD;
+                if (c->pending_read) {
+                    c->pending_read = false;
+                    // Receive directly into the transport's tail buffer.
+                    ssize_t n =  pn_recv(c->driver->io, c->fd, pn_transport_tail(transport), capacity);
+                    if (n < 0) {
+                        // EAGAIN just means "try again later"; anything
+                        // else ends the input side.
+                        if (errno != EAGAIN) {
+                            perror("read");
+                            c->status &= ~PN_SEL_RD;
+                            c->input_done = true;
+                            pn_transport_close_tail( transport );
+                        }
+                    } else if (n == 0) {
+                        // Nothing received: treated as end of input.
+                        c->status &= ~PN_SEL_RD;
+                        c->input_done = true;
+                        pn_transport_close_tail( transport );
+                    } else {
+                        // Hand the n received bytes to the transport.
+                        if (pn_transport_process(transport, (size_t) n) < 0) {
+                            c->status &= ~PN_SEL_RD;
+                            c->input_done = true;
+                        }
+                    }
+                }
+            }
+
+            // Re-query: processing above may have changed the capacity.
+            capacity = pn_transport_capacity(transport);
+
+            // A negative capacity ends the input side.
+            if (capacity < 0) {
+                c->status &= ~PN_SEL_RD;
+                c->input_done = true;
+            }
+        }
+
+        ///
+        /// Event wakeup
+        ///
+        c->wakeup = qdpn_connector_tick(c, pn_i_now());
+
+        ///
+        /// Socket write
+        ///
+        if (!c->output_done) {
+            ssize_t pending = pn_transport_pending(transport);
+            if (pending > 0) {
+                // There is output queued: stay interested in writability.
+                c->status |= PN_SEL_WR;
+                if (c->pending_write) {
+                    c->pending_write = false;
+                    // Send directly from the transport's head buffer.
+                    ssize_t n = pn_send(c->driver->io, c->fd, pn_transport_head(transport), pending);
+                    if (n < 0) {
+                        // XXX
+                        // EAGAIN is retried later; anything else ends the
+                        // output side.
+                        if (errno != EAGAIN) {
+                            perror("send");
+                            c->output_done = true;
+                            c->status &= ~PN_SEL_WR;
+                            pn_transport_close_head( transport );
+                        }
+                    } else if (n) {
+                        // Drop the n bytes that were actually sent.
+                        pn_transport_pop(transport, (size_t) n);
+                    }
+                }
+            } else if (pending == 0) {
+                // Nothing to send right now.
+                c->status &= ~PN_SEL_WR;
+            } else {
+                // A negative pending count ends the output side.
+                c->output_done = true;
+                c->status &= ~PN_SEL_WR;
+            }
+        }
+
+        // Closed?
+
+        // Both directions finished: close the socket.
+        if (c->input_done && c->output_done) {
+            if (c->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV)) {
+                fprintf(stderr, "Closed %s\n", c->name);
+            }
+            qdpn_connector_close(c);
+        }
+    }
+}
+
+// driver
+
+/* Allocate and initialise a driver.
+ *
+ * The trace flags are taken from the PN_TRACE_RAW, PN_TRACE_FRM and
+ * PN_TRACE_DRV environment variables. A control pipe is created for
+ * qdpn_driver_wakeup(); if that fails the error is reported with perror()
+ * and the driver is still returned, with both pipe descriptors set to -1.
+ *
+ * @return the new driver, or NULL if it could not be allocated. Release it
+ *         with qdpn_driver_free().
+ */
+qdpn_driver_t *qdpn_driver(void)
+{
+    qdpn_driver_t *d = (qdpn_driver_t *) malloc(sizeof(qdpn_driver_t));
+    if (!d) return NULL;
+    DEQ_INIT(d->listeners);
+    DEQ_INIT(d->connectors);
+    d->error = pn_error();
+    d->io = pn_io();
+    d->listener_next = NULL;
+    d->connector_next = NULL;
+    d->closed_count = 0;
+    d->capacity = 0;
+    d->fds = NULL;
+    d->nfds = 0;
+    // -1, not 0: descriptor 0 is a valid descriptor (stdin) and would be
+    // closed by qdpn_driver_free() if the pipe were never created.
+    d->ctrl[0] = -1;
+    d->ctrl[1] = -1;
+    d->trace = ((pn_env_bool("PN_TRACE_RAW") ? PN_TRACE_RAW : PN_TRACE_OFF) |
+                (pn_env_bool("PN_TRACE_FRM") ? PN_TRACE_FRM : PN_TRACE_OFF) |
+                (pn_env_bool("PN_TRACE_DRV") ? PN_TRACE_DRV : PN_TRACE_OFF));
+    d->wakeup = 0;
+
+    // XXX: a missing control pipe is reported but not treated as fatal
+    if (pipe(d->ctrl)) {
+        perror("Can't create control pipe");
+        d->ctrl[0] = -1;
+        d->ctrl[1] = -1;
+    }
+
+    return d;
+}
+
+// Return the code held by the driver's error object (via pn_error_code()).
+int qdpn_driver_errno(qdpn_driver_t *d)
+{
+    assert(d);
+    pn_error_t *err = d->error;
+    return pn_error_code(err);
+}
+
+// Return the driver's error object.  The pointer is the one stored in the
+// driver itself (it is released by qdpn_driver_free()), not a copy.
+pn_error_t *qdpn_driver_error(qdpn_driver_t *d)
+{
+    assert(d);
+    pn_error_t *err = d->error;
+    return err;
+}
+
+// Replace the driver's trace flags with `trace`.
+void qdpn_driver_trace(qdpn_driver_t *d, pn_trace_t trace)
+{
+    // Same precondition check as the other driver accessors
+    // (qdpn_driver_errno / qdpn_driver_error).
+    assert(d);
+    d->trace = trace;
+}
+
+//
+// Tear down a driver: close the control pipe, free every remaining
+// connector and listener, then release the poll array, the error object,
+// the io object and the driver itself.  A NULL driver is ignored.
+//
+void qdpn_driver_free(qdpn_driver_t *d)
+{
+    if (d == NULL)
+        return;
+
+    // Read end first, then write end.
+    for (int end = 0; end < 2; end++)
+        close(d->ctrl[end]);
+
+    // Each free call is expected to unlink the element from the driver's
+    // list, so the head changes on every iteration until the list is empty.
+    qdpn_connector_t *connector;
+    while ((connector = DEQ_HEAD(d->connectors)) != NULL)
+        qdpn_connector_free(connector);
+
+    qdpn_listener_t *listener;
+    while ((listener = DEQ_HEAD(d->listeners)) != NULL)
+        qdpn_listener_free(listener);
+
+    free(d->fds);
+    pn_error_free(d->error);
+    pn_io_free(d->io);
+    free(d);
+}
+
+//
+// Interrupt a pending poll by writing one byte to the control pipe.
+//
+// Returns PN_ARG_ERR for a NULL driver, 0 when the byte was written, and
+// otherwise the (non-positive) result of write().
+//
+int qdpn_driver_wakeup(qdpn_driver_t *d)
+{
+    if (!d)
+        return PN_ARG_ERR;
+
+    ssize_t written = write(d->ctrl[1], "x", 1);
+    return (written > 0) ? 0 : (int) written;
+}
+
+//
+// Append one entry to the driver's poll array if there is room for it.
+//
+// Returns the slot that was used, or 0 when the array is full.  Slot 0 is
+// always taken by the control pipe, so for listeners and connectors a
+// result of 0 means "not registered" - the same meaning idx == 0 has in
+// qdpn_driver_wait_3().
+//
+static int qdpn_driver_poll_slot(qdpn_driver_t *d, int fd, short events)
+{
+    if (d->nfds >= d->capacity)
+        return 0;
+
+    int slot = (int) d->nfds;
+    d->fds[slot].fd = fd;
+    d->fds[slot].events = events;
+    d->fds[slot].revents = 0;
+    d->nfds++;
+    return slot;
+}
+
+//
+// Rebuild the pollfd array from the current listeners and connectors and
+// recompute d->wakeup as the earliest pending connector wakeup.
+//
+// Layout: slot 0 is the read end of the control pipe, followed by every
+// listener, followed by every connector that is not closed.  Each
+// listener/connector records its slot in ->idx.
+//
+// If the array cannot be grown the error is recorded in d->error and the
+// previous allocation is kept; entries that do not fit are left
+// unregistered (idx == 0) instead of writing through a NULL pointer.
+//
+static void qdpn_driver_rebuild(qdpn_driver_t *d)
+{
+    // +1 for the control pipe.
+    size_t needed = DEQ_SIZE(d->listeners) + DEQ_SIZE(d->connectors) + 1;
+    if (d->capacity < needed) {
+        // Same growth policy as before: start at 16, then keep doubling.
+        size_t capacity = d->capacity ? d->capacity : 16;
+        while (capacity < needed)
+            capacity *= 2;
+
+        // Go through a temporary so the old array is not lost on failure.
+        struct pollfd *grown = realloc(d->fds, capacity * sizeof(struct pollfd));
+        if (grown) {
+            d->fds = grown;
+            d->capacity = capacity;
+        } else {
+            pn_i_error_from_errno(d->error, "realloc");
+        }
+    }
+
+    d->wakeup = 0;
+    d->nfds = 0;
+
+    qdpn_driver_poll_slot(d, d->ctrl[0], POLLIN);
+
+    qdpn_listener_t *l = DEQ_HEAD(d->listeners);
+    while (l) {
+        l->idx = qdpn_driver_poll_slot(d, l->fd, POLLIN);
+        l = DEQ_NEXT(l);
+    }
+
+    qdpn_connector_t *c = DEQ_HEAD(d->connectors);
+    while (c) {
+        if (!c->closed) {
+            d->wakeup = pn_timestamp_min(d->wakeup, c->wakeup);
+            short events = (c->status & PN_SEL_RD ? POLLIN : 0) | (c->status & PN_SEL_WR ? POLLOUT : 0);
+            c->idx = qdpn_driver_poll_slot(d, c->fd, events);
+        }
+        c = DEQ_NEXT(c);
+    }
+}
+
+void qdpn_driver_wait_1(qdpn_driver_t *d)  // Part 1 of the three-part wait: prepare the poll set.
+{
+    qdpn_driver_rebuild(d);  // refreshes d->fds, d->nfds and d->wakeup from the current listeners/connectors
+}
+
+//
+// Part 2 of the three-part wait: block in poll() on the set prepared by
+// qdpn_driver_wait_1().
+//
+// `timeout` is in milliseconds; a negative value means "no limit".  The
+// effective timeout is shortened so poll() returns no later than
+// d->wakeup, and is forced to 0 while d->closed_count is non-zero.
+//
+// Returns the result of poll().  On -1 the errno is recorded in d->error.
+//
+int qdpn_driver_wait_2(qdpn_driver_t *d, int timeout)
+{
+    if (d->wakeup) {
+        pn_timestamp_t now = pn_i_now();
+        if (now >= d->wakeup) {
+            timeout = 0;
+        } else {
+            // The remaining time is a timestamp difference, but poll()
+            // takes an int.  Clamp before narrowing so a far-off deadline
+            // cannot truncate to a wrong - possibly negative, i.e.
+            // infinite - timeout.  POSIX guarantees int holds this value.
+            const pn_timestamp_t max_wait = 2147483647;
+            pn_timestamp_t remaining = d->wakeup - now;
+            if (remaining > max_wait)
+                remaining = max_wait;
+            if (timeout < 0 || remaining < timeout)
+                timeout = (int) remaining;
+        }
+    }
+    int result = poll(d->fds, d->nfds, d->closed_count > 0 ? 0 : timeout);
+    if (result == -1)
+        pn_i_error_from_errno(d->error, "poll");
+    return result;
+}
+
+//
+// Part 3 of the three-part wait: turn the poll() results into per-listener
+// and per-connector pending flags and reset the iteration cursors used by
+// qdpn_driver_listener() / qdpn_driver_connector().
+//
+// Returns PN_INTR if the control pipe was readable (the driver was woken
+// through qdpn_driver_wakeup()), 0 otherwise.
+//
+int qdpn_driver_wait_3(qdpn_driver_t *d)
+{
+    bool woken = false;
+    // d->fds is NULL and d->nfds is 0 until the poll set has been built,
+    // so only look at slot 0 when it exists.
+    if (d->nfds > 0 && (d->fds[0].revents & POLLIN)) {
+        woken = true;
+        // Clear the pipe.  After a full-buffer read, check that more data
+        // is actually waiting before reading again: if the pipe held an
+        // exact multiple of the buffer size, an unconditional second
+        // read() would block.
+        char buffer[512];
+        struct pollfd ctrl = { .fd = d->ctrl[0], .events = POLLIN, .revents = 0 };
+        ssize_t n;
+        do {
+            n = read(d->ctrl[0], buffer, sizeof(buffer));
+        } while (n == (ssize_t) sizeof(buffer) &&
+                 poll(&ctrl, 1, 0) > 0 && (ctrl.revents & POLLIN));
+    }
+
+    // idx == 0 is the control pipe's slot, so it doubles as "not in the
+    // poll set" for listeners and connectors.
+    qdpn_listener_t *l = DEQ_HEAD(d->listeners);
+    while (l) {
+        l->pending = (l->idx && d->fds[l->idx].revents & POLLIN);
+        l = DEQ_NEXT(l);
+    }
+
+    pn_timestamp_t now = pn_i_now();
+    qdpn_connector_t *c = DEQ_HEAD(d->connectors);
+    while (c) {
+        if (c->closed) {
+            c->pending_read = false;
+            c->pending_write = false;
+            c->pending_tick = false;
+        } else {
+            int idx = c->idx;
+            c->pending_read = (idx && d->fds[idx].revents & POLLIN);
+            c->pending_write = (idx && d->fds[idx].revents & POLLOUT);
+            c->pending_tick = (c->wakeup &&  c->wakeup <= now);
+            if (idx && d->fds[idx].revents & POLLERR) {
+                qdpn_connector_close(c);
+            } else if (idx && (d->fds[idx].revents & POLLHUP)) {
+                if (c->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV)) {
+                    fprintf(stderr, "hangup on connector %s\n", c->name);
+                }
+                /* poll() is signalling POLLHUP. to see what happened we need
+                 * to do an actual recv() to get the error code. But we might
+                 * be in a state where we're not interested in input, in that
+                 * case try to get the error code via send() */
+                if (d->fds[idx].events & POLLIN)
+                    c->pending_read = true;
+                else if (d->fds[idx].events & POLLOUT)
+                    c->pending_write = true;
+            } else if (idx && (d->fds[idx].revents & ~(POLLIN|POLLOUT|POLLERR|POLLHUP))) {
+                if (c->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV)) {
+                    fprintf(stderr, "Unexpected poll events: %04x on %s\n",
+                            d->fds[idx].revents, c->name);
+                }
+            }
+        }
+        c = DEQ_NEXT(c);
+    }
+
+    // Restart both iterators from the head of their lists.
+    d->listener_next = DEQ_HEAD(d->listeners);
+    d->connector_next = DEQ_HEAD(d->connectors);
+
+    return woken ? PN_INTR : 0;
+}
+
+//
+// XXX - pn_driver_wait has been divided into three internal functions as a
+//       temporary workaround for a multi-threading problem.  A multi-threaded
+//       application must hold a lock on parts 1 and 3, but not on part 2.
+//       This temporary change, which is not reflected in the driver's API, allows
+//       a multi-threaded application to use the three parts separately.
+//
+//       This workaround will eventually be replaced by a more elegant solution
+//       to the problem.
+//
+int qdpn_driver_wait(qdpn_driver_t *d, int timeout)
+{
+    // Part 1: prepare the poll set.
+    qdpn_driver_wait_1(d);
+
+    // Part 2: poll.  On failure report the code stored in the driver's
+    // error object and skip part 3.
+    if (qdpn_driver_wait_2(d, timeout) == -1) {
+        return pn_error_code(d->error);
+    }
+
+    // Part 3: distribute the poll results.
+    return qdpn_driver_wait_3(d);
+}
+
+//
+// Return the next listener flagged as pending, advancing the driver's
+// listener cursor past every listener examined.  Returns NULL when the
+// cursor reaches the end of the list or when d is NULL.
+//
+qdpn_listener_t *qdpn_driver_listener(qdpn_driver_t *d)
+{
+    if (d == NULL)
+        return NULL;
+
+    for (qdpn_listener_t *cur = d->listener_next; cur; cur = d->listener_next) {
+        // Move the cursor first so the next call resumes after `cur`.
+        d->listener_next = DEQ_NEXT(cur);
+        if (cur->pending)
+            return cur;
+    }
+
+    return NULL;
+}
+
+//
+// Return the next connector that needs attention - one that is closed or
+// has a pending read, write or tick - advancing the driver's connector
+// cursor past every connector examined.  Returns NULL when the cursor
+// reaches the end of the list or when d is NULL.
+//
+qdpn_connector_t *qdpn_driver_connector(qdpn_driver_t *d)
+{
+    if (d == NULL)
+        return NULL;
+
+    for (qdpn_connector_t *cur = d->connector_next; cur; cur = d->connector_next) {
+        // Move the cursor first so the next call resumes after `cur`.
+        d->connector_next = DEQ_NEXT(cur);
+
+        bool has_work = cur->closed || cur->pending_read || cur->pending_write || cur->pending_tick;
+        if (has_work)
+            return cur;
+    }
+
+    return NULL;
+}
+

Propchange: qpid/dispatch/trunk/src/posix/driver.c
------------------------------------------------------------------------------
    svn:eol-style = native



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org