You are viewing a plain text version of this content. The canonical link for it is here.
Posted to cvs@httpd.apache.org by br...@apache.org on 2005/09/25 07:22:08 UTC

svn commit: r291377 - in /httpd/httpd/branches/async-dev: CHANGES include/httpd.h server/core_filters.c server/mpm/experimental/event/event.c

Author: brianp
Date: Sat Sep 24 22:22:04 2005
New Revision: 291377

URL: http://svn.apache.org/viewcvs?rev=291377&view=rev
Log:
New version of ap_core_output_filter...
The big change is that it now does nonblocking writes when possible.  The
goal of this redesign is to more cleanly support asynchronous write completion
in MPMs such as Event or Leader.  However, the nonblocking writes may also
help in Worker and Prefork by allowing more overlapping of handler and
output filter processing with network writes.

Modified:
    httpd/httpd/branches/async-dev/CHANGES
    httpd/httpd/branches/async-dev/include/httpd.h
    httpd/httpd/branches/async-dev/server/core_filters.c
    httpd/httpd/branches/async-dev/server/mpm/experimental/event/event.c

Modified: httpd/httpd/branches/async-dev/CHANGES
URL: http://svn.apache.org/viewcvs/httpd/httpd/branches/async-dev/CHANGES?rev=291377&r1=291376&r2=291377&view=diff
==============================================================================
--- httpd/httpd/branches/async-dev/CHANGES [utf-8] (original)
+++ httpd/httpd/branches/async-dev/CHANGES [utf-8] Sat Sep 24 22:22:04 2005
@@ -1,6 +1,8 @@
                                                         -*- coding: utf-8 -*-
 Changes in Apache 2.3.0 async-dev R&D branch
 
+  *) Rewrite of ap_core_output_filter to do nonblocking writes [Brian Pane]
+
   *) Added new connection states for handler and write completion
      [Brian Pane]
 

Modified: httpd/httpd/branches/async-dev/include/httpd.h
URL: http://svn.apache.org/viewcvs/httpd/httpd/branches/async-dev/include/httpd.h?rev=291377&r1=291376&r2=291377&view=diff
==============================================================================
--- httpd/httpd/branches/async-dev/include/httpd.h (original)
+++ httpd/httpd/branches/async-dev/include/httpd.h Sat Sep 24 22:22:04 2005
@@ -1198,11 +1198,9 @@
 };
 
 typedef struct core_output_filter_ctx {
-    apr_bucket_brigade *b;
-    /** subpool of c->pool used for resources 
-     * which may outlive the request
-     */
-    apr_pool_t *deferred_write_pool;
+    apr_bucket_brigade *buffered_bb;
+    apr_size_t bytes_in;
+    apr_size_t bytes_written;
 } core_output_filter_ctx_t;
  
 typedef struct core_filter_ctx {

Modified: httpd/httpd/branches/async-dev/server/core_filters.c
URL: http://svn.apache.org/viewcvs/httpd/httpd/branches/async-dev/server/core_filters.c?rev=291377&r1=291376&r2=291377&view=diff
==============================================================================
--- httpd/httpd/branches/async-dev/server/core_filters.c (original)
+++ httpd/httpd/branches/async-dev/server/core_filters.c Sat Sep 24 22:22:04 2005
@@ -26,6 +26,7 @@
 #include "apr_hash.h"
 #include "apr_thread_proc.h"    /* for RLIMIT stuff */
 #include "apr_hooks.h"
+#include "apr_support.h"
 
 #define APR_WANT_IOVEC
 #define APR_WANT_STRFUNC
@@ -57,6 +58,7 @@
 #include "mod_so.h" /* for ap_find_loaded_module_symbol */
 
 #define AP_MIN_SENDFILE_BYTES           (256)
+// #define APR_HAS_SENDFILE 0
 
 typedef struct net_time_filter_ctx {
     apr_socket_t *csd;
@@ -340,218 +342,178 @@
     return APR_SUCCESS;
 }
 
-static apr_status_t writev_it_all(apr_socket_t *s,
-                                  struct iovec *vec, int nvec,
-                                  apr_size_t len, apr_size_t *nbytes)
-{
-    apr_size_t bytes_written = 0;
-    apr_status_t rv;
-    apr_size_t n = len;
-    int i = 0;
+static apr_status_t send_brigade_nonblocking(apr_socket_t *s,
+                                             apr_bucket_brigade *bb,
+                                             apr_size_t *bytes_written,
+                                             conn_rec *c);
 
-    *nbytes = 0;
+static void remove_empty_buckets(apr_bucket_brigade *bb);
 
-    /* XXX handle checking for non-blocking socket */
-    while (bytes_written != len) {
-        rv = apr_socket_sendv(s, vec + i, nvec - i, &n);
-        *nbytes += n;
-        bytes_written += n;
-        if (rv != APR_SUCCESS)
-            return rv;
+static apr_status_t send_brigade_blocking(apr_socket_t *s,
+                                          apr_bucket_brigade *bb,
+                                          apr_size_t *bytes_written,
+                                          conn_rec *c);
 
-        /* If the write did not complete, adjust the iovecs and issue
-         * apr_socket_sendv again
-         */
-        if (bytes_written < len) {
-            /* Skip over the vectors that have already been written */
-            apr_size_t cnt = vec[i].iov_len;
-            while (n >= cnt && i + 1 < nvec) {
-                i++;
-                cnt += vec[i].iov_len;
-            }
+static apr_status_t writev_nonblocking(apr_socket_t *s,
+                                       struct iovec *vec, apr_size_t nvec,
+                                       apr_bucket_brigade *bb,
+                                       apr_size_t *cumulative_bytes_written);
 
-            if (n < cnt) {
-                /* Handle partial write of vec i */
-                vec[i].iov_base = (char *) vec[i].iov_base +
-                    (vec[i].iov_len - (cnt - n));
-                vec[i].iov_len = cnt -n;
-            }
-        }
+static apr_status_t sendfile_nonblocking(apr_socket_t *s,
+                                         apr_bucket_brigade *bb,
+                                         apr_size_t *cumulative_bytes_written);
 
-        n = len - bytes_written;
-    }
+#define THRESHOLD_MIN_WRITE 4096
+#define THRESHOLD_MAX_BUFFER 65536
 
-    return APR_SUCCESS;
-}
+/* XXX Add mod_logio support back into ap_core_output_filter */
 
-/* sendfile_it_all()
- *  send the entire file using sendfile()
- *  handle partial writes
- *  return only when all bytes have been sent or an error is encountered.
- */
-
-#if APR_HAS_SENDFILE
-static apr_status_t sendfile_it_all(core_net_rec *c,
-                                    apr_file_t *fd,
-                                    apr_hdtr_t *hdtr,
-                                    apr_off_t   file_offset,
-                                    apr_size_t  file_bytes_left,
-                                    apr_size_t  total_bytes_left,
-                                    apr_size_t  *bytes_sent,
-                                    apr_int32_t flags)
+apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb)
 {
-    apr_status_t rv;
-#ifdef AP_DEBUG
-    apr_interval_time_t timeout = 0;
-#endif
-
-    AP_DEBUG_ASSERT((apr_socket_timeout_get(c->client_socket, &timeout) 
-                         == APR_SUCCESS)
-                    && timeout > 0);  /* socket must be in timeout mode */
-
-    /* Reset the bytes_sent field */
-    *bytes_sent = 0;
-
-    do {
-        apr_size_t tmplen = file_bytes_left;
-
-        rv = apr_socket_sendfile(c->client_socket, fd, hdtr, &file_offset, &tmplen,
-                                 flags);
-        *bytes_sent += tmplen;
-        total_bytes_left -= tmplen;
-        if (!total_bytes_left || rv != APR_SUCCESS) {
-            return rv;        /* normal case & error exit */
-        }
-
-        AP_DEBUG_ASSERT(total_bytes_left > 0 && tmplen > 0);
-
-        /* partial write, oooh noooo...
-         * Skip over any header data which was written
-         */
-        while (tmplen && hdtr->numheaders) {
-            if (tmplen >= hdtr->headers[0].iov_len) {
-                tmplen -= hdtr->headers[0].iov_len;
-                --hdtr->numheaders;
-                ++hdtr->headers;
-            }
-            else {
-                char *iov_base = (char *)hdtr->headers[0].iov_base;
+    conn_rec *c = f->c;
+    core_net_rec *net = f->ctx;
+    core_output_filter_ctx_t *ctx = net->out_ctx;
+    apr_bucket_brigade *bb;
+    apr_bucket *bucket, *next;
+    apr_size_t bytes_in_brigade, non_file_bytes_in_brigade;
+       
+    if (ctx == NULL) {
+        ctx = apr_pcalloc(c->pool, sizeof(*ctx));
+        net->out_ctx = (core_output_filter_ctx_t *)ctx;
+    }
 
-                hdtr->headers[0].iov_len -= tmplen;
-                iov_base += tmplen;
-                hdtr->headers[0].iov_base = iov_base;
-                tmplen = 0;
+    if (new_bb != NULL) {
+        for (bucket = APR_BRIGADE_FIRST(new_bb); bucket != APR_BRIGADE_SENTINEL(new_bb); bucket = APR_BUCKET_NEXT(bucket)) {
+            if (bucket->length > 0) {
+                ctx->bytes_in += bucket->length;
             }
         }
-
-        /* Skip over any file data which was written */
-
-        if (tmplen <= file_bytes_left) {
-            file_offset += tmplen;
-            file_bytes_left -= tmplen;
-            continue;
+    }
+    
+    if ((ctx->buffered_bb != NULL) &&
+        !APR_BRIGADE_EMPTY(ctx->buffered_bb)) {
+        bb = ctx->buffered_bb;
+        ctx->buffered_bb = NULL;
+        if (new_bb != NULL) {
+            APR_BRIGADE_CONCAT(bb, new_bb);
         }
+    }
+    else if (new_bb != NULL) {
+        bb = new_bb;
+    }
 
-        tmplen -= file_bytes_left;
-        file_bytes_left = 0;
-        file_offset = 0;
-
-        /* Skip over any trailer data which was written */
+    /* Scan through the brigade and decide whether to attempt a write,
+     * based on the following rules:
+     *
+     *  1) The new_bb is null: Do a nonblocking write of as much
+     *     data as possible, then save the rest in
+     *     ctx->buffered_bb.  (If new_bb == NULL, it probably
+     *     means that the MPM is doing asynchronous write
+     *     completion and has just determined that this connection
+     *     is writable.)
+     *
+     *  2) The brigade contains a flush bucket: Do a blocking write
+     *     of everything up to that point.
+     *
+     *  3) The request is in CONN_STATE_HANDLER state, and the brigade
+     *     contains at least THRESHOLD_MAX_BUFFER bytes in non-file
+     *     buckets: Do blocking writes until the amount of data in the
+     *     buffer is less than THRESHOLD_MAX_BUFFER.  (The point of this
+     *     rule is to provide flow control, in case a handler is
+     *     streaming out lots of data faster than the data can be
+     *     sent to the client.)
+     *
+     *  4) The brigade contains at least THRESHOLD_MIN_WRITE
+     *     bytes: Do a nonblocking write of as much data as possible,
+     *     then save the rest in ctx->buffered_bb.
+     */
 
-        while (tmplen && hdtr->numtrailers) {
-            if (tmplen >= hdtr->trailers[0].iov_len) {
-                tmplen -= hdtr->trailers[0].iov_len;
-                --hdtr->numtrailers;
-                ++hdtr->trailers;
+    if (new_bb == NULL) {
+        apr_status_t rv = send_brigade_nonblocking(net->client_socket, bb,
+                                                   &(ctx->bytes_written), c);
+        if (APR_STATUS_IS_EAGAIN(rv)) {
+            return APR_SUCCESS;
+        }
+        else {
+            return rv;
+        }
+    }
+    
+    bytes_in_brigade = 0;
+    non_file_bytes_in_brigade = 0;
+    for (bucket = APR_BRIGADE_FIRST(bb); bucket != APR_BRIGADE_SENTINEL(bb);
+         bucket = next) {
+        next = APR_BUCKET_NEXT(bucket);
+        if (APR_BUCKET_IS_FLUSH(bucket)) {
+            apr_bucket_brigade *remainder = apr_brigade_split(bb, next);
+            apr_status_t rv = send_brigade_blocking(net->client_socket, bb,
+                                                    &(ctx->bytes_written), c);
+            if (rv != APR_SUCCESS) {
+                return rv;
             }
-            else {
-                char *iov_base = (char *)hdtr->trailers[0].iov_base;
-
-                hdtr->trailers[0].iov_len -= tmplen;
-                iov_base += tmplen;
-                hdtr->trailers[0].iov_base = iov_base;
-                tmplen = 0;
+            bb = remainder;
+            next = APR_BRIGADE_FIRST(bb);
+            bytes_in_brigade = 0;
+            non_file_bytes_in_brigade = 0;
+        }
+        else if (!APR_BUCKET_IS_METADATA(bucket)) {
+            if (bucket->length < 0) {
+                const char *data;
+                apr_size_t length;
+                /* XXX support nonblocking read here? */
+                apr_status_t rv =
+                    apr_bucket_read(bucket, &data, &length, APR_BLOCK_READ);
+                if (rv != APR_SUCCESS) {
+                    return rv;
+                }
+                /* reading may have split the bucket, so recompute next: */
+                next = APR_BUCKET_NEXT(bucket);
+            }
+            bytes_in_brigade += bucket->length;
+            if (!APR_BUCKET_IS_FILE(bucket)) {
+                non_file_bytes_in_brigade += bucket->length;
             }
         }
-    } while (1);
-}
-#endif
-
-/*
- * emulate_sendfile()
- * Sends the contents of file fd along with header/trailer bytes, if any,
- * to the network. emulate_sendfile will return only when all the bytes have been
- * sent (i.e., it handles partial writes) or on a network error condition.
- */
-static apr_status_t emulate_sendfile(core_net_rec *c, apr_file_t *fd,
-                                     apr_hdtr_t *hdtr, apr_off_t offset,
-                                     apr_size_t length, apr_size_t *nbytes)
-{
-    apr_status_t rv = APR_SUCCESS;
-    apr_size_t togo;        /* Remaining number of bytes in the file to send */
-    apr_size_t sendlen = 0;
-    apr_size_t bytes_sent;
-    apr_int32_t i;
-    apr_off_t o;             /* Track the file offset for partial writes */
-    char buffer[8192];
-
-    *nbytes = 0;
-
-    /* Send the headers
-     * writev_it_all handles partial writes.
-     * XXX: optimization... if headers are less than MIN_WRITE_SIZE, copy
-     * them into buffer
-     */
-    if (hdtr && hdtr->numheaders > 0 ) {
-        for (i = 0; i < hdtr->numheaders; i++) {
-            sendlen += hdtr->headers[i].iov_len;
-        }
-
-        rv = writev_it_all(c->client_socket, hdtr->headers, hdtr->numheaders,
-                           sendlen, &bytes_sent);
-        *nbytes += bytes_sent;     /* track total bytes sent */
     }
 
-    /* Seek the file to 'offset' */
-    if (offset >= 0 && rv == APR_SUCCESS) {
-        rv = apr_file_seek(fd, APR_SET, &offset);
+    if (non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER) {
+        /* ### Writing the entire brigade may be excessive; we really just
+         * ### need to send enough data to be under THRESHOLD_MAX_BUFFER.
+         */
+        apr_status_t rv = send_brigade_blocking(net->client_socket, bb,
+                                                &(ctx->bytes_written), c);
+        if (rv != APR_SUCCESS) {
+            return rv;
+        }
     }
-
-    /* Send the file, making sure to handle partial writes */
-    togo = length;
-    while (rv == APR_SUCCESS && togo) {
-        sendlen = togo > sizeof(buffer) ? sizeof(buffer) : togo;
-        o = 0;
-        rv = apr_file_read(fd, buffer, &sendlen);
-        while (rv == APR_SUCCESS && sendlen) {
-            bytes_sent = sendlen;
-            rv = apr_socket_send(c->client_socket, &buffer[o], &bytes_sent);
-            *nbytes += bytes_sent;
-            if (rv == APR_SUCCESS) {
-                sendlen -= bytes_sent; /* sendlen != bytes_sent ==> partial write */
-                o += bytes_sent;       /* o is where we are in the buffer */
-                togo -= bytes_sent;    /* track how much of the file we've sent */
-            }
+    else if (bytes_in_brigade >= THRESHOLD_MIN_WRITE) {
+        apr_status_t rv = send_brigade_nonblocking(net->client_socket, bb,
+                                                   &(ctx->bytes_written), c);
+        if ((rv != APR_SUCCESS) && (!APR_STATUS_IS_EAGAIN(rv))) {
+            return rv;
         }
     }
 
-    /* Send the trailers
-     * XXX: optimization... if it will fit, send this on the last send in the
-     * loop above
-     */
-    sendlen = 0;
-    if ( rv == APR_SUCCESS && hdtr && hdtr->numtrailers > 0 ) {
-        for (i = 0; i < hdtr->numtrailers; i++) {
-            sendlen += hdtr->trailers[i].iov_len;
-        }
-        rv = writev_it_all(c->client_socket, hdtr->trailers, hdtr->numtrailers,
-                           sendlen, &bytes_sent);
-        *nbytes += bytes_sent;
+    if (!APR_BRIGADE_EMPTY(bb)) {
+        if (new_bb == NULL) {
+            /* Everything in bb must have been in ctx->buffered_bb
+             * before this function was called.  Thus we can just
+             * move it back, without incurring the potential copying
+             * cost of a bucket setaside.
+             */
+            ctx->buffered_bb = bb;
+        }
+        else {
+            /* XXX should this use a separate deferred write pool, like
+             * the original ap_core_output_filter?
+             */
+            ap_save_brigade(f, &(ctx->buffered_bb), &bb, c->pool);
+        }
     }
 
-    return rv;
+    return APR_SUCCESS;
 }
-
+ 
 #ifndef APR_MAX_IOVEC_SIZE 
 #define MAX_IOVEC_TO_WRITE 16
 #else
@@ -562,400 +524,224 @@
 #endif
 #endif
 
-/* Optional function coming from mod_logio, used for logging of output
- * traffic
- */
-extern APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_out) *logio_add_bytes_out;
-
-apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *b)
+static apr_status_t send_brigade_nonblocking(apr_socket_t *s,
+                                             apr_bucket_brigade *bb,
+                                             apr_size_t *bytes_written,
+                                             conn_rec *c)
 {
+    apr_bucket *bucket, *next;
     apr_status_t rv;
-    apr_bucket_brigade *more;
-    conn_rec *c = f->c;
-    core_net_rec *net = f->ctx;
-    core_output_filter_ctx_t *ctx = net->out_ctx;
-    apr_read_type_e eblock = APR_NONBLOCK_READ;
-    apr_pool_t *input_pool = b->p;
+    apr_interval_time_t old_timeout;
+    struct iovec vec[MAX_IOVEC_TO_WRITE];
+    apr_size_t nvec = 0;
 
-    if (ctx == NULL) {
-        ctx = apr_pcalloc(c->pool, sizeof(*ctx));
-        net->out_ctx = ctx;
+    rv = apr_socket_timeout_get(s, &old_timeout);
+    if (rv != APR_SUCCESS) {
+        return rv;
     }
-
-    /* If we have a saved brigade, concatenate the new brigade to it */
-    if (ctx->b) {
-        APR_BRIGADE_CONCAT(ctx->b, b);
-        b = ctx->b;
-        ctx->b = NULL;
+    rv = apr_socket_timeout_set(s, 0);
+    if (rv != APR_SUCCESS) {
+        return rv;
     }
 
-    /* Perform multiple passes over the brigade, sending batches of output
-       to the connection. */
-    while (b && !APR_BRIGADE_EMPTY(b)) {
-        apr_size_t nbytes = 0;
-        apr_bucket *last_e = NULL; /* initialized for debugging */
-        apr_bucket *e;
-
-        /* one group of iovecs per pass over the brigade */
-        apr_size_t nvec = 0;
-        apr_size_t nvec_trailers = 0;
-        struct iovec vec[MAX_IOVEC_TO_WRITE];
-        struct iovec vec_trailers[MAX_IOVEC_TO_WRITE];
-
-        /* one file per pass over the brigade */
-        apr_file_t *fd = NULL;
-        apr_size_t flen = 0;
-        apr_off_t foffset = 0;
-
-        /* keep track of buckets that we've concatenated
-         * to avoid small writes
-         */
-        apr_bucket *last_merged_bucket = NULL;
-
-        /* tail of brigade if we need another pass */
-        more = NULL;
-
-        /* Iterate over the brigade: collect iovecs and/or a file */
-        for (e = APR_BRIGADE_FIRST(b);
-             e != APR_BRIGADE_SENTINEL(b);
-             e = APR_BUCKET_NEXT(e))
-        {
-            /* keep track of the last bucket processed */
-            last_e = e;
-            if (APR_BUCKET_IS_EOS(e) || AP_BUCKET_IS_EOC(e)) {
-                break;
-            }
-            else if (APR_BUCKET_IS_FLUSH(e)) {
-                if (e != APR_BRIGADE_LAST(b)) {
-                    more = apr_brigade_split(b, APR_BUCKET_NEXT(e));
-                }
-                break;
-            }
+    remove_empty_buckets(bb);
 
-            /* It doesn't make any sense to use sendfile for a file bucket
-             * that represents 10 bytes.
+    for (bucket = APR_BRIGADE_FIRST(bb);
+         bucket != APR_BRIGADE_SENTINEL(bb);
+         bucket = next) {
+        int did_sendfile = 0;
+        next = APR_BUCKET_NEXT(bucket);
+#if APR_HAS_SENDFILE
+        if (APR_BUCKET_IS_FILE(bucket)) {
+            apr_bucket_file *file_bucket = (apr_bucket_file *)(bucket->data);
+            apr_file_t *fd = file_bucket->fd;
+            /* Use sendfile to send this file unless:
+             *   - the platform doesn't support sendfile,
+             *   - the file is too small for sendfile to be useful, or
+             *   - sendfile is disabled in the httpd config via "EnableSendfile off"
              */
-            else if (APR_BUCKET_IS_FILE(e)
-                     && (e->length >= AP_MIN_SENDFILE_BYTES)) {
-                apr_bucket_file *a = e->data;
-
-                /* We can't handle more than one file bucket at a time
-                 * so we split here and send the file we have already
-                 * found.
-                 */
-                if (fd) {
-                    more = apr_brigade_split(b, e);
-                    break;
-                }
 
-                fd = a->fd;
-                flen = e->length;
-                foffset = e->start;
-            }
-            else {
-                const char *str;
-                apr_size_t n;
-
-                rv = apr_bucket_read(e, &str, &n, eblock);
-                if (APR_STATUS_IS_EAGAIN(rv)) {
-                    /* send what we have so far since we shouldn't expect more
-                     * output for a while...  next time we read, block
-                     */
-                    more = apr_brigade_split(b, e);
-                    eblock = APR_BLOCK_READ;
-                    break;
+            if ((apr_file_flags_get(fd) & APR_SENDFILE_ENABLED) &&
+                (bucket->length >= AP_MIN_SENDFILE_BYTES)) {
+                did_sendfile = 1;
+                (void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 1);
+                rv = writev_nonblocking(s, vec, nvec, bb, bytes_written);
+                nvec = 0;
+                if (rv != APR_SUCCESS) {
+                    (void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 0);
+                    (void)apr_socket_timeout_set(s, old_timeout);
+                    return rv;
                 }
-                eblock = APR_NONBLOCK_READ;
-                if (n) {
-                    if (!fd) {
-                        if (nvec == MAX_IOVEC_TO_WRITE) {
-                            /* woah! too many. buffer them up, for use later. */
-                            apr_bucket *temp, *next;
-                            apr_bucket_brigade *temp_brig;
-
-                            if (nbytes >= AP_MIN_BYTES_TO_WRITE) {
-                                /* We have enough data in the iovec
-                                 * to justify doing a writev
-                                 */
-                                more = apr_brigade_split(b, e);
-                                break;
-                            }
-
-                            /* Create a temporary brigade as a means
-                             * of concatenating a bunch of buckets together
-                             */
-                            if (last_merged_bucket) {
-                                /* If we've concatenated together small
-                                 * buckets already in a previous pass,
-                                 * the initial buckets in this brigade
-                                 * are heap buckets that may have extra
-                                 * space left in them (because they
-                                 * were created by apr_brigade_write()).
-                                 * We can take advantage of this by
-                                 * building the new temp brigade out of
-                                 * these buckets, so that the content
-                                 * in them doesn't have to be copied again.
-                                 */
-                                apr_bucket_brigade *bb;
-                                bb = apr_brigade_split(b,
-                                         APR_BUCKET_NEXT(last_merged_bucket));
-                                temp_brig = b;
-                                b = bb;
-                            }
-                            else {
-                                temp_brig = apr_brigade_create(f->c->pool,
-                                                           f->c->bucket_alloc);
-                            }
-
-                            temp = APR_BRIGADE_FIRST(b);
-                            while (temp != e) {
-                                apr_bucket *d;
-                                rv = apr_bucket_read(temp, &str, &n, APR_BLOCK_READ);
-                                apr_brigade_write(temp_brig, NULL, NULL, str, n);
-                                d = temp;
-                                temp = APR_BUCKET_NEXT(temp);
-                                apr_bucket_delete(d);
-                            }
-
-                            nvec = 0;
-                            nbytes = 0;
-                            temp = APR_BRIGADE_FIRST(temp_brig);
-                            APR_BUCKET_REMOVE(temp);
-                            APR_BRIGADE_INSERT_HEAD(b, temp);
-                            apr_bucket_read(temp, &str, &n, APR_BLOCK_READ);
-                            vec[nvec].iov_base = (char*) str;
-                            vec[nvec].iov_len = n;
-                            nvec++;
-
-                            /* Just in case the temporary brigade has
-                             * multiple buckets, recover the rest of
-                             * them and put them in the brigade that
-                             * we're sending.
-                             */
-                            for (next = APR_BRIGADE_FIRST(temp_brig);
-                                 next != APR_BRIGADE_SENTINEL(temp_brig);
-                                 next = APR_BRIGADE_FIRST(temp_brig)) {
-                                APR_BUCKET_REMOVE(next);
-                                APR_BUCKET_INSERT_AFTER(temp, next);
-                                temp = next;
-                                apr_bucket_read(next, &str, &n,
-                                                APR_BLOCK_READ);
-                                vec[nvec].iov_base = (char*) str;
-                                vec[nvec].iov_len = n;
-                                nvec++;
-                            }
-
-                            apr_brigade_destroy(temp_brig);
-
-                            last_merged_bucket = temp;
-                            e = temp;
-                            last_e = e;
-                        }
-                        else {
-                            vec[nvec].iov_base = (char*) str;
-                            vec[nvec].iov_len = n;
-                            nvec++;
-                        }
-                    }
-                    else {
-                        /* The bucket is a trailer to a file bucket */
-
-                        if (nvec_trailers == MAX_IOVEC_TO_WRITE) {
-                            /* woah! too many. stop now. */
-                            more = apr_brigade_split(b, e);
-                            break;
-                        }
-
-                        vec_trailers[nvec_trailers].iov_base = (char*) str;
-                        vec_trailers[nvec_trailers].iov_len = n;
-                        nvec_trailers++;
-                    }
-
-                    nbytes += n;
+                rv = sendfile_nonblocking(s, bb, bytes_written);
+                (void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 0);
+                if (rv != APR_SUCCESS) {
+                    (void)apr_socket_timeout_set(s, old_timeout);
+                    return rv;
                 }
             }
         }
-
-
-        /* Completed iterating over the brigade, now determine if we want
-         * to buffer the brigade or send the brigade out on the network.
-         *
-         * Save if we haven't accumulated enough bytes to send, the connection
-         * is not about to be closed, and:
-         *
-         *   1) we didn't see a file, we don't have more passes over the
-         *      brigade to perform,  AND we didn't stop at a FLUSH bucket.
-         *      (IOW, we will save plain old bytes such as HTTP headers)
-         * or
-         *   2) we hit the EOS and have a keep-alive connection
-         *      (IOW, this response is a bit more complex, but we save it
-         *       with the hope of concatenating with another response)
-         */
-        if (nbytes + flen < AP_MIN_BYTES_TO_WRITE
-            && !AP_BUCKET_IS_EOC(last_e)
-            && ((!fd && !more && !APR_BUCKET_IS_FLUSH(last_e))
-                || (APR_BUCKET_IS_EOS(last_e)
-                    && c->keepalive == AP_CONN_KEEPALIVE))) {
-
-            /* NEVER save an EOS in here.  If we are saving a brigade with
-             * an EOS bucket, then we are doing keepalive connections, and
-             * we want to process to second request fully.
-             */
-            if (APR_BUCKET_IS_EOS(last_e)) {
-                apr_bucket *bucket;
-                int file_bucket_saved = 0;
-                apr_bucket_delete(last_e);
-                for (bucket = APR_BRIGADE_FIRST(b);
-                     bucket != APR_BRIGADE_SENTINEL(b);
-                     bucket = APR_BUCKET_NEXT(bucket)) {
-
-                    /* Do a read on each bucket to pull in the
-                     * data from pipe and socket buckets, so
-                     * that we don't leave their file descriptors
-                     * open indefinitely.  Do the same for file
-                     * buckets, with one exception: allow the
-                     * first file bucket in the brigade to remain
-                     * a file bucket, so that we don't end up
-                     * doing an mmap+memcpy every time a client
-                     * requests a <8KB file over a keepalive
-                     * connection.
-                     */
-                    if (APR_BUCKET_IS_FILE(bucket) && !file_bucket_saved) {
-                        file_bucket_saved = 1;
-                    }
-                    else {
-                        const char *buf;
-                        apr_size_t len = 0;
-                        rv = apr_bucket_read(bucket, &buf, &len,
-                                             APR_BLOCK_READ);
-                        if (rv != APR_SUCCESS) {
-                            ap_log_cerror(APLOG_MARK, APLOG_ERR, rv,
-                                          c, "core_output_filter:"
-                                          " Error reading from bucket.");
-                            return HTTP_INTERNAL_SERVER_ERROR;
-                        }
-                    }
-                }
+#endif /* APR_HAS_SENDFILE */
+        if (!did_sendfile && !APR_BUCKET_IS_METADATA(bucket)) {
+            const char *data;
+            apr_size_t length;
+            rv = apr_bucket_read(bucket, &data, &length, APR_BLOCK_READ);
+            if (rv != APR_SUCCESS) {
+                (void)apr_socket_timeout_set(s, old_timeout);
+                return rv;
             }
-            if (!ctx->deferred_write_pool) {
-                apr_pool_create(&ctx->deferred_write_pool, c->pool);
-                apr_pool_tag(ctx->deferred_write_pool, "deferred_write");
+            /* reading may have split the bucket, so recompute next: */
+            next = APR_BUCKET_NEXT(bucket);
+            vec[nvec].iov_base = (char *)data;
+            vec[nvec].iov_len = length;
+            nvec++;
+            if (nvec == MAX_IOVEC_TO_WRITE) {
+                rv = writev_nonblocking(s, vec, nvec, bb, bytes_written);
+                nvec = 0;
+                if (rv != APR_SUCCESS) {
+                    (void)apr_socket_timeout_set(s, old_timeout);
+                    return rv;
+                }
             }
-            ap_save_brigade(f, &ctx->b, &b, ctx->deferred_write_pool);
-
-            return APR_SUCCESS;
         }
+    }
 
-        if (fd) {
-            apr_hdtr_t hdtr;
-            apr_size_t bytes_sent;
-
-#if APR_HAS_SENDFILE
-            apr_int32_t flags = 0;
-#endif
+    if (nvec > 0) {
+        rv = writev_nonblocking(s, vec, nvec, bb, bytes_written);
+        if (rv != APR_SUCCESS) {
+            (void)apr_socket_timeout_set(s, old_timeout);
+            return rv;
+        }
+    }
 
-            memset(&hdtr, '\0', sizeof(hdtr));
-            if (nvec) {
-                hdtr.numheaders = nvec;
-                hdtr.headers = vec;
-            }
+    remove_empty_buckets(bb);
 
-            if (nvec_trailers) {
-                hdtr.numtrailers = nvec_trailers;
-                hdtr.trailers = vec_trailers;
-            }
+    (void)apr_socket_timeout_set(s, old_timeout);
+    return APR_SUCCESS;
+}
 
-#if APR_HAS_SENDFILE
-            if (apr_file_flags_get(fd) & APR_SENDFILE_ENABLED) {
+/* Discard buckets from the front of the brigade for as long as the first
+ * bucket is either a metadata bucket or a zero-length bucket.  Stops at
+ * the first bucket that has a nonzero length, or when the brigade is
+ * empty.  Every discarded bucket is unlinked and destroyed.
+ */
+static void remove_empty_buckets(apr_bucket_brigade *bb)
+{
+    for (;;) {
+        apr_bucket *head = APR_BRIGADE_FIRST(bb);
+
+        if (head == APR_BRIGADE_SENTINEL(bb)) {
+            /* nothing left in the brigade */
+            break;
+        }
+        if (!APR_BUCKET_IS_METADATA(head) && (head->length != 0)) {
+            /* first bucket has a nonzero length: keep it and stop */
+            break;
+        }
+        APR_BUCKET_REMOVE(head);
+        apr_bucket_destroy(head);
+    }
+}
 
-                if (c->keepalive == AP_CONN_CLOSE && APR_BUCKET_IS_EOS(last_e)) {
-                    /* Prepare the socket to be reused */
-                    flags |= APR_SENDFILE_DISCONNECT_SOCKET;
+/* Send the whole brigade, waiting whenever the socket cannot accept data.
+ *
+ * Repeatedly calls send_brigade_nonblocking() until the brigade is empty.
+ * When that call reports EAGAIN, waits via apr_wait_for_io_or_timeout()
+ * and then tries again.
+ *
+ * s             - socket to write to
+ * bb            - brigade to send; the loop ends once it is empty
+ * bytes_written - passed through to send_brigade_nonblocking()
+ * c             - connection, passed through to send_brigade_nonblocking()
+ *
+ * Returns APR_SUCCESS once the brigade is empty; otherwise the first
+ * non-EAGAIN error from send_brigade_nonblocking(), or the error from
+ * the wait.
+ */
+static apr_status_t send_brigade_blocking(apr_socket_t *s,
+                                          apr_bucket_brigade *bb,
+                                          apr_size_t *bytes_written,
+                                          conn_rec *c)
+{
+    while (!APR_BRIGADE_EMPTY(bb)) {
+        apr_status_t rv = send_brigade_nonblocking(s, bb, bytes_written, c);
+        if (rv != APR_SUCCESS) {
+            if (APR_STATUS_IS_EAGAIN(rv)) {
+                /* NOTE(review): the last argument (0) is presumably the
+                 * for_read flag, i.e. wait for writability -- confirm
+                 * against apr_support.h.
+                 */
+                rv = apr_wait_for_io_or_timeout(NULL, s, 0);
+                if (rv != APR_SUCCESS) {
+                    return rv;
                 }
-
-                rv = sendfile_it_all(net,      /* the network information   */
-                                     fd,       /* the file to send          */
-                                     &hdtr,    /* header and trailer iovecs */
-                                     foffset,  /* offset in the file to begin
-                                                  sending from              */
-                                     flen,     /* length of file            */
-                                     nbytes + flen, /* total length including
-                                                       headers              */
-                                     &bytes_sent,   /* how many bytes were
-                                                       sent                 */
-                                     flags);   /* apr_sendfile flags        */
             }
-            else
-#endif
-            {
-                rv = emulate_sendfile(net, fd, &hdtr, foffset, flen,
-                                      &bytes_sent);
+            else {
+                return rv;
             }
-
-            if (logio_add_bytes_out && bytes_sent > 0)
-                logio_add_bytes_out(c, bytes_sent);
-
-            fd = NULL;
         }
-        else {
-            apr_size_t bytes_sent;
-
-            rv = writev_it_all(net->client_socket,
-                               vec, nvec,
-                               nbytes, &bytes_sent);
+    }
+    return APR_SUCCESS;
+}
 
-            if (logio_add_bytes_out && bytes_sent > 0)
-                logio_add_bytes_out(c, bytes_sent);
-        }
+/* Write an iovec array to the socket and drop the corresponding buckets
+ * from the front of the brigade as their data is sent.
+ *
+ * s    - socket to write to
+ * vec  - array of nvec iovecs; entries are modified in place when a
+ *        write ends part-way through one of them
+ * nvec - number of entries in vec
+ * bb   - brigade whose leading buckets are removed as data is written
+ *        (assumes the non-metadata buckets at the front of bb match the
+ *        entries of vec in order -- confirm against the caller)
+ * cumulative_bytes_written - incremented by the number of bytes this
+ *        call wrote, on success and on failure alike
+ *
+ * Keeps calling apr_socket_sendv() until every byte in vec has been
+ * written or a call returns something other than APR_SUCCESS; returns
+ * the status of the last apr_socket_sendv() call (APR_SUCCESS if there
+ * was nothing to write).
+ */
+static apr_status_t writev_nonblocking(apr_socket_t *s,
+                                       struct iovec *vec, apr_size_t nvec,
+                                       apr_bucket_brigade *bb,
+                                       apr_size_t *cumulative_bytes_written)
+{
+    apr_status_t rv = APR_SUCCESS;
+    apr_size_t bytes_written = 0, bytes_to_write = 0;
+    apr_size_t i, offset;
 
-        apr_brigade_destroy(b);
-        
-        /* drive cleanups for resources which were set aside 
-         * this may occur before or after termination of the request which
-         * created the resource
-         */
-        if (ctx->deferred_write_pool) {
-            if (more && more->p == ctx->deferred_write_pool) {
-                /* "more" belongs to the deferred_write_pool,
-                 * which is about to be cleared.
-                 */
-                if (APR_BRIGADE_EMPTY(more)) {
-                    more = NULL;
+    for (i = 0; i < nvec; i++) {
+        bytes_to_write += vec[i].iov_len;
+    }
+    /* offset is the index of the first iovec not yet fully written */
+    offset = 0;
+    while (bytes_written < bytes_to_write) {
+        apr_size_t n = 0;
+        rv = apr_socket_sendv(s, vec + offset, nvec - offset, &n);
+        if (n > 0) {
+            bytes_written += n;
+            /* Walk the iovecs covered by the n bytes just sent and
+             * remove the matching buckets from the brigade.
+             */
+            for (i = offset; i < nvec; ) {
+                apr_bucket *bucket = APR_BRIGADE_FIRST(bb);
+                if (APR_BUCKET_IS_METADATA(bucket)) {
+                    /* metadata buckets have no iovec entry: just drop */
+                    APR_BUCKET_REMOVE(bucket);
+                    apr_bucket_destroy(bucket);
+                }
+                else if (n >= vec[i].iov_len) {
+                    /* this iovec was written completely */
+                    APR_BUCKET_REMOVE(bucket);
+                    apr_bucket_destroy(bucket);
+                    offset++;
+                    n -= vec[i++].iov_len;
                 }
                 else {
-                    /* uh oh... change more's lifetime 
-                     * to the input brigade's lifetime 
-                     */
-                    apr_bucket_brigade *tmp_more = more;
-                    more = NULL;
-                    ap_save_brigade(f, &more, &tmp_more, input_pool);
+                    /* Partial write: drop the first n bytes of the
+                     * bucket and advance the iovec past them.
+                     */
+                    apr_bucket_split(bucket, n);
+                    APR_BUCKET_REMOVE(bucket);
+                    apr_bucket_destroy(bucket);
+                    vec[i].iov_len -= n;
+                    /* iov_base is a void *; cast before doing pointer
+                     * arithmetic, which ISO C does not define on void *.
+                     */
+                    vec[i].iov_base = (char *)vec[i].iov_base + n;
+                    break;
                 }
             }
-            apr_pool_clear(ctx->deferred_write_pool);  
         }
-
         if (rv != APR_SUCCESS) {
-            ap_log_cerror(APLOG_MARK, APLOG_INFO, rv, c,
-                          "core_output_filter: writing data to the network");
-
-            if (more)
-                apr_brigade_destroy(more);
-
-            /* No need to check for SUCCESS, we did that above. */
-            if (!APR_STATUS_IS_EAGAIN(rv)) {
-                c->aborted = 1;
-            }
-
-            /* The client has aborted, but the request was successful. We
-             * will report success, and leave it to the access and error
-             * logs to note that the connection was aborted.
-             */
-            return APR_SUCCESS;
+            break;
         }
+    }
+    *cumulative_bytes_written += bytes_written;
+    return rv;
+}
 
-        b = more;
-        more = NULL;
-    }  /* end while () */
+/* Send the file bucket at the front of the brigade with
+ * apr_socket_sendfile().
+ *
+ * s  - socket to write to
+ * bb - brigade whose first bucket must be a file bucket
+ * cumulative_bytes_written - incremented by the number of bytes this
+ *      call sent
+ *
+ * Calls apr_socket_sendfile() repeatedly until the bucket's full length
+ * has been sent or a call returns something other than APR_SUCCESS.
+ * A fully sent bucket is removed from the brigade; a partly sent bucket
+ * is split and only the sent portion is removed.
+ *
+ * Returns APR_EGENERAL if the first bucket is not a file bucket,
+ * otherwise the status of the last apr_socket_sendfile() call.
+ */
+static apr_status_t sendfile_nonblocking(apr_socket_t *s,
+                                         apr_bucket_brigade *bb,
+                                         apr_size_t *cumulative_bytes_written)
+{
+    apr_status_t rv = APR_SUCCESS;
+    apr_bucket *bucket;
+    apr_bucket_file *file_bucket;
+    apr_file_t *fd;
+    apr_size_t file_length;
+    apr_off_t file_offset;
+    apr_size_t bytes_written = 0;
 
-    return APR_SUCCESS;
+    bucket = APR_BRIGADE_FIRST(bb);
+    if (!APR_BUCKET_IS_FILE(bucket)) {
+        /* XXX log a "this should never happen" message */
+        return APR_EGENERAL;
+    }
+    file_bucket = (apr_bucket_file *)(bucket->data);
+    fd = file_bucket->fd;
+    file_length = bucket->length;
+    file_offset = bucket->start;
+
+    while (bytes_written < file_length) {
+        apr_size_t n = file_length - bytes_written;
+        rv = apr_socket_sendfile(s, fd, NULL, &file_offset, &n, 0);
+        if (rv == APR_SUCCESS) {
+            bytes_written += n;
+            /* advance the offset ourselves for the next call */
+            file_offset += n;
+        }
+        else {
+            /* NOTE(review): any byte count left in n by a failing call
+             * is not added to bytes_written here.
+             */
+            break;
+        }
+    }
+    /* Add to the caller's counter; the pointer must be dereferenced,
+     * otherwise only the local pointer variable would be advanced.
+     */
+    *cumulative_bytes_written += bytes_written;
+    if ((bytes_written < file_length) && (bytes_written > 0)) {
+        /* partial send: drop only the portion that was written */
+        apr_bucket_split(bucket, bytes_written);
+        APR_BUCKET_REMOVE(bucket);
+        apr_bucket_destroy(bucket);
+    }
+    else if (bytes_written == file_length) {
+        APR_BUCKET_REMOVE(bucket);
+        apr_bucket_destroy(bucket);
+    }
+    return rv;
 }
+

Modified: httpd/httpd/branches/async-dev/server/mpm/experimental/event/event.c
URL: http://svn.apache.org/viewcvs/httpd/httpd/branches/async-dev/server/mpm/experimental/event/event.c?rev=291377&r1=291376&r2=291377&view=diff
==============================================================================
--- httpd/httpd/branches/async-dev/server/mpm/experimental/event/event.c (original)
+++ httpd/httpd/branches/async-dev/server/mpm/experimental/event/event.c Sat Sep 24 22:22:04 2005
@@ -168,9 +168,6 @@
 
 static apr_pollset_t *event_pollset;
 
-extern apr_status_t ap_mpm_custom_write_filter(ap_filter_t *f,
-                                                     apr_bucket_brigade *bb);
-
 /* The structure used to pass unique initialization info to each thread */
 typedef struct
 {
@@ -340,7 +337,7 @@
         *result = 1;
         return APR_SUCCESS;
     case AP_MPMQ_CUSTOM_WRITE:
-        *result = 1;
+        *result = 0;
         return APR_SUCCESS;
     case AP_MPMQ_HARD_LIMIT_DAEMONS:
         *result = server_limit;
@@ -2209,8 +2206,6 @@
      * to retrieve it, so register as REALLY_FIRST
      */
     ap_hook_pre_config(worker_pre_config, NULL, NULL, APR_HOOK_REALLY_FIRST);
-
-    APR_REGISTER_OPTIONAL_FN(ap_mpm_custom_write_filter);
 }
 
 static const char *set_daemons_to_start(cmd_parms *cmd, void *dummy,