You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2015/03/02 14:56:55 UTC

svn commit: r1663308 - in /qpid/dispatch/trunk/src: posix/driver.c server.c

Author: tross
Date: Mon Mar  2 13:56:55 2015
New Revision: 1663308

URL: http://svn.apache.org/r1663308
Log:
DISPATCH-118 - Worked around a newly introduced thread-safety problem in Proton.

This commit removes the use of pn_io_t in the dispatch driver.  The functionality needed
in dispatch was forked into the dispatch driver.  This may be undone in the future when
the thread problems are resolved and when/if someone desires to run dispatch on Windows.


Modified:
    qpid/dispatch/trunk/src/posix/driver.c
    qpid/dispatch/trunk/src/server.c

Modified: qpid/dispatch/trunk/src/posix/driver.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/posix/driver.c?rev=1663308&r1=1663307&r2=1663308&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/posix/driver.c (original)
+++ qpid/dispatch/trunk/src/posix/driver.c Mon Mar  2 13:56:55 2015
@@ -22,7 +22,11 @@
 #include <assert.h>
 #include <poll.h>
 #include <stdio.h>
+#include <string.h>
+
 #include <ctype.h>
+#include <errno.h>
+#include <stdio.h>
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <netinet/in.h>
@@ -30,23 +34,23 @@
 #include <netdb.h>
 #include <unistd.h>
 #include <fcntl.h>
-#include <errno.h>
-#include <string.h>
+#include <assert.h>
 
 #include <qpid/dispatch/driver.h>
 #include <qpid/dispatch/alloc.h>
-//#include <proton/driver_extras.h>
 #include <proton/error.h>
-#include <proton/io.h>
 #include <proton/sasl.h>
 #include <proton/ssl.h>
 #include <proton/object.h>
 #include <qpid/dispatch/ctools.h>
-//#include "util.h"
-//#include "platform.h"
+#include <qpid/dispatch/log.h>
 
 /* Decls */
 
+#define MAX_HOST  1024
+#define MAX_SERV  256
+#define ERROR_MAX 128
+
 #define PN_SEL_RD (0x0001)
 #define PN_SEL_WR (0x0002)
 
@@ -54,19 +58,18 @@ DEQ_DECLARE(qdpn_listener_t, qdpn_listen
 DEQ_DECLARE(qdpn_connector_t, qdpn_connector_list_t);
 
 struct qdpn_driver_t {
-    pn_error_t *error;
-    pn_io_t *io;
-    qdpn_listener_list_t listeners;
-    qdpn_connector_list_t connectors;
-    qdpn_listener_t *listener_next;
-    qdpn_connector_t *connector_next;
-    size_t closed_count;
-    size_t capacity;
-    struct pollfd *fds;
-    size_t nfds;
-    int ctrl[2]; //pipe for updating selectable status
-    pn_timestamp_t wakeup;
-    pn_trace_t trace;
+    qd_log_source_t       *log;
+    qdpn_listener_list_t   listeners;
+    qdpn_connector_list_t  connectors;
+    qdpn_listener_t       *listener_next;
+    qdpn_connector_t      *connector_next;
+    size_t                 closed_count;
+    size_t                 capacity;
+    struct pollfd         *fds;
+    size_t                 nfds;
+    int                    ctrl[2]; //pipe for updating selectable status
+    pn_timestamp_t         wakeup;
+    pn_trace_t             trace;
 };
 
 struct qdpn_listener_t {
@@ -174,14 +177,11 @@ static pn_timestamp_t pn_timestamp_min(
     return b;
 }
 
-static int pn_i_error_from_errno(pn_error_t *error, const char *msg)
+static void qdpn_log_errno(qdpn_driver_t *d, const char *msg)
 {
-  char err[1024];
-  strerror_r(errno, err, 1024);
-  int code = PN_ERR;
-  if (errno == EINTR)
-      code = PN_INTR;
-  return pn_error_format(error, code, "%s: %s", msg, err);
+    char ebuf[ERROR_MAX];
+    strerror_r(errno, ebuf, ERROR_MAX);
+    qd_log(d->log, QD_LOG_ERROR, "%s: %s", msg, ebuf);
 }
 
 
@@ -206,22 +206,86 @@ static void qdpn_driver_remove_listener(
     l->driver = NULL;
 }
 
+
+static int qdpn_create_socket(int af)
+{
+    struct protoent *pe_tcp = getprotobyname("tcp");
+    if (pe_tcp == NULL)
+        return -1;
+    return socket(af, SOCK_STREAM, pe_tcp->p_proto);
+}
+
+
+static void qdpn_configure_sock(qdpn_driver_t *driver, int sock)
+{
+    //
+    // Set the socket to be non-blocking for asynchronous operation.
+    //
+    int flags = fcntl(sock, F_GETFL);
+    flags |= O_NONBLOCK;
+    if (fcntl(sock, F_SETFL, flags) < 0)
+        qdpn_log_errno(driver, "fcntl");
+
+    //
+    // Disable the Nagle algorithm on TCP connections.
+    //
+    // Note:  It would be more correct for the "level" argument to be SOL_TCP.  However, there
+    //        are portability issues with this macro so we use IPPROTO_TCP instead.
+    //
+    int tcp_nodelay = 1;
+    if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void*) &tcp_nodelay, sizeof(tcp_nodelay)) < 0)
+        qdpn_log_errno(driver, "setsockopt");
+}
+
+
 qdpn_listener_t *qdpn_listener(qdpn_driver_t *driver, const char *host,
                                const char *port, void* context)
 {
     if (!driver) return NULL;
 
-    pn_socket_t sock = pn_listen(driver->io, host, port);
-    if (sock == PN_INVALID_SOCKET) {
-        return NULL;
-    } else {
-        qdpn_listener_t *l = qdpn_listener_fd(driver, sock, context);
+    struct addrinfo *addr;
+    int code = getaddrinfo(host, port, NULL, &addr);
+    if (code) {
+        qd_log(driver->log, QD_LOG_ERROR, "getaddrinfo(%s, %s): %s\n", host, port, gai_strerror(code));
+        return 0;
+    }
 
-        if (driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
-            fprintf(stderr, "Listening on %s:%s\n", host, port);
+    int sock = qdpn_create_socket(addr->ai_family);
+    if (sock < 0) {
+        qdpn_log_errno(driver, "pn_create_socket");
+        freeaddrinfo(addr);
+        return 0;
+    }
+
+    int optval = 1;
+    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) == -1) {
+        qdpn_log_errno(driver, "setsockopt");
+        close(sock);
+        freeaddrinfo(addr);
+        return 0;
+    }
 
-        return l;
+    if (bind(sock, addr->ai_addr, addr->ai_addrlen) == -1) {
+        qdpn_log_errno(driver, "bind");
+        freeaddrinfo(addr);
+        close(sock);
+        return 0;
     }
+
+    freeaddrinfo(addr);
+
+    if (listen(sock, 50) == -1) {
+        qdpn_log_errno(driver, "listen");
+        close(sock);
+        return 0;
+    }
+
+    qdpn_listener_t *l = qdpn_listener_fd(driver, sock, context);
+
+    if (driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
+        fprintf(stderr, "Listening on %s:%s\n", host, port);
+
+    return l;
 }
 
 qdpn_listener_t *qdpn_listener_fd(qdpn_driver_t *driver, int fd, void *context)
@@ -242,7 +306,7 @@ qdpn_listener_t *qdpn_listener_fd(qdpn_d
     return l;
 }
 
-pn_socket_t qdpn_listener_get_fd(qdpn_listener_t *listener)
+int qdpn_listener_get_fd(qdpn_listener_t *listener)
 {
     assert(listener);
     return listener->fd;
@@ -278,18 +342,36 @@ qdpn_connector_t *qdpn_listener_accept(q
 {
     if (!l || !l->pending) return NULL;
     char name[PN_NAME_MAX];
+    char host[MAX_HOST];
+    char serv[MAX_SERV];
 
-    pn_socket_t sock = pn_accept(l->driver->io, l->fd, name, PN_NAME_MAX);
-    if (sock == PN_INVALID_SOCKET) {
-        return NULL;
+    struct sockaddr_in addr = {0};
+    addr.sin_family = AF_UNSPEC;
+    socklen_t addrlen = sizeof(addr);
+
+    int sock = accept(l->fd, (struct sockaddr *) &addr, &addrlen);
+    if (sock < 0) {
+        qdpn_log_errno(l->driver, "accept");
+        return 0;
     } else {
-        if (l->driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
-            fprintf(stderr, "Accepted from %s\n", name);
-        qdpn_connector_t *c = qdpn_connector_fd(l->driver, sock, NULL);
-        snprintf(c->name, PN_NAME_MAX, "%s", name);
-        c->listener = l;
-        return c;
+        int code;
+        if ((code = getnameinfo((struct sockaddr *) &addr, addrlen, host, MAX_HOST, serv, MAX_SERV, 0))) {
+            qd_log(l->driver->log, QD_LOG_ERROR, "getnameinfo: %s\n", gai_strerror(code));
+            close(sock);
+            return 0;
+        } else {
+            qdpn_configure_sock(l->driver, sock);
+            snprintf(name, PN_NAME_MAX-1, "%s:%s", host, serv);
+        }
     }
+
+    if (l->driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
+        fprintf(stderr, "Accepted from %s\n", name);
+
+    qdpn_connector_t *c = qdpn_connector_fd(l->driver, sock, NULL);
+    snprintf(c->name, PN_NAME_MAX, "%s", name);
+    c->listener = l;
+    return c;
 }
 
 void qdpn_listener_close(qdpn_listener_t *l)
@@ -339,7 +421,31 @@ qdpn_connector_t *qdpn_connector(qdpn_dr
 {
     if (!driver) return NULL;
 
-    pn_socket_t sock = pn_connect(driver->io, host, port);
+    struct addrinfo *addr;
+    int code = getaddrinfo(host, port, NULL, &addr);
+    if (code) {
+        qd_log(driver->log, QD_LOG_ERROR, "getaddrinfo(%s, %s): %s", host, port, gai_strerror(code));
+        return 0;
+    }
+
+    int sock = qdpn_create_socket(addr->ai_family);
+    if (sock == PN_INVALID_SOCKET) {
+        qdpn_log_errno(driver, "pn_create_socket");
+        return 0;
+    }
+
+    qdpn_configure_sock(driver, sock);
+
+    if (connect(sock, addr->ai_addr, addr->ai_addrlen) == -1) {
+        if (errno != EINPROGRESS) {
+            qdpn_log_errno(driver, "connect");
+            freeaddrinfo(addr);
+            close(sock);
+            return 0;
+        }
+    }
+
+    freeaddrinfo(addr);
 
     qdpn_connector_t *c = qdpn_connector_fd(driver, sock, context);
     snprintf(c->name, PN_NAME_MAX, "%s:%s", host, port);
@@ -381,7 +487,7 @@ qdpn_connector_t *qdpn_connector_fd(qdpn
     return c;
 }
 
-pn_socket_t qdpn_connector_get_fd(qdpn_connector_t *connector)
+int qdpn_connector_get_fd(qdpn_connector_t *connector)
 {
     assert(connector);
     return connector->fd;
@@ -549,7 +655,7 @@ void qdpn_connector_process(qdpn_connect
                 c->status |= PN_SEL_RD;
                 if (c->pending_read) {
                     c->pending_read = false;
-                    ssize_t n =  pn_recv(c->driver->io, c->fd, pn_transport_tail(transport), capacity);
+                    ssize_t n =  recv(c->fd, pn_transport_tail(transport), capacity, 0);
                     if (n < 0) {
                         if (errno != EAGAIN) {
                             perror("read");
@@ -592,7 +698,7 @@ void qdpn_connector_process(qdpn_connect
                 c->status |= PN_SEL_WR;
                 if (c->pending_write) {
                     c->pending_write = false;
-                    ssize_t n = pn_send(c->driver->io, c->fd, pn_transport_head(transport), pending);
+                    ssize_t n = send(c->fd, pn_transport_head(transport), pending, MSG_NOSIGNAL);
                     if (n < 0) {
                         // XXX
                         if (errno != EAGAIN) {
@@ -632,8 +738,7 @@ qdpn_driver_t *qdpn_driver()
     if (!d) return NULL;
     DEQ_INIT(d->listeners);
     DEQ_INIT(d->connectors);
-    d->error = pn_error();
-    d->io = pn_io();
+    d->log = qd_log_source("DRIVER");
     d->listener_next = NULL;
     d->connector_next = NULL;
     d->closed_count = 0;
@@ -655,18 +760,6 @@ qdpn_driver_t *qdpn_driver()
     return d;
 }
 
-int qdpn_driver_errno(qdpn_driver_t *d)
-{
-    assert(d);
-    return pn_error_code(d->error);
-}
-
-pn_error_t *qdpn_driver_error(qdpn_driver_t *d)
-{
-    assert(d);
-    return d->error;
-}
-
 void qdpn_driver_trace(qdpn_driver_t *d, pn_trace_t trace)
 {
     d->trace = trace;
@@ -683,8 +776,6 @@ void qdpn_driver_free(qdpn_driver_t *d)
     while (DEQ_HEAD(d->listeners))
         qdpn_listener_free(DEQ_HEAD(d->listeners));
     free(d->fds);
-    pn_error_free(d->error);
-    pn_io_free(d->io);
     free(d);
 }
 
@@ -757,8 +848,8 @@ int qdpn_driver_wait_2(qdpn_driver_t *d,
             timeout = (timeout < 0) ? d->wakeup-now : pn_min(timeout, d->wakeup - now);
     }
     int result = poll(d->fds, d->nfds, d->closed_count > 0 ? 0 : timeout);
-    if (result == -1)
-        pn_i_error_from_errno(d->error, "poll");
+    if (result == -1 && errno != EINTR)
+        qdpn_log_errno(d, "poll");
     return result;
 }
 
@@ -835,7 +926,7 @@ int qdpn_driver_wait(qdpn_driver_t *d, i
     qdpn_driver_wait_1(d);
     int result = qdpn_driver_wait_2(d, timeout);
     if (result == -1)
-        return pn_error_code(d->error);
+        return errno;
     return qdpn_driver_wait_3(d);
 }
 

Modified: qpid/dispatch/trunk/src/server.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/server.c?rev=1663308&r1=1663307&r2=1663308&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/server.c (original)
+++ qpid/dispatch/trunk/src/server.c Mon Mar  2 13:56:55 2015
@@ -31,6 +31,7 @@
 #include <stdio.h>
 #include <time.h>
 #include <string.h>
+#include <errno.h>
 
 static __thread qd_server_t *thread_server = 0;
 
@@ -410,14 +411,14 @@ void qdpn_driver_wait_3(qdpn_driver_t *d
 
 static void *thread_run(void *arg)
 {
-    qd_thread_t     *thread    = (qd_thread_t*) arg;
-    qd_server_t     *qd_server = thread->qd_server;
-    qd_work_item_t  *work;
-    qdpn_connector_t  *cxtr;
-    pn_connection_t *conn;
-    qd_connection_t *ctx;
-    int              error;
-    int              poll_result;
+    qd_thread_t      *thread    = (qd_thread_t*) arg;
+    qd_server_t      *qd_server = thread->qd_server;
+    qd_work_item_t   *work;
+    qdpn_connector_t *cxtr;
+    pn_connection_t  *conn;
+    qd_connection_t  *ctx;
+    int               error;
+    int               poll_result;
 
     if (!thread)
         return 0;
@@ -521,10 +522,9 @@ static void *thread_run(void *arg)
                     error = 0;
                     poll_result = qdpn_driver_wait_2(qd_server->driver, duration);
                     if (poll_result == -1)
-                        error = qdpn_driver_errno(qd_server->driver);
-                } while (error == PN_INTR);
+                        error = errno;
+                } while (error == EINTR);
                 if (error) {
-                    qd_log(qd_server->log_source, QD_LOG_ERROR, "Driver Error: %s", qdpn_driver_error(qd_server->driver));
                     exit(-1);
                 }
 
@@ -1129,8 +1129,6 @@ qd_listener_t *qd_server_listen(qd_dispa
     li->pn_listener = qdpn_listener(qd_server->driver, config->host, config->port, (void*) li);
 
     if (!li->pn_listener) {
-        qd_log(qd_server->log_source, QD_LOG_ERROR, "Driver Error %d (%s)",
-               qdpn_driver_errno(qd_server->driver), qdpn_driver_error(qd_server->driver));
         free_qd_listener_t(li);
         return 0;
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org