You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2014/12/05 21:59:48 UTC
svn commit: r1643428 - in /qpid/dispatch/trunk:
include/qpid/dispatch/driver.h src/posix/driver.c
Author: tross
Date: Fri Dec 5 20:59:48 2014
New Revision: 1643428
URL: http://svn.apache.org/r1643428
Log:
DISPATCH-78 - Added files missed from previous commit.
Added:
qpid/dispatch/trunk/include/qpid/dispatch/driver.h (with props)
qpid/dispatch/trunk/src/posix/driver.c (with props)
Added: qpid/dispatch/trunk/include/qpid/dispatch/driver.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/driver.h?rev=1643428&view=auto
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/driver.h (added)
+++ qpid/dispatch/trunk/include/qpid/dispatch/driver.h Fri Dec 5 20:59:48 2014
@@ -0,0 +1,393 @@
+#ifndef __dispatch_posix_driver_h__
+#define __dispatch_posix_driver_h__ 1
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/error.h>
+#include <proton/sasl.h>
+#include <proton/selectable.h>
+#include <proton/ssl.h>
+#include <proton/transport.h>
+#include <proton/types.h>
+
+/** @file
+ * API for the Driver Layer.
+ *
+ * The driver library provides a simple implementation of a driver for
+ * the proton engine. A driver is responsible for providing input,
+ * output, and tick events to the bottom half of the engine API. See
+ * ::qdpn_transport_input, ::qdpn_transport_output, and
+ * ::qdpn_transport_tick. The driver also provides an interface for the
+ * application to access the top half of the API when the state of the
+ * engine may have changed due to I/O or timing events. Additionally
+ * the driver incorporates the SASL engine as well in order to provide
+ * a complete network stack: AMQP over SASL over TCP.
+ *
+ */
+
+typedef struct qdpn_driver_t qdpn_driver_t;
+typedef struct qdpn_listener_t qdpn_listener_t;
+typedef struct qdpn_connector_t qdpn_connector_t;
+
+/** Conditions on which a connector can be activated; passed to
+ * qdpn_connector_activate() and qdpn_connector_activated().
+ */
+typedef enum {
+ QDPN_CONNECTOR_WRITABLE,
+ QDPN_CONNECTOR_READABLE
+} qdpn_activate_criteria_t;
+
+/** Construct a driver
+ *
+ * Call qdpn_driver_free() to release the driver object.
+ * @return new driver object, NULL if error
+ */
+qdpn_driver_t *qdpn_driver(void);
+
+/** Return the most recent error code.
+ *
+ * @param[in] d the driver
+ *
+ * @return the most recent error code for d
+ */
+int qdpn_driver_errno(qdpn_driver_t *d);
+
+/** Get additional error information associated with the driver.
+ *
+ * Whenever a driver operation fails, additional error information can
+ * be obtained using this function. The error object that is returned
+ * may also be used to clear the error condition.
+ *
+ * The pointer returned by this operation is valid until the
+ * driver object is freed.
+ *
+ * @param[in] d the driver
+ *
+ * @return the driver's error object
+ */
+pn_error_t *qdpn_driver_error(qdpn_driver_t *d);
+
+/** Set the tracing level for the given driver.
+ *
+ * @param[in] driver the driver to trace
+ * @param[in] trace the trace level to use.
+ * @todo pn_trace_t needs documentation
+ */
+void qdpn_driver_trace(qdpn_driver_t *driver, pn_trace_t trace);
+
+/** Force qdpn_driver_wait() to return
+ *
+ * @param[in] driver the driver to wake up
+ *
+ * @return zero on success, an error code on failure
+ */
+int qdpn_driver_wakeup(qdpn_driver_t *driver);
+
+/** Wait for an active connector or listener
+ *
+ * @param[in] driver the driver to wait on
+ * @param[in] timeout maximum time in milliseconds to wait, -1 means
+ * infinite wait
+ *
+ * @return zero on success, an error code on failure
+ */
+int qdpn_driver_wait(qdpn_driver_t *driver, int timeout);
+
+/** Get the next listener with pending data in the driver.
+ *
+ * @param[in] driver the driver
+ * @return NULL if no active listener available
+ */
+qdpn_listener_t *qdpn_driver_listener(qdpn_driver_t *driver);
+
+/** Get the next active connector in the driver.
+ *
+ * Returns the next connector with pending inbound data, available
+ * capacity for outbound data, or pending tick.
+ *
+ * @param[in] driver the driver
+ * @return NULL if no active connector available
+ */
+qdpn_connector_t *qdpn_driver_connector(qdpn_driver_t *driver);
+
+/** Free the driver allocated via qdpn_driver, and all associated
+ * listeners and connectors.
+ *
+ * @param[in] driver the driver to free, no longer valid on
+ * return
+ */
+void qdpn_driver_free(qdpn_driver_t *driver);
+
+
+/** qdpn_listener - the server API **/
+
+/** Construct a listener for the given address.
+ *
+ * @param[in] driver driver that will 'own' this listener
+ * @param[in] host local host address to listen on
+ * @param[in] port local port to listen on
+ * @param[in] context application-supplied, can be accessed via
+ * qdpn_listener_context()
+ * @return a new listener on the given host:port, NULL if error
+ */
+qdpn_listener_t *qdpn_listener(qdpn_driver_t *driver, const char *host,
+ const char *port, void* context);
+
+/** Access the head listener for a driver.
+ *
+ * @param[in] driver the driver whose head listener will be returned
+ *
+ * @return the head listener for driver or NULL if there is none
+ */
+qdpn_listener_t *qdpn_listener_head(qdpn_driver_t *driver);
+
+/** Access the next listener.
+ *
+ * @param[in] listener the listener whose next listener will be
+ * returned
+ *
+ * @return the next listener
+ */
+qdpn_listener_t *qdpn_listener_next(qdpn_listener_t *listener);
+
+/**
+ * @todo qdpn_listener_trace needs documentation
+ */
+void qdpn_listener_trace(qdpn_listener_t *listener, pn_trace_t trace);
+
+/** Accept a connection that is pending on the listener.
+ *
+ * @param[in] listener the listener to accept the connection on
+ * @return a new connector for the remote, or NULL on error
+ */
+qdpn_connector_t *qdpn_listener_accept(qdpn_listener_t *listener);
+
+/** Access the application context that is associated with the listener.
+ *
+ * @param[in] listener the listener whose context is to be returned
+ * @return the application context that was passed to qdpn_listener() or
+ * qdpn_listener_fd()
+ */
+void *qdpn_listener_context(qdpn_listener_t *listener);
+
+void qdpn_listener_set_context(qdpn_listener_t *listener, void *context);
+
+/** Close the socket used by the listener.
+ *
+ * @param[in] listener the listener whose socket will be closed.
+ */
+void qdpn_listener_close(qdpn_listener_t *listener);
+
+/** Frees the given listener.
+ *
+ * Assumes the listener's socket has been closed prior to call.
+ *
+ * @param[in] listener the listener object to free, no longer valid
+ * on return
+ */
+void qdpn_listener_free(qdpn_listener_t *listener);
+
+
+
+
+/** qdpn_connector - the client API **/
+
+/** Construct a connector to the given remote address.
+ *
+ * @param[in] driver owner of this connection.
+ * @param[in] host remote host to connect to.
+ * @param[in] port remote port to connect to.
+ * @param[in] context application supplied, can be accessed via
+ * qdpn_connector_context()
+ * @return a new connector to the given remote, or NULL on error.
+ */
+qdpn_connector_t *qdpn_connector(qdpn_driver_t *driver, const char *host,
+ const char *port, void* context);
+
+/** Access the head connector for a driver.
+ *
+ * @param[in] driver the driver whose head connector will be returned
+ *
+ * @return the head connector for driver or NULL if there is none
+ */
+qdpn_connector_t *qdpn_connector_head(qdpn_driver_t *driver);
+
+/** Access the next connector.
+ *
+ * @param[in] connector the connector whose next connector will be
+ * returned
+ *
+ * @return the next connector
+ */
+qdpn_connector_t *qdpn_connector_next(qdpn_connector_t *connector);
+
+/** Set the tracing level for the given connector.
+ *
+ * @param[in] connector the connector to trace
+ * @param[in] trace the trace level to use.
+ */
+void qdpn_connector_trace(qdpn_connector_t *connector, pn_trace_t trace);
+
+/** Service the given connector.
+ *
+ * Handle any inbound data, outbound data, or timing events pending on
+ * the connector.
+ *
+ * @param[in] connector the connector to process.
+ */
+void qdpn_connector_process(qdpn_connector_t *connector);
+
+/** Access the listener which opened this connector.
+ *
+ * @param[in] connector connector whose listener will be returned.
+ * @return the listener which created this connector, or NULL if the
+ * connector has no listener (e.g. an outbound client
+ * connection)
+ */
+qdpn_listener_t *qdpn_connector_listener(qdpn_connector_t *connector);
+
+/** Access the Authentication and Security context of the connector.
+ *
+ * @param[in] connector connector whose security context will be
+ * returned
+ * @return the Authentication and Security context for the connector,
+ * or NULL if none
+ */
+pn_sasl_t *qdpn_connector_sasl(qdpn_connector_t *connector);
+
+/** Access the AMQP Connection associated with the connector.
+ *
+ * @param[in] connector the connector whose connection will be
+ * returned
+ * @return the connection context for the connector, or NULL if none
+ */
+pn_connection_t *qdpn_connector_connection(qdpn_connector_t *connector);
+
+/** Assign the AMQP Connection associated with the connector.
+ *
+ * @param[in] connector the connector whose connection will be set.
+ * @param[in] connection the connection to associate with the
+ * connector
+ */
+void qdpn_connector_set_connection(qdpn_connector_t *connector, pn_connection_t *connection);
+
+/** Access the application context that is associated with the
+ * connector.
+ *
+ * @param[in] connector the connector whose context is to be returned.
+ * @return the application context that was passed to qdpn_connector()
+ * or qdpn_connector_fd()
+ */
+void *qdpn_connector_context(qdpn_connector_t *connector);
+
+/** Assign a new application context to the connector.
+ *
+ * @param[in] connector the connector which will hold the context.
+ * @param[in] context new application context to associate with the
+ * connector
+ */
+void qdpn_connector_set_context(qdpn_connector_t *connector, void *context);
+
+/** Access the name of the connector
+ *
+ * @param[in] connector the connector whose name will be returned
+ * @return the name of the connector in the form of a null-terminated character string.
+ */
+const char *qdpn_connector_name(const qdpn_connector_t *connector);
+
+/** Access the transport used by this connector.
+ *
+ * @param[in] connector connector whose transport will be returned
+ * @return the transport, or NULL if none
+ */
+pn_transport_t *qdpn_connector_transport(qdpn_connector_t *connector);
+
+/** Close the socket used by the connector.
+ *
+ * @param[in] connector the connector whose socket will be closed
+ */
+void qdpn_connector_close(qdpn_connector_t *connector);
+
+/** Determine if the connector is closed.
+ *
+ * @return True if closed, otherwise false
+ */
+bool qdpn_connector_closed(qdpn_connector_t *connector);
+
+/** Destructor for the given connector.
+ *
+ * Assumes the connector's socket has been closed prior to call.
+ *
+ * @param[in] connector the connector object to free. No longer
+ * valid on return
+ */
+void qdpn_connector_free(qdpn_connector_t *connector);
+
+/** Activate a connector when a criterion is met
+ *
+ * Set a criterion for a connector (e.g. its transport is writable) that, once met,
+ * causes the connector to be placed in the driver's work queue.
+ *
+ * @param[in] connector The connector object to activate
+ * @param[in] criteria The criteria that must be met prior to activating the connector
+ */
+void qdpn_connector_activate(qdpn_connector_t *connector, qdpn_activate_criteria_t criteria);
+
+/** Return the activation status of the connector for a criteria
+ *
+ * Return the activation status (i.e. readable, writable) for the connector. This function
+ * has the side-effect of canceling the activation of the criteria.
+ *
+ * Please note that this function must not be used for normal AMQP connectors. It is only
+ * used for connectors created so the driver can track non-AMQP file descriptors. Such
+ * connectors are never passed into qdpn_connector_process.
+ *
+ * @param[in] connector The connector object to test
+ * @param[in] criteria The criteria to test. "Is this the reason the connector appeared
+ * in the work list?"
+ * @return true iff the criteria is activated on the connector.
+ */
+bool qdpn_connector_activated(qdpn_connector_t *connector, qdpn_activate_criteria_t criteria);
+
+/** Create a listener using the existing file descriptor.
+ *
+ * @param[in] driver driver that will 'own' this listener
+ * @param[in] fd existing socket for listener to listen on
+ * @param[in] context application-supplied, can be accessed via
+ * qdpn_listener_context()
+ * @return a new listener using the given file descriptor, NULL if error
+ */
+qdpn_listener_t *qdpn_listener_fd(qdpn_driver_t *driver, pn_socket_t fd, void *context);
+
+pn_socket_t qdpn_listener_get_fd(qdpn_listener_t *listener);
+
+/** Create a connector using the existing file descriptor.
+ *
+ * @param[in] driver driver that will 'own' this connector.
+ * @param[in] fd existing socket to use for this connector.
+ * @param[in] context application-supplied, can be accessed via
+ * qdpn_connector_context()
+ * @return a new connector using the given file descriptor, NULL if error.
+ */
+qdpn_connector_t *qdpn_connector_fd(qdpn_driver_t *driver, pn_socket_t fd, void *context);
+
+pn_socket_t qdpn_connector_get_fd(qdpn_connector_t *connector);
+
+
+#endif /* driver.h */
Propchange: qpid/dispatch/trunk/include/qpid/dispatch/driver.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/dispatch/trunk/include/qpid/dispatch/driver.h
------------------------------------------------------------------------------
svn:keywords = Author Date Id Rev URL
Added: qpid/dispatch/trunk/src/posix/driver.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/posix/driver.c?rev=1643428&view=auto
==============================================================================
--- qpid/dispatch/trunk/src/posix/driver.c (added)
+++ qpid/dispatch/trunk/src/posix/driver.c Fri Dec 5 20:59:48 2014
@@ -0,0 +1,857 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <assert.h>
+#include <poll.h>
+#include <stdio.h>
+#include <ctype.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <netdb.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <string.h>
+
+#include <qpid/dispatch/driver.h>
+//#include <proton/driver_extras.h>
+#include <proton/error.h>
+#include <proton/io.h>
+#include <proton/sasl.h>
+#include <proton/ssl.h>
+#include <proton/object.h>
+#include <qpid/dispatch/ctools.h>
+//#include "util.h"
+//#include "platform.h"
+
+/* Decls */
+
+#define PN_SEL_RD (0x0001)
+#define PN_SEL_WR (0x0002)
+
+DEQ_DECLARE(qdpn_listener_t, qdpn_listener_list_t);
+DEQ_DECLARE(qdpn_connector_t, qdpn_connector_list_t);
+
+// Driver state: the registered listeners and connectors plus the poll() set built from them.
+struct qdpn_driver_t {
+ pn_error_t *error; // filled in by pn_i_error_from_errno() when poll() fails
+ pn_io_t *io; // I/O object handed to pn_listen/pn_connect/pn_accept/pn_recv/pn_send
+ qdpn_listener_list_t listeners;
+ qdpn_connector_list_t connectors;
+ qdpn_listener_t *listener_next; // cursor advanced by qdpn_driver_listener()
+ qdpn_connector_t *connector_next; // cursor advanced by qdpn_driver_connector()
+ size_t closed_count; // closed connectors still on the list; non-zero forces a zero poll() timeout
+ size_t capacity; // number of entries allocated in fds
+ struct pollfd *fds; // poll set; slot 0 is the read end of ctrl
+ size_t nfds; // number of entries of fds currently in use
+ int ctrl[2]; //pipe for updating selectable status
+ pn_timestamp_t wakeup; // earliest non-zero connector wakeup found by the last rebuild, 0 if none
+ pn_trace_t trace;
+};
+
+// A listening socket registered with a driver.
+struct qdpn_listener_t {
+ DEQ_LINKS(qdpn_listener_t);
+ qdpn_driver_t *driver; // owning driver; reset to NULL when removed from the driver's list
+ void *context; // application-supplied context
+ int idx; // index into driver->fds; 0 means "not in the poll set" (slot 0 is the control pipe)
+ int fd;
+ bool pending; // POLLIN was reported for fd by the last qdpn_driver_wait_3()
+ bool closed; // set by qdpn_listener_close()
+};
+
+#define PN_NAME_MAX (256)
+
+// A connected (or connecting) socket together with the proton transport it feeds.
+struct qdpn_connector_t {
+ DEQ_LINKS(qdpn_connector_t);
+ qdpn_driver_t *driver; // owning driver; reset to NULL when removed from the driver's list
+ char name[PN_NAME_MAX]; // "host:port" for outbound connectors, peer name for accepted ones
+ pn_timestamp_t wakeup; // value returned by the last pn_transport_tick(); 0 means no deadline
+ pn_connection_t *connection; // reference held via pn_class_incref/decref
+ pn_transport_t *transport;
+ pn_sasl_t *sasl;
+ qdpn_listener_t *listener; // listener that accepted this connector, NULL otherwise
+ void *context; // application-supplied context
+ int idx; // index into driver->fds; 0 means "not in the poll set" (slot 0 is the control pipe)
+ int fd;
+ int status; // PN_SEL_RD / PN_SEL_WR bits selecting which poll events are requested
+ pn_trace_t trace;
+ bool pending_tick; // wakeup deadline had passed at the last qdpn_driver_wait_3()
+ bool pending_read; // POLLIN (or POLLHUP while reading) seen at the last wait
+ bool pending_write; // POLLOUT (or POLLHUP while writing) seen at the last wait
+ bool closed; // set by qdpn_connector_close()
+ bool input_done; // no more reads will be attempted
+ bool output_done; // no more writes will be attempted
+};
+
+/* Impls */
+
+/* Print `text` followed by a newline on stderr, then terminate the process with status 1. */
+static void pni_fatal(const char *text)
+{
+    fputs(text, stderr);
+    fputc('\n', stderr);
+    exit(1);
+}
+
+#ifdef USE_CLOCK_GETTIME
+#include <time.h>
+/* Current CLOCK_REALTIME time in milliseconds; aborts the process if the clock cannot be read. */
+pn_timestamp_t pn_i_now(void)
+{
+    struct timespec ts;
+    if (clock_gettime(CLOCK_REALTIME, &ts) != 0)
+        pni_fatal("clock_gettime() failed");
+    pn_timestamp_t whole_ms = ((pn_timestamp_t) ts.tv_sec) * 1000;
+    return whole_ms + (ts.tv_nsec / 1000000);
+}
+#elif defined(USE_WIN_FILETIME)
+#include <windows.h>
+/* Current system time in milliseconds, derived from GetSystemTimeAsFileTime(). */
+pn_timestamp_t pn_i_now(void)
+{
+    FILETIME ft;
+    GetSystemTimeAsFileTime(&ft);
+    ULARGE_INTEGER ticks;
+    ticks.u.HighPart = ft.dwHighDateTime;
+    ticks.u.LowPart = ft.dwLowDateTime;
+    // Convert to milliseconds and adjust base epoch
+    return ticks.QuadPart / 10000 - 11644473600000;
+}
+#else
+#include <sys/time.h>
+/* Current gettimeofday() time in milliseconds; aborts the process if the clock cannot be read. */
+pn_timestamp_t pn_i_now(void)
+{
+    struct timeval tv;
+    if (gettimeofday(&tv, NULL) != 0)
+        pni_fatal("gettimeofday failed");
+    pn_timestamp_t whole_ms = ((pn_timestamp_t) tv.tv_sec) * 1000;
+    return whole_ms + (tv.tv_usec / 1000);
+}
+#endif
+
+/*
+ * Case-insensitive equality test for two NUL-terminated strings.
+ *
+ * Returns true only when both strings have the same length and every pair
+ * of characters matches after tolower().
+ */
+static bool pni_eq_nocase(const char *a, const char *b)
+{
+    while (*b) {
+        // <ctype.h> functions need a value representable as unsigned char;
+        // passing a negative plain char is undefined behaviour.
+        if (tolower((unsigned char) *a++) != tolower((unsigned char) *b++))
+            return false;
+    }
+    // b is exhausted: equal only if a ends here too
+    return !(*a);
+}
+
+/*
+ * Interpret environment variable `name` as a boolean.
+ *
+ * Returns true when the variable is set to "true", "1", "yes" or "on"
+ * (compared case-insensitively); false when unset or set to anything else.
+ */
+static bool pn_env_bool(const char *name)
+{
+    static const char *const truthy[] = { "true", "1", "yes", "on" };
+    const char *value = getenv(name);
+    if (!value)
+        return false;
+    for (size_t i = 0; i < sizeof(truthy) / sizeof(truthy[0]); i++) {
+        if (pni_eq_nocase(value, truthy[i]))
+            return true;
+    }
+    return false;
+}
+
+#define pn_min(X,Y) ((X) > (Y) ? (Y) : (X))
+#define pn_max(X,Y) ((X) < (Y) ? (Y) : (X))
+
+/*
+ * Smaller of two timestamps, where 0 means "no timestamp".
+ * A zero argument is ignored; the result is 0 only when both are 0.
+ */
+static pn_timestamp_t pn_timestamp_min( pn_timestamp_t a, pn_timestamp_t b )
+{
+    if (!a) return b;
+    if (!b) return a;
+    return (a > b) ? b : a;
+}
+
+/*
+ * Record the current errno in `error` as "<msg>: <system error text>".
+ *
+ * The error code is PN_INTR when errno is EINTR and PN_ERR otherwise.
+ * Returns whatever pn_error_format() returns.
+ */
+static int pn_i_error_from_errno(pn_error_t *error, const char *msg)
+{
+    // Capture errno first: strerror_r() is allowed to modify it, and the
+    // EINTR test below must see the value the failing call left behind.
+    const int saved_errno = errno;
+
+    // Start from an empty string so the buffer is never read uninitialised
+    // if strerror_r() fails or (GNU variant) does not write into it.
+    char err[1024] = "";
+    strerror_r(saved_errno, err, sizeof(err));
+
+    int code = (saved_errno == EINTR) ? PN_INTR : PN_ERR;
+    return pn_error_format(error, code, "%s: %s", msg, err);
+}
+
+
+// listener
+
+/* Append listener `l` to driver `d`'s listener list; does nothing when l->driver is NULL. */
+static void qdpn_driver_add_listener(qdpn_driver_t *d, qdpn_listener_t *l)
+{
+    if (l->driver) {
+        DEQ_INSERT_TAIL(d->listeners, l);
+        l->driver = d;
+    }
+}
+
+/*
+ * Unlink listener `l` from driver `d`.
+ * If the driver's iteration cursor points at `l` it is moved to the next
+ * listener first, so an iteration in progress stays valid.
+ */
+static void qdpn_driver_remove_listener(qdpn_driver_t *d, qdpn_listener_t *l)
+{
+    if (l->driver == NULL)
+        return;
+
+    if (d->listener_next == l)
+        d->listener_next = DEQ_NEXT(l);
+
+    DEQ_REMOVE(d->listeners, l);
+    l->driver = NULL;
+}
+
+/*
+ * Open a listening socket on host:port and register it with `driver`.
+ *
+ * Returns the new listener, or NULL when `driver` is NULL, the socket
+ * cannot be opened, or the listener object cannot be allocated.
+ */
+qdpn_listener_t *qdpn_listener(qdpn_driver_t *driver, const char *host,
+                               const char *port, void* context)
+{
+    if (!driver) return NULL;
+
+    pn_socket_t sock = pn_listen(driver->io, host, port);
+    if (sock == PN_INVALID_SOCKET)
+        return NULL;
+
+    qdpn_listener_t *l = qdpn_listener_fd(driver, sock, context);
+    if (!l) {
+        // No listener took ownership of the socket, so release it here
+        // instead of leaking the descriptor.
+        close(sock);
+        return NULL;
+    }
+
+    if (driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
+        fprintf(stderr, "Listening on %s:%s\n", host, port);
+
+    return l;
+}
+
+/*
+ * Wrap an existing socket `fd` in a new listener owned by `driver`.
+ * Returns NULL when `driver` is NULL or allocation fails.
+ */
+qdpn_listener_t *qdpn_listener_fd(qdpn_driver_t *driver, int fd, void *context)
+{
+    if (!driver) return NULL;
+
+    qdpn_listener_t *listener = (qdpn_listener_t *) malloc(sizeof(*listener));
+    if (listener == NULL) return NULL;
+
+    DEQ_ITEM_INIT(listener);
+    listener->driver  = driver;
+    listener->context = context;
+    listener->idx     = 0;
+    listener->fd      = fd;
+    listener->pending = false;
+    listener->closed  = false;
+
+    qdpn_driver_add_listener(driver, listener);
+    return listener;
+}
+
+/* Return the socket the listener is listening on; `listener` must not be NULL. */
+pn_socket_t qdpn_listener_get_fd(qdpn_listener_t *listener)
+{
+    assert(listener);
+    pn_socket_t sock = listener->fd;
+    return sock;
+}
+
+/* First listener registered with `driver`, or NULL when `driver` is NULL. */
+qdpn_listener_t *qdpn_listener_head(qdpn_driver_t *driver)
+{
+    if (!driver)
+        return NULL;
+    return DEQ_HEAD(driver->listeners);
+}
+
+/* Listener following `listener` in its driver's list, or NULL when `listener` is NULL. */
+qdpn_listener_t *qdpn_listener_next(qdpn_listener_t *listener)
+{
+    if (!listener)
+        return NULL;
+    return DEQ_NEXT(listener);
+}
+
+/* Placeholder: listener tracing is not implemented, so both arguments are ignored. */
+void qdpn_listener_trace(qdpn_listener_t *l, pn_trace_t trace)
+{
+ // XXX
+}
+
+/* Application context stored on the listener, or NULL when `l` is NULL. */
+void *qdpn_listener_context(qdpn_listener_t *l)
+{
+    if (!l)
+        return NULL;
+    return l->context;
+}
+
+/* Replace the application context stored on the listener; `listener` must not be NULL. */
+void qdpn_listener_set_context(qdpn_listener_t *listener, void *context)
+{
+    assert(listener != NULL);
+    listener->context = context;
+}
+
+/*
+ * Accept the connection pending on listener `l`.
+ *
+ * Returns a new connector named after the peer and linked back to `l`.
+ * Returns NULL when `l` is NULL, nothing is pending, the accept fails,
+ * or the connector cannot be allocated.
+ */
+qdpn_connector_t *qdpn_listener_accept(qdpn_listener_t *l)
+{
+    if (!l || !l->pending) return NULL;
+    char name[PN_NAME_MAX];
+
+    pn_socket_t sock = pn_accept(l->driver->io, l->fd, name, PN_NAME_MAX);
+    if (sock == PN_INVALID_SOCKET)
+        return NULL;
+
+    if (l->driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
+        fprintf(stderr, "Accepted from %s\n", name);
+
+    qdpn_connector_t *c = qdpn_connector_fd(l->driver, sock, NULL);
+    if (!c) {
+        // Connector allocation failed: close the accepted socket rather than
+        // leaking it, and avoid dereferencing NULL below.
+        close(sock);
+        return NULL;
+    }
+    snprintf(c->name, PN_NAME_MAX, "%s", name);
+    c->listener = l;
+    return c;
+}
+
+/* Close the listener's socket once; NULL and already-closed listeners are ignored. */
+void qdpn_listener_close(qdpn_listener_t *l)
+{
+    if (l == NULL || l->closed)
+        return;
+
+    int rc = close(l->fd);
+    if (rc == -1)
+        perror("close");
+    l->closed = true;
+}
+
+/* Detach the listener from its driver (if any) and release its memory; NULL is ignored. */
+void qdpn_listener_free(qdpn_listener_t *l)
+{
+    if (l == NULL)
+        return;
+
+    qdpn_driver_t *owner = l->driver;
+    if (owner)
+        qdpn_driver_remove_listener(owner, l);
+    free(l);
+}
+
+// connector
+
+/* Append connector `c` to driver `d`'s connector list; does nothing when c->driver is NULL. */
+static void qdpn_driver_add_connector(qdpn_driver_t *d, qdpn_connector_t *c)
+{
+    if (c->driver) {
+        DEQ_INSERT_TAIL(d->connectors, c);
+        c->driver = d;
+    }
+}
+
+/*
+ * Unlink connector `c` from driver `d`.
+ * The driver's iteration cursor is advanced past `c` if it points at it,
+ * and the closed-connector count is reduced when `c` was already closed.
+ */
+static void qdpn_driver_remove_connector(qdpn_driver_t *d, qdpn_connector_t *c)
+{
+    if (c->driver == NULL)
+        return;
+
+    if (d->connector_next == c)
+        d->connector_next = DEQ_NEXT(c);
+
+    DEQ_REMOVE(d->connectors, c);
+    c->driver = NULL;
+
+    if (c->closed)
+        d->closed_count--;
+}
+
+/*
+ * Open a connection to host:port and register a connector for it with `driver`.
+ *
+ * Returns the new connector (named "host:port"), or NULL when `driver` is
+ * NULL, the socket cannot be created, or the connector cannot be allocated.
+ */
+qdpn_connector_t *qdpn_connector(qdpn_driver_t *driver, const char *host,
+                                 const char *port, void *context)
+{
+    if (!driver) return NULL;
+
+    pn_socket_t sock = pn_connect(driver->io, host, port);
+    if (sock == PN_INVALID_SOCKET)
+        return NULL;   // same contract as qdpn_listener(): NULL on socket failure
+
+    qdpn_connector_t *c = qdpn_connector_fd(driver, sock, context);
+    if (!c) {
+        // Nothing owns the socket if the connector could not be built.
+        close(sock);
+        return NULL;
+    }
+
+    snprintf(c->name, PN_NAME_MAX, "%s:%s", host, port);
+    if (driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
+        fprintf(stderr, "Connected to %s\n", c->name);
+    return c;
+}
+
+/*
+ * Wrap an existing socket `fd` in a new connector owned by `driver`.
+ *
+ * The connector starts out selecting for both read and write, gets a fresh
+ * transport with a SASL layer, and inherits the driver's trace level.
+ * Returns NULL when `driver` is NULL or an allocation fails; `fd` is left
+ * open in that case and remains the caller's responsibility.
+ */
+qdpn_connector_t *qdpn_connector_fd(qdpn_driver_t *driver, int fd, void *context)
+{
+    if (!driver) return NULL;
+
+    qdpn_connector_t *c = (qdpn_connector_t *) malloc(sizeof(qdpn_connector_t));
+    if (!c) return NULL;
+
+    pn_transport_t *transport = pn_transport();
+    if (!transport) {
+        // Without a transport the connector is unusable; fail cleanly
+        // instead of handing a NULL transport to pn_sasl() and callers.
+        free(c);
+        return NULL;
+    }
+
+    DEQ_ITEM_INIT(c);
+    c->driver = driver;
+    c->pending_tick = false;
+    c->pending_read = false;
+    c->pending_write = false;
+    c->name[0] = '\0';
+    c->idx = 0;
+    c->fd = fd;
+    c->status = PN_SEL_RD | PN_SEL_WR;
+    c->trace = driver->trace;
+    c->closed = false;
+    c->wakeup = 0;
+    c->connection = NULL;
+    c->transport = transport;
+    c->sasl = pn_sasl(c->transport);
+    c->input_done = false;
+    c->output_done = false;
+    c->context = context;
+    c->listener = NULL;
+
+    // Pushes the trace level down into the transport as well.
+    qdpn_connector_trace(c, driver->trace);
+
+    qdpn_driver_add_connector(driver, c);
+    return c;
+}
+
+/* Return the socket used by the connector; `connector` must not be NULL. */
+pn_socket_t qdpn_connector_get_fd(qdpn_connector_t *connector)
+{
+    assert(connector);
+    pn_socket_t sock = connector->fd;
+    return sock;
+}
+
+/* First connector registered with `driver`, or NULL when `driver` is NULL. */
+qdpn_connector_t *qdpn_connector_head(qdpn_driver_t *driver)
+{
+    if (!driver)
+        return NULL;
+    return DEQ_HEAD(driver->connectors);
+}
+
+/* Connector following `connector` in its driver's list, or NULL when `connector` is NULL. */
+qdpn_connector_t *qdpn_connector_next(qdpn_connector_t *connector)
+{
+    if (!connector)
+        return NULL;
+    return DEQ_NEXT(connector);
+}
+
+/* Record the trace level on the connector and forward it to its transport, if it has one. */
+void qdpn_connector_trace(qdpn_connector_t *ctor, pn_trace_t trace)
+{
+    if (ctor == NULL)
+        return;
+
+    ctor->trace = trace;
+    if (ctor->transport != NULL)
+        pn_transport_trace(ctor->transport, trace);
+}
+
+/* SASL object created for the connector's transport, or NULL when `ctor` is NULL. */
+pn_sasl_t *qdpn_connector_sasl(qdpn_connector_t *ctor)
+{
+    if (!ctor)
+        return NULL;
+    return ctor->sasl;
+}
+
+/* Transport owned by the connector, or NULL when `ctor` is NULL. */
+pn_transport_t *qdpn_connector_transport(qdpn_connector_t *ctor)
+{
+    if (!ctor)
+        return NULL;
+    return ctor->transport;
+}
+
+/*
+ * Associate `connection` with the connector.
+ *
+ * Any previous connection is dereferenced and unbound from the transport;
+ * a non-NULL new connection is referenced and bound. The connector's trace
+ * level is then re-applied to the transport.
+ */
+void qdpn_connector_set_connection(qdpn_connector_t *ctor, pn_connection_t *connection)
+{
+    if (!ctor) return;
+
+    pn_connection_t *previous = ctor->connection;
+    if (previous) {
+        pn_class_decref(PN_OBJECT, previous);
+        pn_transport_unbind(ctor->transport);
+    }
+
+    ctor->connection = connection;
+    if (connection) {
+        pn_class_incref(PN_OBJECT, connection);
+        pn_transport_bind(ctor->transport, connection);
+    }
+
+    if (ctor->transport)
+        pn_transport_trace(ctor->transport, ctor->trace);
+}
+
+/* Connection currently associated with the connector, or NULL when `ctor` is NULL. */
+pn_connection_t *qdpn_connector_connection(qdpn_connector_t *ctor)
+{
+    if (!ctor)
+        return NULL;
+    return ctor->connection;
+}
+
+/* Application context stored on the connector, or NULL when `ctor` is NULL. */
+void *qdpn_connector_context(qdpn_connector_t *ctor)
+{
+    if (!ctor)
+        return NULL;
+    return ctor->context;
+}
+
+/* Replace the application context stored on the connector; a NULL connector is ignored. */
+void qdpn_connector_set_context(qdpn_connector_t *ctor, void *context)
+{
+    if (ctor != NULL)
+        ctor->context = context;
+}
+
+/* NUL-terminated name of the connector, or NULL when `ctor` is NULL. */
+const char *qdpn_connector_name(const qdpn_connector_t *ctor)
+{
+    return ctor ? ctor->name : NULL;
+}
+
+/* Listener recorded on the connector (set on accept), or NULL when `ctor` is NULL. */
+qdpn_listener_t *qdpn_connector_listener(qdpn_connector_t *ctor)
+{
+    if (!ctor)
+        return NULL;
+    return ctor->listener;
+}
+
+/*
+ * Close the connector's socket and mark the connector closed.
+ *
+ * Clears the poll selection, closes the fd, and bumps the driver's
+ * closed-connector count so the next wait returns immediately.
+ * NULL and already-closed connectors are ignored.
+ */
+void qdpn_connector_close(qdpn_connector_t *ctor)
+{
+    // XXX: should probably signal engine and callback here
+    if (!ctor) return;
+
+    // A second close would close a descriptor number that may already have
+    // been reused and would count this connector twice in closed_count.
+    if (ctor->closed) return;
+
+    ctor->status = 0;
+    if (close(ctor->fd) == -1)
+        perror("close");
+    ctor->closed = true;
+    ctor->driver->closed_count++;
+}
+
+/* True when the connector has been closed; a NULL connector counts as closed. */
+bool qdpn_connector_closed(qdpn_connector_t *ctor)
+{
+    if (!ctor)
+        return true;
+    return ctor->closed;
+}
+
+/*
+ * Destroy a connector: detach it from its driver (if any), free its
+ * transport, drop its reference on the connection, and release its memory.
+ * NULL is ignored.
+ */
+void qdpn_connector_free(qdpn_connector_t *ctor)
+{
+    if (ctor == NULL)
+        return;
+
+    qdpn_driver_t *owner = ctor->driver;
+    if (owner)
+        qdpn_driver_remove_connector(owner, ctor);
+
+    pn_transport_free(ctor->transport);
+    ctor->transport = NULL;
+
+    if (ctor->connection)
+        pn_class_decref(PN_OBJECT, ctor->connection);
+    ctor->connection = NULL;
+
+    free(ctor);
+}
+
+/* Request poll notification for the given criterion by setting the matching select bit. */
+void qdpn_connector_activate(qdpn_connector_t *ctor, qdpn_activate_criteria_t crit)
+{
+    if (crit == QDPN_CONNECTOR_WRITABLE) {
+        ctor->status |= PN_SEL_WR;
+    } else if (crit == QDPN_CONNECTOR_READABLE) {
+        ctor->status |= PN_SEL_RD;
+    }
+}
+
+
+/*
+ * Report whether the given criterion fired for the connector, and cancel it:
+ * the matching pending flag and select bit are both cleared.
+ * Returns false for an unrecognised criterion.
+ */
+bool qdpn_connector_activated(qdpn_connector_t *ctor, qdpn_activate_criteria_t crit)
+{
+    if (crit == QDPN_CONNECTOR_WRITABLE) {
+        bool fired = ctor->pending_write;
+        ctor->pending_write = false;
+        ctor->status &= ~PN_SEL_WR;
+        return fired;
+    }
+
+    if (crit == QDPN_CONNECTOR_READABLE) {
+        bool fired = ctor->pending_read;
+        ctor->pending_read = false;
+        ctor->status &= ~PN_SEL_RD;
+        return fired;
+    }
+
+    return false;
+}
+
+/* Tick the connector's transport at time `now`; returns 0 when there is no transport. */
+static pn_timestamp_t qdpn_connector_tick(qdpn_connector_t *ctor, pn_timestamp_t now)
+{
+    pn_transport_t *transport = ctor->transport;
+    if (transport == NULL)
+        return 0;
+    return pn_transport_tick(transport, now);
+}
+
+/*
+ * Service one connector: read, tick, write, then close if finished.
+ *
+ * Does nothing for a NULL or already-closed connector. Otherwise:
+ *  - Read: while input is not done and the transport has capacity, select
+ *    for read; if a read is pending, receive into the transport's tail.
+ *    EOF or an error other than EAGAIN closes the tail and ends input; a
+ *    negative result from pn_transport_process(), or a negative capacity
+ *    afterwards, also ends input.
+ *  - Tick: store the transport's next deadline in c->wakeup.
+ *  - Write: while output is not done and the transport has pending bytes,
+ *    select for write; if a write is pending, send from the transport's head
+ *    and pop what was sent. An error other than EAGAIN closes the head and
+ *    ends output; negative pending also ends output.
+ *  - When both input and output are done, the connector is closed.
+ */
+void qdpn_connector_process(qdpn_connector_t *c)
+{
+ if (c) {
+ if (c->closed) return;
+
+ pn_transport_t *transport = c->transport;
+
+ ///
+ /// Socket read
+ ///
+ if (!c->input_done) {
+ ssize_t capacity = pn_transport_capacity(transport);
+ if (capacity > 0) {
+ c->status |= PN_SEL_RD;
+ if (c->pending_read) {
+ c->pending_read = false;
+ ssize_t n = pn_recv(c->driver->io, c->fd, pn_transport_tail(transport), capacity);
+ if (n < 0) {
+ if (errno != EAGAIN) {
+ perror("read");
+ c->status &= ~PN_SEL_RD;
+ c->input_done = true;
+ pn_transport_close_tail( transport );
+ }
+ } else if (n == 0) {
+ // zero bytes read: stop reading and close the transport's tail
+ c->status &= ~PN_SEL_RD;
+ c->input_done = true;
+ pn_transport_close_tail( transport );
+ } else {
+ if (pn_transport_process(transport, (size_t) n) < 0) {
+ c->status &= ~PN_SEL_RD;
+ c->input_done = true;
+ }
+ }
+ }
+ }
+
+ // re-query: a negative capacity means the transport takes no more input
+ capacity = pn_transport_capacity(transport);
+
+ if (capacity < 0) {
+ c->status &= ~PN_SEL_RD;
+ c->input_done = true;
+ }
+ }
+
+ ///
+ /// Event wakeup
+ ///
+ c->wakeup = qdpn_connector_tick(c, pn_i_now());
+
+ ///
+ /// Socket write
+ ///
+ if (!c->output_done) {
+ ssize_t pending = pn_transport_pending(transport);
+ if (pending > 0) {
+ c->status |= PN_SEL_WR;
+ if (c->pending_write) {
+ c->pending_write = false;
+ ssize_t n = pn_send(c->driver->io, c->fd, pn_transport_head(transport), pending);
+ if (n < 0) {
+ // XXX
+ if (errno != EAGAIN) {
+ perror("send");
+ c->output_done = true;
+ c->status &= ~PN_SEL_WR;
+ pn_transport_close_head( transport );
+ }
+ } else if (n) {
+ pn_transport_pop(transport, (size_t) n);
+ }
+ }
+ } else if (pending == 0) {
+ // nothing to send right now: stop selecting for write
+ c->status &= ~PN_SEL_WR;
+ } else {
+ // negative pending: the transport will produce no more output
+ c->output_done = true;
+ c->status &= ~PN_SEL_WR;
+ }
+ }
+
+ // Closed?
+
+ if (c->input_done && c->output_done) {
+ if (c->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV)) {
+ fprintf(stderr, "Closed %s\n", c->name);
+ }
+ qdpn_connector_close(c);
+ }
+ }
+}
+
+// driver
+
+/*
+ * Allocate and initialise a driver.
+ *
+ * The trace level is taken from the PN_TRACE_RAW, PN_TRACE_FRM and
+ * PN_TRACE_DRV environment variables. A control pipe is created so that
+ * qdpn_driver_wakeup() can interrupt a wait.
+ *
+ * Returns the new driver, or NULL if allocation or pipe creation fails.
+ */
+qdpn_driver_t *qdpn_driver(void)
+{
+    qdpn_driver_t *d = (qdpn_driver_t *) malloc(sizeof(qdpn_driver_t));
+    if (!d) return NULL;
+    DEQ_INIT(d->listeners);
+    DEQ_INIT(d->connectors);
+    d->error = pn_error();
+    d->io = pn_io();
+    d->listener_next = NULL;
+    d->connector_next = NULL;
+    d->closed_count = 0;
+    d->capacity = 0;
+    d->fds = NULL;
+    d->nfds = 0;
+    // -1 rather than 0: descriptor 0 is stdin and must never be mistaken
+    // for a control pipe end.
+    d->ctrl[0] = -1;
+    d->ctrl[1] = -1;
+    d->trace = ((pn_env_bool("PN_TRACE_RAW") ? PN_TRACE_RAW : PN_TRACE_OFF) |
+                (pn_env_bool("PN_TRACE_FRM") ? PN_TRACE_FRM : PN_TRACE_OFF) |
+                (pn_env_bool("PN_TRACE_DRV") ? PN_TRACE_DRV : PN_TRACE_OFF));
+    d->wakeup = 0;
+
+    if (pipe(d->ctrl)) {
+        // Without the pipe the driver cannot be woken up; report failure as
+        // the header documents instead of returning a half-built driver.
+        perror("Can't create control pipe");
+        pn_io_free(d->io);
+        pn_error_free(d->error);
+        free(d);
+        return NULL;
+    }
+
+    // The drain loop in qdpn_driver_wait_3() reads until a short read. With
+    // a blocking read end it would hang when the pipe holds an exact
+    // multiple of its buffer size, so make the read end non-blocking.
+    int flags = fcntl(d->ctrl[0], F_GETFL);
+    if (flags == -1 || fcntl(d->ctrl[0], F_SETFL, flags | O_NONBLOCK) == -1)
+        perror("Can't make control pipe non-blocking");
+
+    return d;
+}
+
+/* Error code currently stored in the driver's error object; `d` must not be NULL. */
+int qdpn_driver_errno(qdpn_driver_t *d)
+{
+    assert(d);
+    pn_error_t *err = d->error;
+    return pn_error_code(err);
+}
+
+/* The driver's error object; `d` must not be NULL. */
+pn_error_t *qdpn_driver_error(qdpn_driver_t *d)
+{
+    assert(d);
+    pn_error_t *err = d->error;
+    return err;
+}
+
+/*
+ * Set the driver's trace level. Connectors copy this value when they are
+ * created; this function does not touch connectors that already exist.
+ * `d` is dereferenced without a NULL check.
+ */
+void qdpn_driver_trace(qdpn_driver_t *d, pn_trace_t trace)
+{
+ d->trace = trace;
+}
+
+/*
+ * Destroy a driver: close the control pipe, free every connector and
+ * listener still registered, then release the poll set, the error object,
+ * the I/O object and the driver itself. NULL is ignored.
+ */
+void qdpn_driver_free(qdpn_driver_t *d)
+{
+    if (d == NULL)
+        return;
+
+    close(d->ctrl[0]);
+    close(d->ctrl[1]);
+
+    // Each free unlinks the item, so the list head advances on every pass.
+    qdpn_connector_t *c;
+    while ((c = DEQ_HEAD(d->connectors)) != NULL)
+        qdpn_connector_free(c);
+
+    qdpn_listener_t *l;
+    while ((l = DEQ_HEAD(d->listeners)) != NULL)
+        qdpn_listener_free(l);
+
+    free(d->fds);
+    pn_error_free(d->error);
+    pn_io_free(d->io);
+    free(d);
+}
+
+/*
+ * Wake a thread blocked in the driver's wait by writing one byte to the
+ * control pipe. Returns 0 when the byte was written, the non-positive
+ * write() result otherwise, or PN_ARG_ERR when `d` is NULL.
+ */
+int qdpn_driver_wakeup(qdpn_driver_t *d)
+{
+    if (!d)
+        return PN_ARG_ERR;
+
+    ssize_t count = write(d->ctrl[1], "x", 1);
+    return (count <= 0) ? (int) count : 0;
+}
+
+/*
+ * Rebuild the driver's poll() set from its current listeners and connectors.
+ *
+ * Slot 0 is always the read end of the control pipe. Every listener gets a
+ * POLLIN slot; every connector that is not closed gets a slot whose events
+ * reflect its PN_SEL_RD / PN_SEL_WR status bits. Each item's idx records its
+ * slot. d->wakeup becomes the earliest non-zero wakeup among the open
+ * connectors (0 if none).
+ *
+ * Terminates the process via pni_fatal() if the poll set cannot be grown.
+ */
+static void qdpn_driver_rebuild(qdpn_driver_t *d)
+{
+    // +1 for the control pipe in slot 0
+    size_t needed = DEQ_SIZE(d->listeners) + DEQ_SIZE(d->connectors) + 1;
+    if (d->capacity < needed) {
+        size_t new_capacity = d->capacity ? d->capacity : 16;
+        while (new_capacity < needed)
+            new_capacity *= 2;
+
+        // Keep the old pointer until realloc() succeeds: assigning the result
+        // straight to d->fds would leak the array and crash on NULL.
+        struct pollfd *grown = (struct pollfd *) realloc(d->fds, new_capacity * sizeof(struct pollfd));
+        if (!grown)
+            pni_fatal("qdpn_driver_rebuild: out of memory");
+        d->fds = grown;
+        d->capacity = new_capacity;
+    }
+
+    d->wakeup = 0;
+    d->nfds = 0;
+
+    d->fds[d->nfds].fd = d->ctrl[0];
+    d->fds[d->nfds].events = POLLIN;
+    d->fds[d->nfds].revents = 0;
+    d->nfds++;
+
+    qdpn_listener_t *l = DEQ_HEAD(d->listeners);
+    while (l) {
+        d->fds[d->nfds].fd = l->fd;
+        d->fds[d->nfds].events = POLLIN;
+        d->fds[d->nfds].revents = 0;
+        l->idx = d->nfds;
+        d->nfds++;
+        l = DEQ_NEXT(l);
+    }
+
+    qdpn_connector_t *c = DEQ_HEAD(d->connectors);
+    while (c) {
+        if (!c->closed) {
+            d->wakeup = pn_timestamp_min(d->wakeup, c->wakeup);
+            d->fds[d->nfds].fd = c->fd;
+            d->fds[d->nfds].events = (c->status & PN_SEL_RD ? POLLIN : 0) | (c->status & PN_SEL_WR ? POLLOUT : 0);
+            d->fds[d->nfds].revents = 0;
+            c->idx = d->nfds;
+            d->nfds++;
+        }
+        c = DEQ_NEXT(c);
+    }
+}
+
+/* First phase of a wait: rebuild the poll set from the driver's listeners and connectors. */
+void qdpn_driver_wait_1(qdpn_driver_t *d)
+{
+ qdpn_driver_rebuild(d);
+}
+
+/*
+ * Second phase of a wait: block in poll() on the set built by phase one.
+ *
+ * `timeout` is in milliseconds, negative meaning "no limit". It is shortened
+ * so poll() returns no later than the driver's wakeup deadline, and forced
+ * to zero while any closed connector is still registered.
+ *
+ * Returns poll()'s result; on -1 the error is recorded in d->error.
+ */
+int qdpn_driver_wait_2(qdpn_driver_t *d, int timeout)
+{
+    if (d->wakeup) {
+        pn_timestamp_t now = pn_i_now();
+        if (now >= d->wakeup) {
+            timeout = 0;
+        } else {
+            pn_timestamp_t remaining = d->wakeup - now;
+            if (timeout < 0 || timeout > remaining)
+                timeout = remaining;
+        }
+    }
+
+    if (d->closed_count > 0)
+        timeout = 0;
+
+    int result = poll(d->fds, d->nfds, timeout);
+    if (result == -1)
+        pn_i_error_from_errno(d->error, "poll");
+    return result;
+}
+
+/*
+ * Third phase of a wait: translate poll() results into pending flags.
+ *
+ * - If the control pipe is readable, drain it and remember that the wait
+ *   was interrupted.
+ * - Each listener's pending flag reflects POLLIN on its slot.
+ * - Each open connector's pending_read / pending_write reflect POLLIN /
+ *   POLLOUT on its slot, and pending_tick is set when its wakeup deadline
+ *   has passed. POLLERR closes the connector; POLLHUP forces a pending read
+ *   (or write) so the next process call discovers the error. Closed
+ *   connectors get all pending flags cleared.
+ * - The listener and connector iteration cursors are reset to the list heads.
+ *
+ * Items whose idx is 0 were not in the poll set and never become pending
+ * from poll results.
+ *
+ * Returns PN_INTR when woken through the control pipe, 0 otherwise.
+ */
+int qdpn_driver_wait_3(qdpn_driver_t *d)
+{
+ bool woken = false;
+ if (d->fds[0].revents & POLLIN) {
+ woken = true;
+ //clear the pipe
+ char buffer[512];
+ while (read(d->ctrl[0], buffer, 512) == 512);
+ }
+
+ qdpn_listener_t *l = DEQ_HEAD(d->listeners);
+ while (l) {
+ l->pending = (l->idx && d->fds[l->idx].revents & POLLIN);
+ l = DEQ_NEXT(l);
+ }
+
+ pn_timestamp_t now = pn_i_now();
+ qdpn_connector_t *c = DEQ_HEAD(d->connectors);
+ while (c) {
+ if (c->closed) {
+ c->pending_read = false;
+ c->pending_write = false;
+ c->pending_tick = false;
+ } else {
+ int idx = c->idx;
+ c->pending_read = (idx && d->fds[idx].revents & POLLIN);
+ c->pending_write = (idx && d->fds[idx].revents & POLLOUT);
+ c->pending_tick = (c->wakeup && c->wakeup <= now);
+ if (idx && d->fds[idx].revents & POLLERR)
+ qdpn_connector_close(c);
+ else if (idx && (d->fds[idx].revents & POLLHUP)) {
+ if (c->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV)) {
+ fprintf(stderr, "hangup on connector %s\n", c->name);
+ }
+ /* poll() is signalling POLLHUP. to see what happened we need
+ * to do an actual recv() to get the error code. But we might
+ * be in a state where we're not interested in input, in that
+ * case try to get the error code via send() */
+ if (d->fds[idx].events & POLLIN)
+ c->pending_read = true;
+ else if (d->fds[idx].events & POLLOUT)
+ c->pending_write = true;
+ } else if (idx && (d->fds[idx].revents & ~(POLLIN|POLLOUT|POLLERR|POLLHUP))) {
+ if (c->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV)) {
+ fprintf(stderr, "Unexpected poll events: %04x on %s\n",
+ d->fds[idx].revents, c->name);
+ }
+ }
+ }
+ c = DEQ_NEXT(c);
+ }
+
+ d->listener_next = DEQ_HEAD(d->listeners);
+ d->connector_next = DEQ_HEAD(d->connectors);
+
+ return woken ? PN_INTR : 0;
+}
+
+//
+// XXX - qdpn_driver_wait has been divided into three internal functions as a
+// temporary workaround for a multi-threading problem. A multi-threaded
+// application must hold a lock on parts 1 and 3, but not on part 2.
+// This temporary change, which is not reflected in the driver's API, allows
+// a multi-threaded application to use the three parts separately.
+//
+// This workaround will eventually be replaced by a more elegant solution
+// to the problem.
+//
+/*
+ * Wait for activity on the driver by running the three wait phases in order.
+ * Returns the driver's error code if poll() fails, otherwise the result of
+ * the third phase (PN_INTR when woken via the control pipe, else 0).
+ */
+int qdpn_driver_wait(qdpn_driver_t *d, int timeout)
+{
+    qdpn_driver_wait_1(d);
+
+    if (qdpn_driver_wait_2(d, timeout) == -1)
+        return pn_error_code(d->error);
+
+    return qdpn_driver_wait_3(d);
+}
+
+/*
+ * Return the next listener with a pending connection, advancing the
+ * driver's listener cursor past it. Returns NULL when `d` is NULL or the
+ * cursor reaches the end of the list.
+ */
+qdpn_listener_t *qdpn_driver_listener(qdpn_driver_t *d)
+{
+    if (!d) return NULL;
+
+    for (qdpn_listener_t *l = d->listener_next; l; l = d->listener_next) {
+        // advance the cursor before returning so the next call resumes after l
+        d->listener_next = DEQ_NEXT(l);
+        if (l->pending)
+            return l;
+    }
+
+    return NULL;
+}
+
+/*
+ * Return the next connector that needs attention, advancing the driver's
+ * connector cursor past it. A connector needs attention when it is closed
+ * or has a pending read, write or tick. Returns NULL when `d` is NULL or
+ * the cursor reaches the end of the list.
+ */
+qdpn_connector_t *qdpn_driver_connector(qdpn_driver_t *d)
+{
+    if (!d) return NULL;
+
+    for (qdpn_connector_t *c = d->connector_next; c; c = d->connector_next) {
+        // advance the cursor before returning so the next call resumes after c
+        d->connector_next = DEQ_NEXT(c);
+        bool needs_service = c->closed || c->pending_read || c->pending_write || c->pending_tick;
+        if (needs_service)
+            return c;
+    }
+
+    return NULL;
+}
+
Propchange: qpid/dispatch/trunk/src/posix/driver.c
------------------------------------------------------------------------------
svn:eol-style = native
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org