You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@httpd.apache.org by Bill Stoddard <bi...@wstoddard.com> on 2002/10/15 19:31:07 UTC

[PATCH] event driven read

Something I've been hacking on (in the pejorative sense of the word 'hack';
look at the patch and you will see what I mean :-).  This should apply and
serve pages on Linux, though the event_loop is clearly broken: it does not
time out keep-alive connections, and it will hang in apr_poll() (and hang the
server) if a client leaves a keep-alive connection active but does not send
anything on it. The scoreboard is broken, code structure is poor, yadda yadda.
I plan to reimplement some of this more cleanly, but I have no idea when I'll
get around to it. Key points:

1. routines that read requests must be able to handle getting APR_EAGAIN (or
APR_EWOULDBLOCK)
2. ap_process_http_connection reimplemented to be state driven (as ap_process_http_async_connection)
3. event loop in the worker MPM to wait for pending i/o

Bill

Index: include/http_protocol.h
===================================================================
RCS file: /home/cvs/httpd-2.0/include/http_protocol.h,v
retrieving revision 1.83
diff -u -r1.83 http_protocol.h
--- include/http_protocol.h	11 Jul 2002 19:53:04 -0000	1.83
+++ include/http_protocol.h	15 Oct 2002 14:54:01 -0000
@@ -92,6 +92,13 @@
 request_rec *ap_read_request(conn_rec *c);

 /**
+ * Allocate a request_rec and set its default fields, without reading the request.
+ * @param c The current connection
+ * @return The new request_rec
+ */
+request_rec *ap_create_request(conn_rec *c);
+
+/**
  * Read the mime-encoded headers.
  * @param r The current request
  */
@@ -103,8 +110,8 @@
  * @param r The current request
  * @param bb temp brigade
  */
-AP_DECLARE(void) ap_get_mime_headers_core(request_rec *r,
-                                          apr_bucket_brigade *bb);
+AP_DECLARE(apr_status_t) ap_get_mime_headers_core(request_rec *r,
+                                                  http_state_t *hs);

 /* Finish up stuff after a request */

@@ -582,6 +589,7 @@
  * @param r The request
  * @param fold Whether to merge continuation lines
  * @param bb Working brigade to use when reading buckets
+ * @param block APR_BLOCK_READ or APR_NONBLOCK_READ, passed to ap_get_brigade()
  * @return APR_SUCCESS, if successful
  *         APR_ENOSPC, if the line is too big to fit in the buffer
  *         Other errors where appropriate
@@ -590,15 +598,17 @@
 AP_DECLARE(apr_status_t) ap_rgetline(char **s, apr_size_t n,
                                      apr_size_t *read,
                                      request_rec *r, int fold,
-                                     apr_bucket_brigade *bb);
+                                     apr_bucket_brigade *bb,
+                                     apr_read_type_e block);
 #else /* ASCII box */
-#define ap_rgetline(s, n, read, r, fold, bb) \
-        ap_rgetline_core((s), (n), (read), (r), (fold), (bb))
+#define ap_rgetline(s, n, read, r, fold, bb, block) \
+        ap_rgetline_core((s), (n), (read), (r), (fold), (bb), (block))
 #endif
 AP_DECLARE(apr_status_t) ap_rgetline_core(char **s, apr_size_t n,
                                           apr_size_t *read,
                                           request_rec *r, int fold,
-                                          apr_bucket_brigade *bb);
+                                          apr_bucket_brigade *bb,
+                                          apr_read_type_e block);

 /**
  * Get the method number associated with the given string, assumed to
Index: include/httpd.h
===================================================================
RCS file: /home/cvs/httpd-2.0/include/httpd.h,v
retrieving revision 1.189
diff -u -r1.189 httpd.h
--- include/httpd.h	1 Jul 2002 17:49:53 -0000	1.189
+++ include/httpd.h	15 Oct 2002 14:54:03 -0000
@@ -684,6 +684,8 @@
 /** A structure that represents the current request */
 typedef struct request_rec request_rec;

+typedef struct http_state_t http_state_t;
+
 /* ### would be nice to not include this from httpd.h ... */
 /* This comes after we have defined the request_rec type */
 #include "apr_uri.h"
@@ -1017,8 +1019,32 @@
     void *sbh;
     /** The bucket allocator to use for all bucket/brigade creations */
     struct apr_bucket_alloc_t *bucket_alloc;
+
+    /* Most recently created request_rec and the async HTTP state for this connection */
+    request_rec *r;
+    http_state_t *hs;
 };

+typedef enum  {
+    HTTP_STATE_NEW_CONNECTION,
+    HTTP_STATE_READ_REQUEST_LINE,
+    HTTP_STATE_PARSE_REQUEST_LINE,
+    HTTP_STATE_READ_MIME_HEADERS,
+    HTTP_STATE_WRITE_RESPONSE,
+    HTTP_STATE_LINGER,
+    HTTP_STATE_DONE,
+    HTTP_STATE_ERROR
+} http_state_e;
+
+struct http_state_t {
+    http_state_e state;
+    conn_rec *c;
+    request_rec *r;
+    apr_bucket_brigade *bb;
+    apr_table_t *headers;
+    apr_pool_t *p;
+    apr_socket_t *sock;
+};
 /* Per-vhost config... */

 /**
Index: modules/http/http_core.c
===================================================================
RCS file: /home/cvs/httpd-2.0/modules/http/http_core.c,v
retrieving revision 1.307
diff -u -r1.307 http_core.c
--- modules/http/http_core.c	15 Jul 2002 08:05:11 -0000	1.307
+++ modules/http/http_core.c	15 Oct 2002 14:54:04 -0000
@@ -56,8 +56,10 @@
  * University of Illinois, Urbana-Champaign.
  */

+#include "apr.h"
 #include "apr_strings.h"
 #include "apr_thread_proc.h"    /* for RLIMIT stuff */
+#include "apr_lib.h"

 #define APR_WANT_STRFUNC
 #include "apr_want.h"
@@ -76,6 +78,8 @@
 #include "scoreboard.h"

 #include "mod_core.h"
+#include "http_vhost.h"
+#include "http_log.h"

 /* Handles for core filters */
 AP_DECLARE_DATA ap_filter_rec_t *ap_http_input_filter_handle;
@@ -281,7 +285,6 @@
      * Read and process each request found on our connection
      * until no requests are left or we decide to close.
      */
-
     ap_update_child_status(c->sbh, SERVER_BUSY_READ, NULL);
     while ((r = ap_read_request(c)) != NULL) {

@@ -313,7 +316,277 @@

     return OK;
 }
+static int ap_parse_request_line(request_rec *r)
+{
+    int major = 1, minor = 0;   /* Assume HTTP/1.0 if non-"HTTP" protocol
*/
+    apr_size_t len = strlen(r->the_request);
+    const char *ll;
+    const char *uri;
+    const char *pro;
+
+#if 0
+    if (r->status == HTTP_REQUEST_URI_TOO_LARGE) {
+        /*
+           ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r,
+           "request failed: URI too long");
+        */
+        ap_send_error_response(r, 0);
+        ap_run_log_transaction(r);
+        apr_brigade_destroy(hs->bb);
+        return DONE;
+    }
+#endif
+    /* we've probably got something to do, ignore graceful restart requests
*/
+    r->request_time = apr_time_now();
+    ll = r->the_request;
+    r->method = ap_getword_white(r->pool, &ll);
+
+    uri = ap_getword_white(r->pool, &ll);
+
+    /* Provide quick information about the request method as soon as known
*/
+    r->method_number = ap_method_number_of(r->method);
+    if (r->method_number == M_GET && r->method[0] == 'H') {
+        r->header_only = 1;
+    }
+
+    ap_parse_uri(r, uri);
+
+    /* ap_getline returns (size of max buffer - 1) if it fills up the
+     * buffer before finding the end-of-line.  This is only going to
+     * happen if it exceeds the configured limit for a request-line.
+     * The cast is safe, limit_req_line cannot be negative
+     */
+    if (len > (apr_size_t)r->server->limit_req_line) {
+        r->status    = HTTP_REQUEST_URI_TOO_LARGE;
+        r->proto_num = HTTP_VERSION(1,0);
+        r->protocol  = apr_pstrdup(r->pool, "HTTP/1.0");
+        /* Need response here */
+
+        return DONE;
+    }
+
+    if (ll[0]) {
+        r->assbackwards = 0;
+        pro = ll;
+        len = strlen(ll);
+    } else {
+        r->assbackwards = 1;
+        pro = "HTTP/0.9";
+        len = 8;
+    }
+    r->protocol = apr_pstrmemdup(r->pool, pro, len);
+
+    /* Avoid sscanf in the common case */
+    if (len == 8
+        && pro[0] == 'H' && pro[1] == 'T' && pro[2] == 'T' && pro[3] == 'P'
+        && pro[4] == '/' && apr_isdigit(pro[5]) && pro[6] == '.'
+        && apr_isdigit(pro[7])) {
+        r->proto_num = HTTP_VERSION(pro[5] - '0', pro[7] - '0');
+    }
+    else if (2 == sscanf(r->protocol, "HTTP/%u.%u", &major, &minor)
+             && minor < HTTP_VERSION(1, 0)) /* don't allow HTTP/0.1000 */
+        r->proto_num = HTTP_VERSION(major, minor);
+    else
+        r->proto_num = HTTP_VERSION(1, 0);
+
+#if 0
+    if (r->assbackwards && r->header_only) {
+        /*
+         * Client asked for headers only with HTTP/0.9, which doesn't send
+         * headers! Have to dink things just to make sure the error message
+         * comes through...
+         */
+        //                    ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r,
+        //                                  "client sent invalid HTTP/0.9
request: HEAD %s",
+        //                                  r->uri);
+        r->header_only = 0;
+        r->status = HTTP_BAD_REQUEST;
+        ap_send_error_response(r, 0);
+        ap_run_log_transaction(r);
+        apr_brigade_destroy(hs->bb);
+        hs->state = HTTP_STATE_ERROR;
+
+    }
+#endif
+    return OK;
+}
+static int ap_process_http_async_connection(conn_rec *c)
+{
+    int rc = OK;
+    request_rec *r;
+    int csd_set = 0;
+    apr_socket_t *csd = NULL;
+    http_state_t *hs = c->hs;
+    http_state_e next_state;
+    const char *expect;
+    int access_status;
+    int len;
+
+    /* Every hook or call that does network i/o must handle and return
+     * APR_EWOULDBLOCK
+     */
+    while (1) {
+        switch (hs->state) {
+        case HTTP_STATE_NEW_CONNECTION:
+        {
+            ap_update_child_status(c->sbh, SERVER_BUSY_READ, NULL);
+            r = ap_create_request(c);
+            hs->bb = apr_brigade_create(r->pool,
r->connection->bucket_alloc);
+            hs->headers = apr_table_make(r->pool, 50);
+            hs->r = r;
+            hs->state = HTTP_STATE_READ_REQUEST_LINE;
+        }
+        case HTTP_STATE_READ_REQUEST_LINE:
+        {
+            r = hs->r;
+            r->the_request = NULL;
+            do {
+                rc = ap_rgetline(&(r->the_request),
DEFAULT_LIMIT_REQUEST_LINE + 2,
+                                 &len, r, 0, hs->bb, APR_NONBLOCK_READ);
+                if (rc == APR_EAGAIN) {
+                    return OK;
+                }
+                if (rc != APR_SUCCESS || c->aborted) {
+                    r->request_time = apr_time_now();
+                    hs->state = HTTP_STATE_ERROR;
+                    return OK;
+                }
+            }  while (len <= 0);
+
+            hs->state = HTTP_STATE_PARSE_REQUEST_LINE;
+        }
+        case HTTP_STATE_PARSE_REQUEST_LINE:
+        {
+            rc = ap_parse_request_line(r);
+            if (rc != OK) {
+                hs->state = HTTP_STATE_ERROR;
+                /* todo: Log error message if appropriate */
+                return OK;
+            }
+            hs->state = HTTP_STATE_READ_MIME_HEADERS;
+        }
+        case HTTP_STATE_READ_MIME_HEADERS:
+        {
+            r = hs->r;
+            /* This read will be a blocking read for now */
+            rc = ap_get_mime_headers_core(r, hs);
+            if (rc == APR_EAGAIN) {
+                return OK;
+            }
+            else if (rc != APR_SUCCESS) {
+                hs->state = HTTP_STATE_LINGER;
+                return OK;
+            }

+            apr_brigade_destroy(hs->bb);
+            hs->bb = NULL;
+            r->status = HTTP_OK;                         /* Until further
notice. */
+
+            /* update what we think the virtual host is based on the
headers we've
+             * now read. may update status.
+             */
+            ap_update_vhost_from_headers(r);
+
+            /* we may have switched to another server */
+            r->per_dir_config = r->server->lookup_defaults;
+
+            if ((!r->hostname && (r->proto_num >= HTTP_VERSION(1, 1)))
+                || ((r->proto_num == HTTP_VERSION(1, 1))
+                    && !apr_table_get(r->headers_in, "Host"))) {
+                /*
+                 * Client sent us an HTTP/1.1 or later request without
telling us the
+                 * hostname, either with a full URL or a Host: header. We
therefore
+                 * need to (as per the 1.1 spec) send an error.  As a
special case,
+                 * HTTP/1.1 mentions twice (S9, S14.23) that a request MUST
contain
+                 * a Host: header, and the server MUST respond with 400 if
it doesn't.
+                 */
+                r->status = HTTP_BAD_REQUEST;
+                ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r,
+                              "client sent HTTP/1.1 request without
hostname "
+                              "(see RFC2616 section 14.23): %s", r->uri);
+            }
+
+            if (r->status != HTTP_OK) {
+                ap_send_error_response(r, 0);
+                ap_run_log_transaction(r);
+                hs->state = HTTP_STATE_LINGER;
+                return OK;
+            }
+
+            if (((expect = apr_table_get(r->headers_in, "Expect")) != NULL)
+                && (expect[0] != '\0')) {
+                /*
+                 * The Expect header field was added to HTTP/1.1 after RFC
2068
+                 * as a means to signal when a 100 response is desired and,
+                 * unfortunately, to signal a poor man's mandatory
extension that
+                 * the server must understand or return 417 Expectation
Failed.
+                 */
+                if (strcasecmp(expect, "100-continue") == 0) {
+                    r->expecting_100 = 1;
+                }
+                else {
+                    r->status = HTTP_EXPECTATION_FAILED;
+                    ap_log_rerror(APLOG_MARK, APLOG_INFO, 0, r,
+                                  "client sent an unrecognized expectation
value of "
+                                  "Expect: %s", expect);
+                    ap_send_error_response(r, 0);
+                    ap_run_log_transaction(r);
+                    hs->state = HTTP_STATE_LINGER;
+                    return OK;
+                }
+            }
+
+            ap_add_input_filter_handle(ap_http_input_filter_handle,
+                                       NULL, r, r->connection);
+
+            if ((rc = ap_run_post_read_request(r))) {
+                ap_die(access_status, r);
+                ap_run_log_transaction(r);
+                hs->state = HTTP_STATE_LINGER;
+                return OK;
+            }
+
+            rc = OK;
+            hs->state = HTTP_STATE_WRITE_RESPONSE;
+        }
+        case HTTP_STATE_WRITE_RESPONSE:
+        {
+            r = hs->r;
+            c->keepalive = AP_CONN_UNKNOWN;
+
+            ap_update_child_status(c->sbh, SERVER_BUSY_WRITE, r);
+
+            if (r->status == HTTP_OK)
+                ap_process_request(r);
+
+            if (ap_extended_status)
+                ap_increment_counts(c->sbh, r);
+
+            if (c->keepalive != AP_CONN_KEEPALIVE || c->aborted) {
+                hs->state = HTTP_STATE_LINGER;
+                return OK;
+            }
+
+            ap_update_child_status(c->sbh, SERVER_BUSY_KEEPALIVE, r);
+            apr_pool_destroy(r->pool);
+
+            if (ap_graceful_stop_signalled()) {
+                hs->state = HTTP_STATE_LINGER;
+                break;
+            }
+            hs->state = HTTP_STATE_NEW_CONNECTION;
+            continue;
+
+            break;
+        }
+        default:
+            hs->state = HTTP_STATE_LINGER;
+            return OK;
+            break;
+        }
+    }
+    return OK;
+}
 static int http_create_request(request_rec *r)
 {
     if (!r->main && !r->prev) {
@@ -330,7 +603,7 @@

 static void register_hooks(apr_pool_t *p)
 {
-    ap_hook_process_connection(ap_process_http_connection,NULL,NULL,
+    ap_hook_process_connection(ap_process_http_async_connection,NULL,NULL,
 			       APR_HOOK_REALLY_LAST);
     ap_hook_map_to_storage(ap_send_http_trace,NULL,NULL,APR_HOOK_MIDDLE);
     ap_hook_http_method(http_method,NULL,NULL,APR_HOOK_REALLY_LAST);
Index: server/core.c
===================================================================
RCS file: /home/cvs/httpd-2.0/server/core.c,v
retrieving revision 1.201
diff -u -r1.201 core.c
--- server/core.c	23 Aug 2002 18:05:37 -0000	1.201
+++ server/core.c	15 Oct 2002 14:54:08 -0000
@@ -3564,10 +3564,11 @@
      * empty).  We do this by returning whatever we have read.  This may
      * or may not be bogus, but is consistent (for now) with EOF logic.
      */
+    /*
     if (APR_STATUS_IS_EAGAIN(rv)) {
         rv = APR_SUCCESS;
     }
-
+    */
     return rv;
 }

Index: server/protocol.c
===================================================================
RCS file: /home/cvs/httpd-2.0/server/protocol.c,v
retrieving revision 1.116
diff -u -r1.116 protocol.c
--- server/protocol.c	28 Aug 2002 18:37:48 -0000	1.116
+++ server/protocol.c	15 Oct 2002 14:54:09 -0000
@@ -243,7 +243,8 @@
  */
 AP_DECLARE(apr_status_t) ap_rgetline_core(char **s, apr_size_t n,
                                           apr_size_t *read, request_rec *r,
-                                          int fold, apr_bucket_brigade *bb)
+                                          int fold, apr_bucket_brigade *bb,
+                                          apr_read_type_e block)
 {
     apr_status_t rv;
     apr_bucket *e;
@@ -253,7 +254,7 @@

     apr_brigade_cleanup(bb);
     rv = ap_get_brigade(r->input_filters, bb, AP_MODE_GETLINE,
-                        APR_BLOCK_READ, 0);
+                        block, 0);

     if (rv != APR_SUCCESS) {
         return rv;
@@ -359,7 +360,7 @@

             next_size = n - bytes_handled;

-            rv = ap_rgetline_core(&tmp, next_size, &next_len, r, fold, bb);
+            rv = ap_rgetline_core(&tmp, next_size, &next_len, r, fold, bb,
APR_BLOCK_READ);

             if (rv != APR_SUCCESS) {
                 return rv;
@@ -494,7 +495,7 @@

                 next_size = n - bytes_handled;

-                rv = ap_rgetline_core(&tmp, next_size, &next_len, r, fold,
bb);
+                rv = ap_rgetline_core(&tmp, next_size, &next_len, r, fold,
bb, APR_BLOCK_READ);

                 if (rv != APR_SUCCESS) {
                     return rv;
@@ -531,7 +532,7 @@
 #if APR_CHARSET_EBCDIC
 AP_DECLARE(apr_status_t) ap_rgetline(char **s, apr_size_t n,
                                      apr_size_t *read, request_rec *r,
-                                     int fold, apr_bucket_brigade *bb)
+                                     int fold, apr_bucket_brigade *bb,
apr_read_type_e block)
 {
     /* on ASCII boxes, ap_rgetline is a macro which simply invokes
      * ap_rgetline_core with the same parms
@@ -543,7 +544,7 @@
      */
     apr_status_t rv;

-    rv = ap_rgetline_core(s, n, read, r, fold, bb);
+    rv = ap_rgetline_core(s, n, read, r, fold, bb, block);
     if (rv == APR_SUCCESS) {
         ap_xlate_proto_from_ascii(*s, *read);
     }
@@ -559,7 +560,7 @@
     apr_bucket_brigade *tmp_bb;

     tmp_bb = apr_brigade_create(r->pool, r->connection->bucket_alloc);
-    rv = ap_rgetline(&tmp_s, n, &len, r, fold, tmp_bb);
+    rv = ap_rgetline(&tmp_s, n, &len, r, fold, tmp_bb, APR_BLOCK_READ);
     apr_brigade_destroy(tmp_bb);

     /* Map the out-of-space condition to the old API. */
@@ -667,11 +668,11 @@
          */
         r->the_request = NULL;
         rv = ap_rgetline(&(r->the_request), DEFAULT_LIMIT_REQUEST_LINE + 2,
-                         &len, r, 0, bb);
+                         &len, r, 0, bb, APR_NONBLOCK_READ);

         if (rv != APR_SUCCESS) {
             r->request_time = apr_time_now();
-            return 0;
+            return rv;
         }
     } while (len <= 0);

@@ -741,7 +742,7 @@
     return 1;
 }

-AP_DECLARE(void) ap_get_mime_headers_core(request_rec *r,
apr_bucket_brigade *bb)
+AP_DECLARE(apr_status_t) ap_get_mime_headers_core(request_rec *r,
http_state_t *hs)
 {
     char *last_field = NULL;
     apr_size_t last_len = 0;
@@ -750,11 +751,9 @@
     char *value;
     apr_size_t len;
     int fields_read = 0;
-    apr_table_t *tmp_headers;
-
-    /* We'll use apr_table_overlap later to merge these into r->headers_in.
*/
-    tmp_headers = apr_table_make(r->pool, 50);
-
+    apr_bucket_brigade *bb = hs->bb;
+    apr_table_t *tmp_headers = hs->headers;
+
     /*
      * Read header lines until we get the empty separator line, a read
error,
      * the connection closes (EOF), reach the server limit, or we timeout.
@@ -765,7 +764,10 @@

         field = NULL;
         rv = ap_rgetline(&field, DEFAULT_LIMIT_REQUEST_FIELDSIZE + 2,
-                         &len, r, 0, bb);
+                         &len, r, 0, bb, APR_BLOCK_READ);
+
+        if (APR_STATUS_IS_EAGAIN(rv))
+            return rv;

         /* ap_rgetline returns APR_ENOSPC if it fills up the buffer before
          * finding the end-of-line.  This is only going to happen if it
@@ -783,12 +785,12 @@
                                        "<pre>\n",
                                        ap_escape_html(r->pool, field),
                                        "</pre>\n", NULL));
-            return;
+            return rv;
         }

         if (rv != APR_SUCCESS) {
             r->status = HTTP_BAD_REQUEST;
-            return;
+            return rv;
         }

         if (last_field != NULL) {
@@ -821,7 +823,7 @@
                     apr_table_setn(r->notes, "error-notes",
                                    "The number of request header fields "
                                    "exceeds this server's limit.");
-                    return;
+                    return rv;
                 }

                 if (!(value = strchr(last_field, ':'))) { /* Find ':' or
*/
@@ -834,7 +836,7 @@
                                                ap_escape_html(r->pool,
                                                               last_field),
                                                "</pre>\n", NULL));
-                    return;
+                    return rv;
                 }

                 *value = '\0';
@@ -871,6 +873,7 @@
     }

     apr_table_overlap(r->headers_in, tmp_headers,
APR_OVERLAP_TABLES_MERGE);
+    return APR_SUCCESS;
 }

 AP_DECLARE(void) ap_get_mime_headers(request_rec *r)
@@ -881,6 +884,53 @@
     apr_brigade_destroy(tmp_bb);
 }

+request_rec *ap_create_request(conn_rec *conn)
+{
+    request_rec *r;
+    apr_pool_t *p;
+    const char *expect;
+    int access_status;
+
+    apr_pool_create(&p, conn->pool);
+    r = apr_pcalloc(p, sizeof(request_rec));
+
+    conn->r = r;
+
+    r->pool            = p;
+    r->connection      = conn;
+    r->server          = conn->base_server;
+
+    r->user            = NULL;
+    r->ap_auth_type    = NULL;
+
+    r->allowed_methods = ap_make_method_list(p, 2);
+
+    r->headers_in      = apr_table_make(r->pool, 25);
+    r->subprocess_env  = apr_table_make(r->pool, 25);
+    r->headers_out     = apr_table_make(r->pool, 12);
+    r->err_headers_out = apr_table_make(r->pool, 5);
+    r->notes           = apr_table_make(r->pool, 5);
+
+    r->request_config  = ap_create_request_config(r->pool);
+    /* Must be set before we run create request hook */
+
+    r->proto_output_filters = conn->output_filters;
+    r->output_filters  = r->proto_output_filters;
+    r->proto_input_filters = conn->input_filters;
+    r->input_filters   = r->proto_input_filters;
+    ap_run_create_request(r);
+    r->per_dir_config  = r->server->lookup_defaults;
+
+    r->sent_bodyct     = 0;                      /* bytect isn't for body
*/
+
+    r->read_length     = 0;
+    r->read_body       = REQUEST_NO_BODY;
+
+    r->status          = HTTP_REQUEST_TIME_OUT;  /* Until we get a request
*/
+    r->the_request     = NULL;
+    return r;
+}
+
 request_rec *ap_read_request(conn_rec *conn)
 {
     request_rec *r;
@@ -891,6 +941,9 @@

     apr_pool_create(&p, conn->pool);
     r = apr_pcalloc(p, sizeof(request_rec));
+
+    conn->r = r;
+
     r->pool            = p;
     r->connection      = conn;
     r->server          = conn->base_server;
Index: server/mpm/worker/fdqueue.c
===================================================================
RCS file: /home/cvs/httpd-2.0/server/mpm/worker/fdqueue.c,v
retrieving revision 1.23
diff -u -r1.23 fdqueue.c
--- server/mpm/worker/fdqueue.c	2 Aug 2002 17:37:52 -0000	1.23
+++ server/mpm/worker/fdqueue.c	15 Oct 2002 14:54:10 -0000
@@ -253,7 +253,7 @@
  * the push operation has completed, it signals other threads waiting
  * in ap_queue_pop() that they may continue consuming sockets.
  */
-apr_status_t ap_queue_push(fd_queue_t *queue, apr_socket_t *sd, apr_pool_t
*p)
+apr_status_t ap_queue_push(fd_queue_t *queue, apr_socket_t *sd,
http_state_t *hs, apr_pool_t *p)
 {
     fd_queue_elem_t *elem;
     apr_status_t rv;
@@ -267,6 +267,7 @@

     elem = &queue->data[queue->nelts];
     elem->sd = sd;
+    elem->hs = hs;
     elem->p = p;
     queue->nelts++;

@@ -285,7 +286,7 @@
  * Once retrieved, the socket is placed into the address specified by
  * 'sd'.
  */
-apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t
**p)
+apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd,
http_state_t **hs, apr_pool_t **p)
 {
     fd_queue_elem_t *elem;
     apr_status_t rv;
@@ -316,6 +317,7 @@

     elem = &queue->data[--queue->nelts];
     *sd = elem->sd;
+    *hs = elem->hs;
     *p = elem->p;
 #ifdef AP_DEBUG
     elem->sd = NULL;
Index: server/mpm/worker/fdqueue.h
===================================================================
RCS file: /home/cvs/httpd-2.0/server/mpm/worker/fdqueue.h,v
retrieving revision 1.19
diff -u -r1.19 fdqueue.h
--- server/mpm/worker/fdqueue.h	28 Apr 2002 23:12:35 -0000	1.19
+++ server/mpm/worker/fdqueue.h	15 Oct 2002 14:54:10 -0000
@@ -84,6 +84,7 @@
 struct fd_queue_elem_t {
     apr_socket_t      *sd;
     apr_pool_t        *p;
+    http_state_t      *hs;
 };
 typedef struct fd_queue_elem_t fd_queue_elem_t;

@@ -98,8 +99,8 @@
 typedef struct fd_queue_t fd_queue_t;

 apr_status_t ap_queue_init(fd_queue_t *queue, int queue_capacity,
apr_pool_t *a);
-apr_status_t ap_queue_push(fd_queue_t *queue, apr_socket_t *sd, apr_pool_t
*p);
-apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t
**p);
+apr_status_t ap_queue_push(fd_queue_t *queue, apr_socket_t *sd,
http_state_t *hs, apr_pool_t *p);
+apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd,
http_state_t **hs, apr_pool_t **p);
 apr_status_t ap_queue_interrupt_all(fd_queue_t *queue);
 apr_status_t ap_queue_term(fd_queue_t *queue);

Index: server/mpm/worker/worker.c
===================================================================
RCS file: /home/cvs/httpd-2.0/server/mpm/worker/worker.c,v
retrieving revision 1.133
diff -u -r1.133 worker.c
--- server/mpm/worker/worker.c	28 Aug 2002 18:48:07 -0000	1.133
+++ server/mpm/worker/worker.c	15 Oct 2002 14:54:14 -0000
@@ -111,6 +111,7 @@

 #include <signal.h>
 #include <limits.h>             /* for INT_MAX */
+#include "apr_queue.h"

 /* Limit on the total --- clients will be locked out if more servers than
  * this are needed.  It is intended solely to keep the server from crashing
@@ -175,6 +176,12 @@
 static int resource_shortage = 0;
 static fd_queue_t *worker_queue;
 static fd_queue_info_t *worker_queue_info;
+#if 1
+static apr_socket_t *ev_sock;
+#else
+static apr_queue_t *event_queue;
+#endif
+

 /* The structure used to pass unique initialization info to each thread */
 typedef struct {
@@ -605,7 +612,7 @@
  * Child process main loop.
  */

-static void process_socket(apr_pool_t *p, apr_socket_t *sock, int
my_child_num,
+static int process_socket(apr_pool_t *p, apr_socket_t *sock, http_state_t
*hs, int my_child_num,
                            int my_thread_num, apr_bucket_alloc_t
*bucket_alloc)
 {
     conn_rec *current_conn;
@@ -623,14 +630,49 @@
                      "(currently %d)",
                      csd, FD_SETSIZE);
         apr_socket_close(sock);
-        return;
+        return 0;
     }
+    if (hs == NULL) { /* This is equivalent to HTTP_STATE_NEW_CONNECTION */

-    current_conn = ap_run_create_connection(p, ap_server_conf, sock,
-                                            conn_id, sbh, bucket_alloc);
+        current_conn = ap_run_create_connection(p, ap_server_conf, sock,
+                                                conn_id, sbh,
bucket_alloc);
+        hs = apr_pcalloc(p, sizeof(http_state_t));
+        hs->state = HTTP_STATE_NEW_CONNECTION;
+        hs->c = current_conn;
+        hs->p = p;
+        hs->sock = sock;
+        current_conn->hs = hs;
+
+        ap_log_error(APLOG_MARK, APLOG_ERR, 0, ap_server_conf,
+                     "process_connection: hs = %x, hs->p = %x", hs,
hs->p );
+
+    }
+    else {
+        current_conn = hs->c;
+    }
     if (current_conn) {
+        hs = current_conn->hs;
         ap_process_connection(current_conn, sock);
-        ap_lingering_close(current_conn);
+        switch (hs->state) {
+        case HTTP_STATE_READ_REQUEST_LINE:
+        case HTTP_STATE_READ_MIME_HEADERS:
+        {
+            apr_status_t rc;
+            ap_log_error(APLOG_MARK, APLOG_ERR, 0, ap_server_conf,
+                         "Waiting for IO Event: hs = %x, hs->state = %d",
hs, hs->state);
+            do {
+                rc = apr_queue_push(event_queue, hs);
+            } while (rc == APR_EINTR);
+            return 1;
+        }
+        default:
+        {
+            ap_log_error(APLOG_MARK, APLOG_ERR, 0, ap_server_conf,
+                         "Closing connection: hs = %x", hs);
+            ap_lingering_close(current_conn);
+            return 0;
+        }
+        }
     }
 }

@@ -831,7 +873,7 @@
                 signal_threads(ST_GRACEFUL);
             }
             if (csd != NULL) {
-                rv = ap_queue_push(worker_queue, csd, ptrans);
+                rv = ap_queue_push(worker_queue, csd, NULL, ptrans);
                 if (rv) {
                     /* trash the connection; we couldn't queue the
connected
                      * socket to a worker
@@ -881,6 +923,7 @@
     int process_slot = ti->pid;
     int thread_slot = ti->tid;
     apr_socket_t *csd = NULL;
+    http_state_t *hs;
     apr_bucket_alloc_t *bucket_alloc;
     apr_pool_t *last_ptrans = NULL;
     apr_pool_t *ptrans;                /* Pool for per-transaction stuff */
@@ -912,7 +955,10 @@
         if (workers_may_exit) {
             break;
         }
-        rv = ap_queue_pop(worker_queue, &csd, &ptrans);
+        rv = ap_queue_pop(worker_queue, &csd, &hs, &ptrans);
+
+        ap_log_error(APLOG_MARK, APLOG_ERR, 0, ap_server_conf,
+                     "ap_queue_pop: hs = %x, ptrans = %x", hs, ptrans);

         if (rv != APR_SUCCESS) {
             /* We get APR_EOF during a graceful shutdown once all the
connections
@@ -944,11 +990,17 @@
         }
         is_idle = 0;
         worker_sockets[thread_slot] = csd;
-        process_socket(ptrans, csd, process_slot, thread_slot,
bucket_alloc);
+        rv = process_socket(ptrans, csd, hs, process_slot, thread_slot,
bucket_alloc);
+        if (rv) {
+            /* io event pending */
+            last_ptrans = NULL;
+        }
+        else {
+            requests_this_child--; /* FIXME: should be synchronized - aaron
*/
+            apr_pool_clear(ptrans);
+            last_ptrans = ptrans;
+        }
         worker_sockets[thread_slot] = NULL;
-        requests_this_child--; /* FIXME: should be synchronized - aaron */
-        apr_pool_clear(ptrans);
-        last_ptrans = ptrans;
     }

     ap_update_child_status_from_indexes(process_slot, thread_slot,
@@ -969,7 +1021,126 @@
     }
     return 0;
 }
+#if 0
+static void *event_loop(apr_thread_t *thd, void * dummy)
+{   /* Variant compiled out by the enclosing #if 0: pops work from event_queue, polls, pushes ready sockets to worker_queue. */
+    apr_pollset_t *pollset;
+    apr_status_t rc;    /* reassigned by the calls below; only compared against APR_EINTR */
+    apr_pollfd_t *pollfd;
+    apr_interval_time_t timeout = 15000000;    /* presumably microseconds (15 s) -- confirm against APR */
+    apr_int32_t num;
+    http_state_t *hs;

+    /* Create the pollset (size argument 100, pool pchild); the result in rc is not checked */
+    rc =  apr_pollset_create(&pollset, 100, pchild, 0);
+
+    while (1) {    /* no exit path: the function never reaches a return */
+        void *v;
+        /* Get work: pop one entry from event_queue, retrying while the pop returns APR_EINTR */
+        do {
+            rc = apr_queue_pop(event_queue, &v);
+        } while (rc == APR_EINTR);
+        hs = *(http_state_t**)v;    /* popped element is read as a pointer to an http_state_t pointer */
+
+        /* Add work to pollset. These are always read events (APR_POLLIN on hs->sock) */
+        apr_poll_setup(&pollfd, 1, hs->p);
+        apr_poll_socket_add(pollfd, hs->sock, APR_POLLIN);
+        pollfd->client_data = (void*) hs;    /* lets the state be recovered from the descriptor below */
+        apr_pollset_add(pollset, pollfd);
+        rc = apr_pollset_poll(pollset, timeout, &num, &pollfd);    /* rc (timeout/error) is not examined */
+
+        while (num) {    /* walk num entries starting at pollfd */
+            rc = apr_pollset_remove(pollset, pollfd);
+            hs = (http_state_t*) pollfd->client_data;
+            ap_log_error(APLOG_MARK, APLOG_ERR, 0, ap_server_conf,
+                         "IO Event Received: hs = %x, hs->state = %d", hs,
hs->state);
+            rc = ap_queue_push(worker_queue,    /* hand socket, state and pool hs->p to worker_queue */
+                               pollfd->desc.s,
+                               hs,
+                               hs->p);
+            pollfd++;
+            num--;
+        }
+        /* Do timeouts -- placeholder only; no timeout handling is implemented here */
+    }
+}
+#else
+static void *event_loop(apr_thread_t *thd, void * dummy)
+{   /* Active (#else) variant: polls a wakeup pipe plus pending sockets and pushes readable sockets to worker_queue. */
+    apr_pollset_t *pollset;
+    apr_status_t rc;    /* reassigned by the calls below; only compared against APR_TIMEUP */
+    apr_pollfd_t *pollfd;
+    apr_interval_time_t timeout = 15000000;    /* presumably microseconds (15 s) -- confirm against APR */
+    apr_int32_t num;
+    http_state_t *hs;
+
+    /* Create the pollset (size argument 100, pool pchild); the result in rc is not checked */
+    rc =  apr_pollset_create(&pollset, 100, pchild, 0);
+
+    /* Add the event_loop pipe to the pollset, tagged with a dedicated http_state_t whose sock is ev_pipe */
+    apr_poll_setup(&pollfd, 1, pchild);
+    apr_poll_socket_add(pollfd, ev_sock, APR_POLLIN);    /* NOTE(review): ev_sock here vs ev_pipe/evpipe below -- confirm intended object */
+    apr_pollset_add(pollset, pollfd);
+    hs = malloc(sizeof(*hs));    /* result not checked; not freed anywhere in this function */
+    hs->sock = ev_pipe;
+    pollfd->client_data = (void*) hs;    /* NOTE(review): assigned after apr_pollset_add -- verify the pollset observes it */
+
+    while (1) {    /* no exit path: the function never reaches a return */
+        do {
+          rc = apr_pollset_poll(pollset, timeout, &num, &pollfd);
+          /* Todo: Timeout old connections -- currently a timeout just polls again */
+        } while (rc == APR_TIMEUP);
+
+        while (num) {    /* walk num entries starting at pollfd */
+            hs = (http_state_t*) pollfd->client_data;
+            if (hs->sock == ev_pipe) {
+              /* Read the pointer to hs and reset event; NOTE(review): 4-byte read into a pointer, and name is evpipe not ev_pipe */
+              rc = read(evpipe, &hs, 4);
+              apr_poll_setup(&pollfd, 1, hs->p);    /* NOTE(review): reuses pollfd, the cursor of the enclosing loop */
+              apr_poll_socket_add(pollfd, hs->sock, APR_POLLIN);
+              pollfd->client_data = (void*) hs;
+              apr_pollset_add(pollset, pollfd);    /* NOTE(review): this branch neither decrements num nor advances pollfd */
+            }
+            else {
+              rc = apr_pollset_remove(pollset, pollfd);    /* stop polling this socket before handing it off */
+
+              ap_log_error(APLOG_MARK, APLOG_ERR, 0, ap_server_conf,
+                           "IO Event Received: hs = %x, hs->state = %d",
hs, hs->state);
+              rc = ap_queue_push(worker_queue,    /* hand socket, state and pool hs->p to worker_queue */
+                                 pollfd->desc.s,
+                                 hs,
+                                 hs->p);
+              pollfd++;
+              num--;
+            }
+        }
+    }
+}
+#endif
+/*static void* create_event_thread(apr_thread_t *thd, void* dummy) */
+static void create_event_thread()
+{   /* Starts one thread running event_loop; on failure logs an alert and calls clean_child_exit(APEXIT_CHILDFATAL). */
+    apr_threadattr_t *thread_attr;
+    apr_thread_t *thread;
+    proc_info *my_info;
+    apr_status_t rv;
+
+    apr_threadattr_create(&thread_attr, pchild);    /* return value ignored */
+    apr_threadattr_detach_set(thread_attr, 0);    /* detach flag 0 -- presumably a joinable thread; confirm against APR */
+
+    my_info = (proc_info *)malloc(sizeof(proc_info));    /* result not checked; not freed in this function */
+    my_info->pid = -1;    /* pid and tid are set to -1, sd to 0 */
+    my_info->tid = -1;
+    my_info->sd = 0;
+
+    rv = apr_thread_create(&thread, thread_attr, event_loop,    /* my_info is passed as the thread's data argument */
+                           my_info, pchild);
+    if (rv != APR_SUCCESS) {
+        ap_log_error(APLOG_MARK, APLOG_ALERT, rv, ap_server_conf,
+                     "apr_thread_create: unable to create event thread");
+        clean_child_exit(APEXIT_CHILDFATAL);
+    }
+}
 static void create_listener_thread(thread_starter *ts)
 {
     int my_child_num = ts->child_num_arg;
@@ -1083,6 +1254,21 @@
             create_listener_thread(ts);
             listener_started = 1;
         }
+#if 1
+        /* Create ev_sock, set to non-blocking */
+
+#else
+        /* Init the event queue/thread */
+        rv = apr_queue_create(&event_queue, 100, pchild);
+        if (rv != APR_SUCCESS) {
+            ap_log_error(APLOG_MARK, APLOG_ALERT, rv, ap_server_conf,
+                         "ap_queue_init() failed");
+            clean_child_exit(APEXIT_CHILDFATAL);
+        }
+#endif
+        create_event_thread();
+        apr_sleep(100);
+
         if (start_thread_may_exit || threads_created ==
ap_threads_per_child) {
             break;
         }


Re: [PATCH] event driven read

Posted by Brian Pane <br...@cnet.com>.
On Tue, 2002-10-15 at 10:31, Bill Stoddard wrote:
> Something I've been hacking on (in the pejorative sense of the word 'hack').
> Look at the patch and you will see what I mean :-).  This should apply and
> serve pages on Linux, though the event_loop is clearly broken as it does not
> time out keep-alive connections and will hang on the apr_poll() (and hang the
> server) if a client leaves a keep-alive connection active but does not send
> anything on it. Scoreboard is broken, code structure is poor, yadda yadda. I
> plan to reimplement some of this more cleanly but no idea when I'll get
> around to it. Key points:
> 
> 1. routines to read requests must be able to handle getting APR_EAGAIN (or
> APR_EWOULDBLOCK):
> 2. ap_process_http_connection reimplemented to be state driven
> 3. event loop in worker_mpm to wait for pending i/o

Revisiting this topic from a looong time ago...

Did you ever get a chance to do any further work on this
design?  IMHO, for 2.1/2.2 we should at least incorporate
the changes to allow data to be pushed rather than pulled
through the input filter chain.  That will set the foundation
for an event-driven MPM, even if we're still using a blocking,
thread-per-connection worker design in the first 2.2 release.

Brian



RE: [PATCH] event driven read

Posted by Bill Stoddard <bi...@wstoddard.com>.
>
> From a quick read through the patch, it looks like the
> connection processing flow is:
>    - listener thread accepts connection
>    - listener passes connection to a worker through fd queue
>    - worker wakes up, passes connection to event thread
>      through event queue
>    - worker goes back to sleep
>    - event thread adds the connection to its fd set
>    - when data is available on the connection, the event
>      thread pushes the connection back onto the fd queue,
>      waking up another worker to handle the connection
>
> Did I get that right?
Yep.

> It seems like a lot of extra
> context switching, compared to just having one event-loop
> thread do all the reads.  On the other hand, having a
> single thread doing all the reads would mean that we
> couldn't spread the work of input processing across
> multiple CPUs (except by adding more processes).
>
> I wonder if we'd get any better results by combining
> the roles of the listener and event threads:
>   - have 'n' listener/reader threads, and let them
>     take turns accepting connections, sort of like the
>     leader/followers design
>   - each listener/reader thread can handle at most
>     'm' connections at once.  As it accepts connections,
>     it adds them to its pollset.
>   - when one of the listener/reader threads finds data
>     available on one of its connections, it does the
>     socket read inline.  (Note: This implies that we
>     can't do anything in an input filter that might
>     take a really long time.)
>   - when a listener/reader thread recognizes a complete
>     request, it hands that connection off to a worker
>     thread for processing.
>   - when the worker has produced a response, it hands
>     off the brigade to a completion thread to wait for
>     the network writes to complete.  (I suppose this
>     completion thread could be combined with the
>     listener/reader thread.  That might actually improve
>     performance on multiprocessors by helping to ensure
>     that each connection's network I/O is handled by the
>     same thread for the life of the request.)
>
> Brian

If I get the time, I will experiment with this approach. I agree that the
current code introduces way too many context switches.

Bill


Re: [PATCH] event driven read

Posted by Brian Pane <br...@cnet.com>.
On Tue, 2002-10-15 at 10:31, Bill Stoddard wrote:
> Something I've been hacking on (in the pejorative sense of the word 'hack').
> Look at the patch and you will see what I mean :-).  This should apply and
> serve pages on Linux, though the event_loop is clearly broken as it does not
> time out keep-alive connections and will hang on the apr_poll() (and hang the
> server) if a client leaves a keep-alive connection active but does not send
> anything on it. Scoreboard is broken, code structure is poor, yadda yadda. I
> plan to reimplement some of this more cleanly but no idea when I'll get
> around to it. Key points:
> 
> 1. routines to read requests must be able to handle getting APR_EAGAIN (or
> APR_EWOULDBLOCK):
> 2. ap_process_http_connection reimplemented to be state driven
> 3. event loop in worker_mpm to wait for pending i/o


From a quick read through the patch, it looks like the
connection processing flow is:
   - listener thread accepts connection
   - listener passes connection to a worker through fd queue
   - worker wakes up, passes connection to event thread
     through event queue
   - worker goes back to sleep
   - event thread adds the connection to its fd set
   - when data is available on the connection, the event
     thread pushes the connection back onto the fd queue,
     waking up another worker to handle the connection

Did I get that right?  It seems like a lot of extra
context switching, compared to just having one event-loop
thread do all the reads.  On the other hand, having a
single thread doing all the reads would mean that we
couldn't spread the work of input processing across
multiple CPUs (except by adding more processes).

I wonder if we'd get any better results by combining
the roles of the listener and event threads:
  - have 'n' listener/reader threads, and let them
    take turns accepting connections, sort of like the
    leader/followers design
  - each listener/reader thread can handle at most
    'm' connections at once.  As it accepts connections,
    it adds them to its pollset.
  - when one of the listener/reader threads finds data
    available on one of its connections, it does the
    socket read inline.  (Note: This implies that we
    can't do anything in an input filter that might
    take a really long time.)
  - when a listener/reader thread recognizes a complete
    request, it hands that connection off to a worker
    thread for processing.
  - when the worker has produced a response, it hands
    off the brigade to a completion thread to wait for
    the network writes to complete.  (I suppose this
    completion thread could be combined with the
    listener/reader thread.  That might actually improve
    performance on multiprocessors by helping to ensure
    that each connection's network I/O is handled by the
    same thread for the life of the request.)

Brian