You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apr.apache.org by yl...@apache.org on 2021/11/19 13:23:12 UTC

svn commit: r1895175 - in /apr/apr/trunk: file_io/win32/pipe.c file_io/win32/readwrite.c include/arch/win32/apr_arch_file_io.h network_io/os2/sockopt.c poll/os2/pollset.c poll/unix/pollcb.c poll/unix/pollset.c poll/unix/wakeup.c

Author: ylavic
Date: Fri Nov 19 13:23:11 2021
New Revision: 1895175

URL: http://svn.apache.org/viewvc?rev=1895175&view=rev
Log:
Follow up to r1895106: Use less expensive atomics for wakeup.

If pipe writers (wakeup) put a single byte until it's consumed by the
reader (drain), an atomic cas for the writers (still) and an atomic (re)set
for the reader is enough (no need to cas on the reader side).

This requires that the reader never blocks on read though (e.g. spurious return
from poll), so (re)make the read side on the pipe non-blocking (finally).

Since synchronous non-blocking read is not a thing for Windows' ReadFile(), add
a ->socket flag to this arch's apr_file_t (like the existing ->pipe one) which
file_socket_pipe_create() will set to make apr_file_read/write() handle
non-blocking (and non-overlapped) socket pipes with WSARecv/Send().


Modified:
    apr/apr/trunk/file_io/win32/pipe.c
    apr/apr/trunk/file_io/win32/readwrite.c
    apr/apr/trunk/include/arch/win32/apr_arch_file_io.h
    apr/apr/trunk/network_io/os2/sockopt.c
    apr/apr/trunk/poll/os2/pollset.c
    apr/apr/trunk/poll/unix/pollcb.c
    apr/apr/trunk/poll/unix/pollset.c
    apr/apr/trunk/poll/unix/wakeup.c

Modified: apr/apr/trunk/file_io/win32/pipe.c
URL: http://svn.apache.org/viewvc/apr/apr/trunk/file_io/win32/pipe.c?rev=1895175&r1=1895174&r2=1895175&view=diff
==============================================================================
--- apr/apr/trunk/file_io/win32/pipe.c (original)
+++ apr/apr/trunk/file_io/win32/pipe.c Fri Nov 19 13:23:11 2021
@@ -378,32 +378,28 @@ static apr_status_t create_socket_pipe(S
             rv =  apr_get_netos_error();
             goto cleanup;
         }
-        /* Verify the connection by reading the send identification.
-         */
-        do {
-            if (nc++)
-                Sleep(1);
-            nrd = recv(*rd, (char *)iid, sizeof(iid), 0);
-            rv = nrd == SOCKET_ERROR ? apr_get_netos_error() : APR_SUCCESS;
-        } while (APR_STATUS_IS_EAGAIN(rv));
-
-        if (nrd == sizeof(iid)) {
-            if (memcmp(uid, iid, sizeof(uid)) == 0) {
-                /* Wow, we recived what we send.
-                 * Put read side of the pipe to the blocking
-                 * mode and return.
-                 */
-                bm = 0;
-                if (ioctlsocket(*rd, FIONBIO, &bm) == SOCKET_ERROR) {
-                    rv = apr_get_netos_error();
-                    goto cleanup;
-                }
-                break;
-            }
+        /* Verify the connection by reading/waiting for the identification */
+        bm = 0;
+        if (ioctlsocket(*rd, FIONBIO, &bm) == SOCKET_ERROR) {
+            rv = apr_get_netos_error();
+            goto cleanup;
         }
-        else if (nrd == SOCKET_ERROR) {
+        nrd = recv(*rd, (char *)iid, sizeof(iid), 0);
+        if (nrd == SOCKET_ERROR) {
+            rv = apr_get_netos_error();
             goto cleanup;
         }
+        if (nrd == (int)sizeof(uid) && memcmp(iid, uid, sizeof(uid)) == 0) {
+            /* Got the right identifier, put the poll()able read side of
+             * the pipe in nonblocking mode and return.
+             */
+            bm = 1;
+            if (ioctlsocket(*rd, FIONBIO, &bm) == SOCKET_ERROR) {
+                rv = apr_get_netos_error();
+                goto cleanup;
+            }
+            break;
+        }
         closesocket(*rd);
     }
     /* We don't need the listening socket any more */
@@ -412,6 +408,7 @@ static apr_status_t create_socket_pipe(S
 
 cleanup:
     /* Don't leak resources */
+    closesocket(ls);
     if (*rd != INVALID_SOCKET)
         closesocket(*rd);
     if (*wr != INVALID_SOCKET)
@@ -419,7 +416,6 @@ cleanup:
 
     *rd = INVALID_SOCKET;
     *wr = INVALID_SOCKET;
-    closesocket(ls);
     return rv;
 }
 
@@ -448,21 +444,21 @@ apr_status_t file_socket_pipe_create(apr
     (*in) = (apr_file_t *)apr_pcalloc(p, sizeof(apr_file_t));
     (*in)->pool = p;
     (*in)->fname = NULL;
-    (*in)->pipe = 1;
-    (*in)->timeout = -1;
+    (*in)->socket = 1;
+    (*in)->timeout = 0; /* read end of the pipe is non-blocking */
     (*in)->ungetchar = -1;
     (*in)->eof_hit = 0;
     (*in)->filePtr = 0;
     (*in)->bufpos = 0;
     (*in)->dataRead = 0;
     (*in)->direction = 0;
-    (*in)->pOverlapped = (OVERLAPPED*)apr_pcalloc(p, sizeof(OVERLAPPED));
+    (*in)->pOverlapped = NULL;
     (*in)->filehand = (HANDLE)rd;
 
     (*out) = (apr_file_t *)apr_pcalloc(p, sizeof(apr_file_t));
     (*out)->pool = p;
     (*out)->fname = NULL;
-    (*out)->pipe = 1;
+    (*out)->socket = 1;
     (*out)->timeout = -1;
     (*out)->ungetchar = -1;
     (*out)->eof_hit = 0;
@@ -470,7 +466,7 @@ apr_status_t file_socket_pipe_create(apr
     (*out)->bufpos = 0;
     (*out)->dataRead = 0;
     (*out)->direction = 0;
-    (*out)->pOverlapped = (OVERLAPPED*)apr_pcalloc(p, sizeof(OVERLAPPED));
+    (*out)->pOverlapped = NULL;
     (*out)->filehand = (HANDLE)wr;
 
     apr_pool_cleanup_register(p, (void *)(*in), socket_pipe_cleanup,
@@ -484,7 +480,7 @@ apr_status_t file_socket_pipe_create(apr
 apr_status_t file_socket_pipe_close(apr_file_t *file)
 {
     apr_status_t stat;
-    if (!file->pipe)
+    if (!file->socket)
         return apr_file_close(file);
     if ((stat = socket_pipe_cleanup(file)) == APR_SUCCESS) {
         apr_pool_cleanup_kill(file->pool, file, socket_pipe_cleanup);

Modified: apr/apr/trunk/file_io/win32/readwrite.c
URL: http://svn.apache.org/viewvc/apr/apr/trunk/file_io/win32/readwrite.c?rev=1895175&r1=1895174&r2=1895175&view=diff
==============================================================================
--- apr/apr/trunk/file_io/win32/readwrite.c (original)
+++ apr/apr/trunk/file_io/win32/readwrite.c Fri Nov 19 13:23:11 2021
@@ -20,10 +20,12 @@
 #include "apr_strings.h"
 #include "apr_lib.h"
 #include "apr_errno.h"
-#include <malloc.h>
+#include "apr_arch_networkio.h"
 #include "apr_arch_atime.h"
 #include "apr_arch_misc.h"
 
+#include <malloc.h>
+
 /*
  * read_with_timeout() 
  * Uses async i/o to emulate unix non-blocking i/o with timeouts.
@@ -31,6 +33,7 @@
 static apr_status_t read_with_timeout(apr_file_t *file, void *buf, apr_size_t len_in, apr_size_t *nbytes)
 {
     apr_status_t rv;
+    int pipe_or_socket = (file->pipe || file->socket);
     DWORD len = (DWORD)len_in;
     DWORD bytesread = 0;
 
@@ -67,13 +70,28 @@ static apr_status_t read_with_timeout(ap
         }
     }
 
-    if (file->pOverlapped && !file->pipe) {
+    if (file->pOverlapped && !pipe_or_socket) {
         file->pOverlapped->Offset     = (DWORD)file->filePtr;
         file->pOverlapped->OffsetHigh = (DWORD)(file->filePtr >> 32);
     }
 
-    if (ReadFile(file->filehand, buf, len, 
-                 &bytesread, file->pOverlapped)) {
+    if (file->socket && !file->pOverlapped) {
+        WSABUF wsaData;
+        DWORD flags = 0;
+
+        wsaData.buf = (char*) buf;
+        wsaData.len = (u_long)len;
+        if (WSARecv((SOCKET)file->filehand, &wsaData, 1, &bytesread,
+                    &flags, NULL, NULL) == SOCKET_ERROR) {
+            rv = apr_get_netos_error();
+            bytesread = 0;
+        }
+        else {
+            rv = APR_SUCCESS;
+        }
+    }
+    else if (ReadFile(file->filehand, buf, len, 
+                      &bytesread, file->pOverlapped)) {
         rv = APR_SUCCESS;
     }
     else {
@@ -139,7 +157,7 @@ static apr_status_t read_with_timeout(ap
     if (rv == APR_SUCCESS && bytesread == 0)
         rv = APR_EOF;
     
-    if (rv == APR_SUCCESS && file->pOverlapped && !file->pipe) {
+    if (rv == APR_SUCCESS && file->pOverlapped && !pipe_or_socket) {
         file->filePtr += bytesread;
     }
     *nbytes = bytesread;
@@ -385,7 +403,7 @@ static apr_status_t write_buffered(apr_f
 APR_DECLARE(apr_status_t) apr_file_write(apr_file_t *thefile, const void *buf, apr_size_t *nbytes)
 {
     apr_status_t rv;
-    DWORD bwrote;
+    DWORD bwrote = 0;
 
     /* If the file is open for xthread support, allocate and
      * initialize the overlapped and io completion event (hEvent). 
@@ -409,9 +427,27 @@ APR_DECLARE(apr_status_t) apr_file_write
         if (thefile->flags & APR_FOPEN_XTHREAD) {
             apr_thread_mutex_unlock(thefile->mutex);
         }
-        return rv;
-    } else {
-        if (thefile->pipe) {
+    }
+    else if (thefile->socket && !thefile->pOverlapped) {
+        WSABUF wsaData;
+        DWORD flags = 0;
+
+        wsaData.buf = (char*) buf;
+        wsaData.len = (u_long)*nbytes;
+        if (WSASend((SOCKET)file->filehand, &wsaData, 1, &bwrote,
+                    flags, NULL, NULL) == SOCKET_ERROR) {
+            rv = apr_get_netos_error();
+            bwrote = 0;
+        }
+        else {
+            rv = APR_SUCCESS;
+        }
+        *nbytes = bwrote;
+    }
+    else {
+        int pipe_or_socket = (thefile->pipe || thefile->socket);
+
+        if (pipe_or_socket) {
             rv = WriteFile(thefile->filehand, buf, (DWORD)*nbytes, &bwrote,
                            thefile->pOverlapped);
         }
@@ -545,7 +581,7 @@ APR_DECLARE(apr_status_t) apr_file_write
                 }
             }
         }
-        if (rv == APR_SUCCESS && thefile->pOverlapped && !thefile->pipe) {
+        if (rv == APR_SUCCESS && thefile->pOverlapped && !pipe_or_socket) {
             thefile->filePtr += *nbytes;
         }
     }

Modified: apr/apr/trunk/include/arch/win32/apr_arch_file_io.h
URL: http://svn.apache.org/viewvc/apr/apr/trunk/include/arch/win32/apr_arch_file_io.h?rev=1895175&r1=1895174&r2=1895175&view=diff
==============================================================================
--- apr/apr/trunk/include/arch/win32/apr_arch_file_io.h (original)
+++ apr/apr/trunk/include/arch/win32/apr_arch_file_io.h Fri Nov 19 13:23:11 2021
@@ -162,7 +162,7 @@ apr_status_t more_finfo(apr_finfo_t *fin
 struct apr_file_t {
     apr_pool_t *pool;
     HANDLE filehand;
-    BOOLEAN pipe;              /* Is this a pipe of a file? */
+    BOOLEAN pipe, socket;      /* Is this a pipe, a socket or a file? */
     OVERLAPPED *pOverlapped;
     apr_interval_time_t timeout;
     apr_int32_t flags;

Modified: apr/apr/trunk/network_io/os2/sockopt.c
URL: http://svn.apache.org/viewvc/apr/apr/trunk/network_io/os2/sockopt.c?rev=1895175&r1=1895174&r2=1895175&view=diff
==============================================================================
--- apr/apr/trunk/network_io/os2/sockopt.c (original)
+++ apr/apr/trunk/network_io/os2/sockopt.c Fri Nov 19 13:23:11 2021
@@ -32,8 +32,22 @@
 APR_DECLARE(apr_status_t) apr_socket_timeout_set(apr_socket_t *sock, 
                                                  apr_interval_time_t t)
 {
+    apr_status_t rv = APR_SUCCESS;
+
+    /* If our new timeout is non-negative and our old timeout was
+     * negative, then we need to ensure that we are non-blocking.
+     * Conversely, if our new timeout is negative and we had
+     * non-negative timeout, we must make sure our socket is blocking.
+     */
+    if (t == 0 && sock->timeout != 0) {
+        rv = apr_socket_opt_set(sock, APR_SO_NONBLOCK, 1);
+    }
+    else if (t != 0 && sock->timeout == 0) {
+        rv = apr_socket_opt_set(sock, APR_SO_NONBLOCK, 0);
+    } 
+
     sock->timeout = t;
-    return APR_SUCCESS;
+    return rv;
 }
 
 

Modified: apr/apr/trunk/poll/os2/pollset.c
URL: http://svn.apache.org/viewvc/apr/apr/trunk/poll/os2/pollset.c?rev=1895175&r1=1895174&r2=1895175&view=diff
==============================================================================
--- apr/apr/trunk/poll/os2/pollset.c (original)
+++ apr/apr/trunk/poll/os2/pollset.c Fri Nov 19 13:23:11 2021
@@ -67,7 +67,6 @@ APR_DECLARE(apr_status_t) apr_pollset_cr
 
         if (rc == APR_SUCCESS) {
             apr_sockaddr_t *listen_address;
-            apr_socket_timeout_set((*pollset)->wake_listen, 0);
             apr_sockaddr_info_get(&listen_address, "", APR_UNIX, 0, 0, p);
             rc = apr_socket_bind((*pollset)->wake_listen, listen_address);
 
@@ -80,6 +79,7 @@ APR_DECLARE(apr_status_t) apr_pollset_cr
                 wake_poll_fd.client_data = NULL;
                 apr_pollset_add(*pollset, &wake_poll_fd);
                 apr_socket_addr_get(&(*pollset)->wake_address, APR_LOCAL, (*pollset)->wake_listen);
+                apr_socket_timeout_set((*pollset)->wake_listen, 0);
 
                 rc = apr_socket_create(&(*pollset)->wake_sender, APR_UNIX, SOCK_DGRAM, 0, p);
             }
@@ -263,17 +263,14 @@ APR_DECLARE(apr_status_t) apr_pollset_po
 
         if (rtnevents) {
             if (i == 0 && pollset->wake_listen != NULL) {
+                char ch;
+                apr_size_t len = 1;
                 struct apr_sockaddr_t from_addr;
-                char buffer[16];
-                apr_size_t buflen;
-                for (;;) {
-                    buflen = sizeof(buffer);
-                    rv = apr_socket_recvfrom(&from_addr, pollset->wake_listen,
-                                             MSG_DONTWAIT, buffer, &buflen);
-                    if (rv != APR_SUCCESS) {
-                        break;
-                    }
-                    /* Woken up, drain the pipe still. */
+                rv = apr_socket_recvfrom(&from_addr, pollset->wake_listen,
+                                          MSG_DONTWAIT, &ch, &len);
+                if (rv == APR_SUCCESS) {
+                    /* Woken up, senders can fill the pipe again */
+                    apr_atomic_set32(&pollset->wakeup_set, 0);
                     rc = APR_EINTR;
                 }
             }
@@ -298,12 +295,15 @@ APR_DECLARE(apr_status_t) apr_pollset_po
 
 APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset)
 {
-    if (pollset->wake_sender) {
+    if (!pollset->wake_sender)
+        return APR_EINIT;
+
+    if (apr_atomic_cas32(&pollset->wakeup_set, 1, 0) == 0) {
         apr_size_t len = 1;
         return apr_socket_sendto(pollset->wake_sender, pollset->wake_address, 0, "", &len);
     }
 
-    return APR_EINIT;
+    return APR_SUCCESS;
 }
 
 

Modified: apr/apr/trunk/poll/unix/pollcb.c
URL: http://svn.apache.org/viewvc/apr/apr/trunk/poll/unix/pollcb.c?rev=1895175&r1=1895174&r2=1895175&view=diff
==============================================================================
--- apr/apr/trunk/poll/unix/pollcb.c (original)
+++ apr/apr/trunk/poll/unix/pollcb.c Fri Nov 19 13:23:11 2021
@@ -214,14 +214,13 @@ APR_DECLARE(apr_status_t) apr_pollcb_pol
 
 APR_DECLARE(apr_status_t) apr_pollcb_wakeup(apr_pollcb_t *pollcb)
 {
-    if (pollcb->flags & APR_POLLSET_WAKEABLE) {
-        if (apr_atomic_cas32(&pollcb->wakeup_set, 1, 0) == 0)
-            return apr_file_putc(1, pollcb->wakeup_pipe[1]);
-        else
-           return APR_SUCCESS;
-    }
-    else
+    if (!(pollcb->flags & APR_POLLSET_WAKEABLE))
         return APR_EINIT;
+
+    if (apr_atomic_cas32(&pollcb->wakeup_set, 1, 0) == 0)
+        return apr_file_putc(1, pollcb->wakeup_pipe[1]);
+
+    return APR_SUCCESS;
 }
 
 APR_DECLARE(const char *) apr_pollcb_method_name(apr_pollcb_t *pollcb)

Modified: apr/apr/trunk/poll/unix/pollset.c
URL: http://svn.apache.org/viewvc/apr/apr/trunk/poll/unix/pollset.c?rev=1895175&r1=1895174&r2=1895175&view=diff
==============================================================================
--- apr/apr/trunk/poll/unix/pollset.c (original)
+++ apr/apr/trunk/poll/unix/pollset.c Fri Nov 19 13:23:11 2021
@@ -218,14 +218,13 @@ APR_DECLARE(apr_status_t) apr_pollset_de
 
 APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset)
 {
-    if (pollset->flags & APR_POLLSET_WAKEABLE) {
-        if (apr_atomic_cas32(&pollset->wakeup_set, 1, 0) == 0)
-            return apr_file_putc(1, pollset->wakeup_pipe[1]);
-        else
-           return APR_SUCCESS;
-    }
-    else
+    if (!(pollset->flags & APR_POLLSET_WAKEABLE))
         return APR_EINIT;
+
+    if (apr_atomic_cas32(&pollset->wakeup_set, 1, 0) == 0)
+        return apr_file_putc(1, pollset->wakeup_pipe[1]);
+
+    return APR_SUCCESS;
 }
 
 APR_DECLARE(apr_status_t) apr_pollset_add(apr_pollset_t *pollset,

Modified: apr/apr/trunk/poll/unix/wakeup.c
URL: http://svn.apache.org/viewvc/apr/apr/trunk/poll/unix/wakeup.c?rev=1895175&r1=1895174&r2=1895175&view=diff
==============================================================================
--- apr/apr/trunk/poll/unix/wakeup.c (original)
+++ apr/apr/trunk/poll/unix/wakeup.c Fri Nov 19 13:23:11 2021
@@ -81,8 +81,9 @@ apr_status_t apr_poll_create_wakeup_pipe
 {
     apr_status_t rv;
 
-    if ((rv = apr_file_pipe_create(&wakeup_pipe[0], &wakeup_pipe[1],
-                                   pool)) != APR_SUCCESS)
+    /* Read end of the pipe is non-blocking */
+    if ((rv = apr_file_pipe_create_ex(&wakeup_pipe[0], &wakeup_pipe[1],
+                                      APR_WRITE_BLOCK, pool)))
         return rv;
 
     pfd->p = pool;
@@ -137,16 +138,9 @@ apr_status_t apr_poll_close_wakeup_pipe(
  */
 void apr_poll_drain_wakeup_pipe(volatile apr_uint32_t *wakeup_set, apr_file_t **wakeup_pipe)
 {
+    char ch;
 
-    while (apr_atomic_cas32(wakeup_set, 0, 1) > 0) {
-        char ch;
-        /* though we write just one byte to the other end of the pipe
-         * during wakeup, multiple threads could call the wakeup.
-         * So simply drain out from the input side of the pipe all
-         * the data.
-         */
-        if (apr_file_getc(&ch, wakeup_pipe[0]) != APR_SUCCESS)
-            break;
-    }
+    (void)apr_file_getc(&ch, wakeup_pipe[0]);
+    apr_atomic_set32(wakeup_set, 0);
 }