Posted to cvs@httpd.apache.org by yl...@apache.org on 2020/07/02 00:14:27 UTC

svn commit: r1879419 - in /httpd/httpd/trunk/modules/proxy: mod_proxy.c mod_proxy.h mod_proxy_http.c

Author: ylavic
Date: Thu Jul  2 00:14:26 2020
New Revision: 1879419

URL: http://svn.apache.org/viewvc?rev=1879419&view=rev
Log:
mod_proxy_http: handle async tunneling of Upgrade(d) protocols.

When supported by the MPM (i.e. "event"), provide async callbacks and let
them be scheduled by ap_mpm_register_poll_callback_timeout(), while the
handler returns SUSPENDED.

The new ProxyAsyncDelay directive (if positive) enables async handling,
while ProxyAsyncIdleTimeout determines the timeout applied on both ends
while tunneling.

Github: closes #126
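
For reference, a minimal configuration sketch enabling the new behavior; the
values and scope below are illustrative only, not recommendations (both
directives take a timeout value, in seconds by default):

    # Poll in the handler thread for up to 3 seconds, then suspend the
    # request and let the event MPM reschedule the tunnel on readiness.
    ProxyAsyncDelay 3
    # Tear the tunnel down if neither end is readable for 5 minutes
    # (when unset, the backend/ProxyTimeout value applies instead).
    ProxyAsyncIdleTimeout 300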


Modified:
    httpd/httpd/trunk/modules/proxy/mod_proxy.c
    httpd/httpd/trunk/modules/proxy/mod_proxy.h
    httpd/httpd/trunk/modules/proxy/mod_proxy_http.c

Modified: httpd/httpd/trunk/modules/proxy/mod_proxy.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/proxy/mod_proxy.c?rev=1879419&r1=1879418&r2=1879419&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/proxy/mod_proxy.c (original)
+++ httpd/httpd/trunk/modules/proxy/mod_proxy.c Thu Jul  2 00:14:26 2020
@@ -1842,6 +1842,7 @@ static void *create_proxy_dir_config(apr
     new->add_forwarded_headers_set = 0;
     new->forward_100_continue = 1;
     new->forward_100_continue_set = 0;
+    new->async_delay = -1;
 
     return (void *) new;
 }
@@ -1889,17 +1890,30 @@ static void *merge_proxy_dir_config(apr_
     new->error_override_set = add->error_override_set || base->error_override_set;
     new->alias = (add->alias_set == 0) ? base->alias : add->alias;
     new->alias_set = add->alias_set || base->alias_set;
+
     new->add_forwarded_headers =
         (add->add_forwarded_headers_set == 0) ? base->add_forwarded_headers
         : add->add_forwarded_headers;
     new->add_forwarded_headers_set = add->add_forwarded_headers_set
         || base->add_forwarded_headers_set;
+
     new->forward_100_continue =
         (add->forward_100_continue_set == 0) ? base->forward_100_continue
                                              : add->forward_100_continue;
     new->forward_100_continue_set = add->forward_100_continue_set
                                     || base->forward_100_continue_set;
 
+    new->async_delay =
+        (add->async_delay_set == 0) ? base->async_delay
+                                    : add->async_delay;
+    new->async_delay_set = add->async_delay_set
+                           || base->async_delay_set;
+    new->async_idle_timeout =
+        (add->async_idle_timeout_set == 0) ? base->async_idle_timeout
+                                           : add->async_idle_timeout;
+    new->async_idle_timeout_set = add->async_idle_timeout_set
+                                  || base->async_idle_timeout_set;
+
     return new;
 }
 
@@ -2480,6 +2494,33 @@ static const char *
 }
 
 static const char *
+    set_proxy_async_delay(cmd_parms *parms, void *dconf, const char *arg)
+{
+    proxy_dir_conf *conf = dconf;
+    if (strcmp(arg, "-1") == 0) {
+        conf->async_delay = -1;
+    }
+    else if (ap_timeout_parameter_parse(arg, &conf->async_delay, "s")
+                || conf->async_delay < 0) {
+        return "ProxyAsyncDelay has wrong format";
+    }
+    conf->async_delay_set = 1;
+    return NULL;
+}
+
+static const char *
+    set_proxy_async_idle(cmd_parms *parms, void *dconf, const char *arg)
+{
+    proxy_dir_conf *conf = dconf;
+    if (ap_timeout_parameter_parse(arg, &conf->async_idle_timeout, "s")
+            || conf->async_idle_timeout < 0) {
+        return "ProxyAsyncIdleTimeout has wrong format";
+    }
+    conf->async_idle_timeout_set = 1;
+    return NULL;
+}
+
+static const char *
     set_recv_buffer_size(cmd_parms *parms, void *dummy, const char *arg)
 {
     proxy_server_conf *psf =
@@ -3068,6 +3109,10 @@ static const command_rec proxy_cmds[] =
     AP_INIT_FLAG("Proxy100Continue", forward_100_continue, NULL, RSRC_CONF|ACCESS_CONF,
      "on if 100-Continue should be forwarded to the origin server, off if the "
      "proxy should handle it by itself"),
+    AP_INIT_TAKE1("ProxyAsyncDelay", set_proxy_async_delay, NULL, RSRC_CONF|ACCESS_CONF,
+     "Amount of time to poll before going asynchronous"),
+    AP_INIT_TAKE1("ProxyAsyncIdleTimeout", set_proxy_async_idle, NULL, RSRC_CONF|ACCESS_CONF,
+     "Timeout for asynchronous inactivity, ProxyTimeout by default"),
     {NULL}
 };
 

Modified: httpd/httpd/trunk/modules/proxy/mod_proxy.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/proxy/mod_proxy.h?rev=1879419&r1=1879418&r2=1879419&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/proxy/mod_proxy.h (original)
+++ httpd/httpd/trunk/modules/proxy/mod_proxy.h Thu Jul  2 00:14:26 2020
@@ -247,6 +247,11 @@ typedef struct {
     unsigned int forward_100_continue_set:1;
 
     apr_array_header_t *error_override_codes;
+
+    apr_interval_time_t async_delay;
+    apr_interval_time_t async_idle_timeout;
+    unsigned int async_delay_set:1;
+    unsigned int async_idle_timeout_set:1;
 } proxy_dir_conf;
 
 /* if we interpolate env vars per-request, we'll need a per-request

Modified: httpd/httpd/trunk/modules/proxy/mod_proxy_http.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/proxy/mod_proxy_http.c?rev=1879419&r1=1879418&r2=1879419&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/proxy/mod_proxy_http.c (original)
+++ httpd/httpd/trunk/modules/proxy/mod_proxy_http.c Thu Jul  2 00:14:26 2020
@@ -18,19 +18,17 @@
 
 #include "mod_proxy.h"
 #include "ap_regex.h"
+#include "ap_mpm.h"
 
 module AP_MODULE_DECLARE_DATA proxy_http_module;
 
 static int (*ap_proxy_clear_connection_fn)(request_rec *r, apr_table_t *headers) =
         NULL;
 
-static apr_status_t ap_proxy_http_cleanup(const char *scheme,
-                                          request_rec *r,
-                                          proxy_conn_rec *backend);
-
 static apr_status_t ap_proxygetline(apr_bucket_brigade *bb, char *s, int n,
                                     request_rec *r, int flags, int *read);
 
+
 /*
  * Canonicalise http-like URLs.
  *  scheme is the scheme for the URL
@@ -220,6 +218,12 @@ static void add_cl(apr_pool_t *p,
 #define MAX_MEM_SPOOL 16384
 
 typedef enum {
+    PROXY_HTTP_REQ_HAVE_HEADER = 0,
+
+    PROXY_HTTP_TUNNELING
+} proxy_http_state;
+
+typedef enum {
     RB_INIT = 0,
     RB_STREAM_CL,
     RB_STREAM_CHUNKED,
@@ -229,29 +233,129 @@ typedef enum {
 typedef struct {
     apr_pool_t *p;
     request_rec *r;
+    const char *proto;
     proxy_worker *worker;
+    proxy_dir_conf *dconf;
     proxy_server_conf *sconf;
-
     char server_portstr[32];
+
     proxy_conn_rec *backend;
     conn_rec *origin;
 
     apr_bucket_alloc_t *bucket_alloc;
     apr_bucket_brigade *header_brigade;
     apr_bucket_brigade *input_brigade;
+
     char *old_cl_val, *old_te_val;
     apr_off_t cl_val;
 
+    proxy_http_state state;
     rb_methods rb_method;
 
-    int force10;
     const char *upgrade;
-
-    int expecting_100;
-    unsigned int do_100_continue:1,
-                 prefetch_nonblocking:1;
+    proxy_tunnel_rec *tunnel;
+    apr_array_header_t *pfds;
+    apr_interval_time_t idle_timeout;
+
+    unsigned int can_go_async           :1,
+                 expecting_100          :1,
+                 do_100_continue        :1,
+                 prefetch_nonblocking   :1,
+                 force10                :1;
 } proxy_http_req_t;
 
+static void proxy_http_async_finish(proxy_http_req_t *req)
+{ 
+    conn_rec *c = req->r->connection;
+
+    ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, req->r,
+                  "proxy %s: finish async", req->proto);
+
+    proxy_run_detach_backend(req->r, req->backend);
+    ap_proxy_release_connection(req->proto, req->backend, req->r->server);
+
+    ap_finalize_request_protocol(req->r);
+    ap_process_request_after_handler(req->r);
+    /* don't touch req or req->r from here */
+
+    c->cs->state = CONN_STATE_LINGER;
+    ap_mpm_resume_suspended(c);
+}
+
+/* If neither socket becomes readable in the specified timeout,
+ * this callback will kill the request.
+ * We do not have to worry about having a cancel and a IO both queued.
+ */
+static void proxy_http_async_cancel_cb(void *baton)
+{ 
+    proxy_http_req_t *req = (proxy_http_req_t *)baton;
+
+    ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, req->r,
+                  "proxy %s: cancel async", req->proto);
+
+    req->r->connection->keepalive = AP_CONN_CLOSE;
+    req->backend->close = 1;
+    proxy_http_async_finish(req);
+}
+
+/* Invoked by the event loop when data is ready on either end. 
+ * We don't need the invoke_mtx, since we never put multiple callback events
+ * in the queue.
+ */
+static void proxy_http_async_cb(void *baton)
+{ 
+    proxy_http_req_t *req = (proxy_http_req_t *)baton;
+    int status;
+
+    if (req->pfds) {
+        apr_pool_clear(req->pfds->pool);
+    }
+
+    switch (req->state) {
+    case PROXY_HTTP_TUNNELING:
+        /* Pump both ends until they'd block and then start over again */
+        status = ap_proxy_tunnel_run(req->tunnel);
+        if (status == HTTP_GATEWAY_TIME_OUT) {
+            if (req->pfds) {
+                apr_pollfd_t *async_pfds = (void *)req->pfds->elts;
+                apr_pollfd_t *tunnel_pfds = (void *)req->tunnel->pfds->elts;
+                async_pfds[0].reqevents = tunnel_pfds[0].reqevents;
+                async_pfds[1].reqevents = tunnel_pfds[1].reqevents;
+            }
+            else {
+                req->pfds = apr_array_copy(req->p, req->tunnel->pfds);
+                apr_pool_create(&req->pfds->pool, req->p);
+            }
+            status = SUSPENDED;
+        }
+        break;
+
+    default:
+        ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, req->r,
+                      "proxy %s: unexpected async state (%i)",
+                      req->proto, (int)req->state);
+        status = HTTP_INTERNAL_SERVER_ERROR;
+        break;
+    }
+
+    if (status == SUSPENDED) {
+        ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, req->r,
+                      "proxy %s: suspended, going async",
+                      req->proto);
+
+        ap_mpm_register_poll_callback_timeout(req->pfds,
+                                              proxy_http_async_cb, 
+                                              proxy_http_async_cancel_cb, 
+                                              req, req->idle_timeout);
+    }
+    else if (status != OK) {
+        proxy_http_async_cancel_cb(req);
+    }
+    else {
+        proxy_http_async_finish(req);
+    }
+}
+
 /* Read what's in the client pipe. If nonblocking is set and read is EAGAIN,
  * pass a FLUSH bucket to the backend and read again in blocking mode.
  */
@@ -1200,13 +1304,11 @@ int ap_proxy_http_process_response(proxy
     int i;
     const char *te = NULL;
     int original_status = r->status;
-    int proxy_status = OK;
     const char *original_status_line = r->status_line;
     const char *proxy_status_line = NULL;
     apr_interval_time_t old_timeout = 0;
-    proxy_dir_conf *dconf;
-
-    dconf = ap_get_module_config(r->per_dir_config, &proxy_module);
+    proxy_dir_conf *dconf = req->dconf;
+    int proxy_status = OK;
 
     bb = apr_brigade_create(p, c->bucket_alloc);
     pass_bb = apr_brigade_create(p, c->bucket_alloc);
@@ -1634,9 +1736,6 @@ int ap_proxy_http_process_response(proxy
 
         if (proxy_status == HTTP_SWITCHING_PROTOCOLS) {
             apr_status_t rv;
-            proxy_tunnel_rec *tunnel;
-            apr_interval_time_t client_timeout = -1,
-                                backend_timeout = -1;
 
             /* If we didn't send the full body yet, do it now */
             if (do_100_continue) {
@@ -1650,41 +1749,35 @@ int ap_proxy_http_process_response(proxy
             ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, APLOGNO(10239)
                           "HTTP: tunneling protocol %s", upgrade);
 
-            rv = ap_proxy_tunnel_create(&tunnel, r, origin, "HTTP");
+            rv = ap_proxy_tunnel_create(&req->tunnel, r, origin, upgrade);
             if (rv != APR_SUCCESS) {
                 ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(10240)
                               "can't create tunnel for %s", upgrade);
                 return HTTP_INTERNAL_SERVER_ERROR;
             }
 
-            /* Set timeout to the lowest configured for client or backend */
-            apr_socket_timeout_get(backend->sock, &backend_timeout);
-            apr_socket_timeout_get(ap_get_conn_socket(c), &client_timeout);
-            if (backend_timeout >= 0 && backend_timeout < client_timeout) {
-                tunnel->timeout = backend_timeout;
-            }
-            else {
-                tunnel->timeout = client_timeout;
-            }
+            req->proto = upgrade;
 
-            /* Let proxy tunnel forward everything */
-            status = ap_proxy_tunnel_run(tunnel);
-            if (ap_is_HTTP_ERROR(status)) {
-                /* Tunnel always return HTTP_GATEWAY_TIME_OUT on timeout,
-                 * but we can differentiate between client and backend here.
-                 */
-                if (status == HTTP_GATEWAY_TIME_OUT
-                        && tunnel->timeout == client_timeout) {
-                    status = HTTP_REQUEST_TIME_OUT;
-                }
+            if (req->can_go_async) {
+                /* Let the MPM schedule the work when idle */
+                req->state = PROXY_HTTP_TUNNELING;
+                req->tunnel->timeout = dconf->async_delay;
+                proxy_http_async_cb(req);
+                return SUSPENDED;
             }
-            else {
+
+            /* Let proxy tunnel forward everything within this thread */
+            req->tunnel->timeout = req->idle_timeout;
+            status = ap_proxy_tunnel_run(req->tunnel);
+            if (!ap_is_HTTP_ERROR(status)) {
                 /* Update r->status for custom log */
                 status = HTTP_SWITCHING_PROTOCOLS;
             }
             r->status = status;
 
             /* We are done with both connections */
+            r->connection->keepalive = AP_CONN_CLOSE;
+            backend->close = 1;
             return DONE;
         }
 
@@ -2000,14 +2093,6 @@ int ap_proxy_http_process_response(proxy
     return OK;
 }
 
-static
-apr_status_t ap_proxy_http_cleanup(const char *scheme, request_rec *r,
-                                   proxy_conn_rec *backend)
-{
-    ap_proxy_release_connection(scheme, backend, r->server);
-    return OK;
-}
-
 /*
  * This handles http:// URLs, and other URLs using a remote proxy over http
  * If proxyhost is NULL, then contact the server directly, otherwise
@@ -2029,6 +2114,7 @@ static int proxy_http_handler(request_re
     proxy_http_req_t *req = NULL;
     proxy_conn_rec *backend = NULL;
     apr_bucket_brigade *input_brigade = NULL;
+    int mpm_can_poll = 0;
     int is_ssl = 0;
     conn_rec *c = r->connection;
     proxy_dir_conf *dconf;
@@ -2080,20 +2166,26 @@ static int proxy_http_handler(request_re
                                               worker, r->server)) != OK) {
         return status;
     }
-
     backend->is_ssl = is_ssl;
 
+    dconf = ap_get_module_config(r->per_dir_config, &proxy_module);
+    ap_mpm_query(AP_MPMQ_CAN_POLL, &mpm_can_poll);
+
     req = apr_pcalloc(p, sizeof(*req));
     req->p = p;
     req->r = r;
     req->sconf = conf;
+    req->dconf = dconf;
     req->worker = worker;
     req->backend = backend;
+    req->proto = proxy_function;
     req->bucket_alloc = c->bucket_alloc;
+    req->can_go_async = (mpm_can_poll &&
+                         dconf->async_delay_set &&
+                         dconf->async_delay >= 0);
+    req->state = PROXY_HTTP_REQ_HAVE_HEADER;
     req->rb_method = RB_INIT;
 
-    dconf = ap_get_module_config(r->per_dir_config, &proxy_module);
-
     if (apr_table_get(r->subprocess_env, "force-proxy-request-1.0")) {
         req->force10 = 1;
     }
@@ -2105,6 +2197,22 @@ static int proxy_http_handler(request_re
         }
     }
 
+    if (req->can_go_async || req->upgrade) {
+        /* If ProxyAsyncIdleTimeout is not set, use backend timeout */
+        if (req->can_go_async && dconf->async_idle_timeout_set) {
+            req->idle_timeout = dconf->async_idle_timeout;
+        }
+        else if (worker->s->timeout_set) {
+            req->idle_timeout = worker->s->timeout;
+        }
+        else if (conf->timeout_set) {
+            req->idle_timeout = conf->timeout;
+        }
+        else {
+            req->idle_timeout = r->server->timeout;
+        }
+    }
+
     /* We possibly reuse input data prefetched in previous call(s), e.g. for a
      * balancer fallback scenario, and in this case the 100 continue settings
      * should be consistent between balancer members. If not, we need to ignore
@@ -2128,15 +2236,19 @@ static int proxy_http_handler(request_re
          * req->expecting_100 (i.e. cleared only if mod_proxy_http sent the
          * "100 Continue" according to its policy).
          */
-        req->do_100_continue = req->prefetch_nonblocking = 1;
-        req->expecting_100 = r->expecting_100;
+        req->do_100_continue = 1;
+        req->expecting_100 = (r->expecting_100 != 0);
         r->expecting_100 = 0;
     }
+
     /* Should we block while prefetching the body or try nonblocking and flush
      * data to the backend ASAP?
      */
-    else if (input_brigade || apr_table_get(r->subprocess_env,
-                                            "proxy-prefetch-nonblocking")) {
+    if (input_brigade
+             || req->can_go_async
+             || req->do_100_continue
+             || apr_table_get(r->subprocess_env,
+                              "proxy-prefetch-nonblocking")) {
         req->prefetch_nonblocking = 1;
     }
 
@@ -2255,6 +2367,9 @@ static int proxy_http_handler(request_re
 
         /* Step Five: Receive the Response... Fall thru to cleanup */
         status = ap_proxy_http_process_response(req);
+        if (status == SUSPENDED) {
+            return SUSPENDED;
+        }
         if (req->backend) {
             proxy_run_detach_backend(r, req->backend);
         }
@@ -2267,7 +2382,8 @@ cleanup:
     if (req->backend) {
         if (status != OK)
             req->backend->close = 1;
-        ap_proxy_http_cleanup(proxy_function, r, req->backend);
+        ap_proxy_release_connection(proxy_function, req->backend,
+                                    r->server);
     }
     if (req->expecting_100) {
         /* Restore r->expecting_100 if we didn't touch it */



Re: svn commit: r1879419 - in /httpd/httpd/trunk/modules/proxy: mod_proxy.c mod_proxy.h mod_proxy_http.c

Posted by Yann Ylavic <yl...@gmail.com>.
On Thu, Jul 2, 2020 at 2:04 PM Ruediger Pluem <rp...@apache.org> wrote:
>
> On 7/2/20 12:42 PM, Eric Covener wrote:
> > On Thu, Jul 2, 2020 at 6:25 AM Ruediger Pluem <rp...@apache.org> wrote:
> >>
> >> On 7/2/20 11:17 AM, Yann Ylavic wrote:
> >>> On Thu, Jul 2, 2020 at 10:17 AM Ruediger Pluem <rp...@apache.org> wrote:
> >>>>
> >>>> On 7/2/20 2:14 AM, ylavic@apache.org wrote:
> >>>>>
>
> >>> Since event_register_poll_callback_ex() uses pfds->pool for mpm_event
> >>> "internal" allocations, creating and clearing subpool req->pfds->pool
> >>> avoids leaks when proxy_http_async_cb() (and thus
> >>> ap_mpm_register_poll_callback_timeout() below) is called multiple
> >>> times, each time connections are idle and need rescheduling through
> >>> the MPM.
> >>
> >> I understand why you do this, but I think it is a dangerous approach. If the array would do resize operations it would allocate
> >> from a different pool then. I think it is a design flaw of event_register_poll_callback_ex to allocate stuff from pfds->pool.
> >> It should have a separate pool argument and should use this pool for these allocations. If callers want to have the same
> >> lifetime of these objects as the array they could simply supply pfds->pool. I would be even fine if
> >> event_register_poll_callback_ex would accept NULL for this parameter and use pfds->pool in this case.
> >
> > Since these API's never made it out of trunk, I think we can break/fix
> > them without too much worry, especially if it simplifies callers.
>
> +1.

OK, I was going to ask for that; adding (yet) another poll_callback hook
looked like overkill to me :)

I just committed the clarification/comments about the dedicated pfds
and subpool used in both proxy_http and proxy_wstunnel (resp. r1879437
and r1879438); going to change to an explicit pool arg now.

Thanks for the review!

Regards;
Yann.



>
> Regards
>
> Rüdiger
>

Re: svn commit: r1879419 - in /httpd/httpd/trunk/modules/proxy: mod_proxy.c mod_proxy.h mod_proxy_http.c

Posted by Ruediger Pluem <rp...@apache.org>.

On 7/2/20 12:42 PM, Eric Covener wrote:
> On Thu, Jul 2, 2020 at 6:25 AM Ruediger Pluem <rp...@apache.org> wrote:
>>
>>
>>
>> On 7/2/20 11:17 AM, Yann Ylavic wrote:
>>> On Thu, Jul 2, 2020 at 10:17 AM Ruediger Pluem <rp...@apache.org> wrote:
>>>>
>>>> On 7/2/20 2:14 AM, ylavic@apache.org wrote:
>>>>>

>>> Since event_register_poll_callback_ex() uses pfds->pool for mpm_event
>>> "internal" allocations, creating and clearing subpool req->pfds->pool
>>> avoids leaks when proxy_http_async_cb() (and thus
>>> ap_mpm_register_poll_callback_timeout() below) is called multiple
>>> times, each time connections are idle and need rescheduling through
>>> the MPM.
>>
>> I understand why you do this, but I think it is a dangerous approach. If the array would do resize operations it would allocate
>> from a different pool then. I think it is a design flaw of event_register_poll_callback_ex to allocate stuff from pfds->pool.
>> It should have a separate pool argument and should use this pool for these allocations. If callers want to have the same
>> lifetime of these objects as the array they could simply supply pfds->pool. I would be even fine if
>> event_register_poll_callback_ex would accept NULL for this parameter and use pfds->pool in this case.
> 
> Since these API's never made it out of trunk, I think we can break/fix
> them without too much worry, especially if it simplifies callers.

+1.

Regards

Rüdiger


Re: svn commit: r1879419 - in /httpd/httpd/trunk/modules/proxy: mod_proxy.c mod_proxy.h mod_proxy_http.c

Posted by Eric Covener <co...@gmail.com>.
On Thu, Jul 2, 2020 at 6:25 AM Ruediger Pluem <rp...@apache.org> wrote:
>
>
>
> On 7/2/20 11:17 AM, Yann Ylavic wrote:
> > On Thu, Jul 2, 2020 at 10:17 AM Ruediger Pluem <rp...@apache.org> wrote:
> >>
> >> On 7/2/20 2:14 AM, ylavic@apache.org wrote:
> >>>
> >>> +/* Invoked by the event loop when data is ready on either end.
> >>> + * We don't need the invoke_mtx, since we never put multiple callback events
> >>> + * in the queue.
> >>> + */
> >>> +static void proxy_http_async_cb(void *baton)
> >>> +{
> >>> +    proxy_http_req_t *req = (proxy_http_req_t *)baton;
> >>> +    int status;
> >>> +
> >>> +    if (req->pfds) {
> >>> +        apr_pool_clear(req->pfds->pool);
> >>> +    }
> >>> +
> >>> +    switch (req->state) {
> >>> +    case PROXY_HTTP_TUNNELING:
> >>> +        /* Pump both ends until they'd block and then start over again */
> >>> +        status = ap_proxy_tunnel_run(req->tunnel);
> >>> +        if (status == HTTP_GATEWAY_TIME_OUT) {
> >>> +            if (req->pfds) {
> >>> +                apr_pollfd_t *async_pfds = (void *)req->pfds->elts;
> >>> +                apr_pollfd_t *tunnel_pfds = (void *)req->tunnel->pfds->elts;
> >>> +                async_pfds[0].reqevents = tunnel_pfds[0].reqevents;
> >>> +                async_pfds[1].reqevents = tunnel_pfds[1].reqevents;
> >>
> >> What is the purpose of this?
> >> async_pfds and tunnel_pfds  are local to this block and cannot be used outside this block.
> >
> > Here and in mod_proxy_wstunnel, the goal is that we have a dedicated
> > pfds array for ap_mpm_register_poll_callback_timeout() which modifies
> > the array in-place and uses pfds->pool for its own allocations (see
> > event_register_poll_callback_ex() in MPM event).
> >
> > async_pfds and tunnel_pfds are local but point to req->pfds and
> > req->tunnel->pfd, this is just to avoid ugly one-line casting.
> > I could use the APR_ARRAY_IDX() macro though..
>
> Thanks for explaining. I guess a comment about this in the code could prevent confusion for future readers :-)
>
> >
> >>
> >>> +            }
> >>> +            else {
> >>> +                req->pfds = apr_array_copy(req->p, req->tunnel->pfds);
> >>> +                apr_pool_create(&req->pfds->pool, req->p);
> >>
> >> Why first using baton->r->pool to create the copy and then setting the pool of the array to the new pool?
> >
> > Since event_register_poll_callback_ex() uses pfds->pool for mpm_event
> > "internal" allocations, creating and clearing subpool req->pfds->pool
> > avoids leaks when proxy_http_async_cb() (and thus
> > ap_mpm_register_poll_callback_timeout() below) is called multiple
> > times, each time connections are idle and need rescheduling through
> > the MPM.
>
> I understand why you do this, but I think it is a dangerous approach. If the array would do resize operations it would allocate
> from a different pool then. I think it is a design flaw of event_register_poll_callback_ex to allocate stuff from pfds->pool.
> It should have a separate pool argument and should use this pool for these allocations. If callers want to have the same
> lifetime of these objects as the array they could simply supply pfds->pool. I would be even fine if
> event_register_poll_callback_ex would accept NULL for this parameter and use pfds->pool in this case.

Since these API's never made it out of trunk, I think we can break/fix
them without too much worry, especially if it simplifies callers.
(sorry Yann for leaving it subtly hosed this way)

Re: svn commit: r1879419 - in /httpd/httpd/trunk/modules/proxy: mod_proxy.c mod_proxy.h mod_proxy_http.c

Posted by Ruediger Pluem <rp...@apache.org>.

On 7/2/20 11:17 AM, Yann Ylavic wrote:
> On Thu, Jul 2, 2020 at 10:17 AM Ruediger Pluem <rp...@apache.org> wrote:
>>
>> On 7/2/20 2:14 AM, ylavic@apache.org wrote:
>>>
>>> +/* Invoked by the event loop when data is ready on either end.
>>> + * We don't need the invoke_mtx, since we never put multiple callback events
>>> + * in the queue.
>>> + */
>>> +static void proxy_http_async_cb(void *baton)
>>> +{
>>> +    proxy_http_req_t *req = (proxy_http_req_t *)baton;
>>> +    int status;
>>> +
>>> +    if (req->pfds) {
>>> +        apr_pool_clear(req->pfds->pool);
>>> +    }
>>> +
>>> +    switch (req->state) {
>>> +    case PROXY_HTTP_TUNNELING:
>>> +        /* Pump both ends until they'd block and then start over again */
>>> +        status = ap_proxy_tunnel_run(req->tunnel);
>>> +        if (status == HTTP_GATEWAY_TIME_OUT) {
>>> +            if (req->pfds) {
>>> +                apr_pollfd_t *async_pfds = (void *)req->pfds->elts;
>>> +                apr_pollfd_t *tunnel_pfds = (void *)req->tunnel->pfds->elts;
>>> +                async_pfds[0].reqevents = tunnel_pfds[0].reqevents;
>>> +                async_pfds[1].reqevents = tunnel_pfds[1].reqevents;
>>
>> What is the purpose of this?
>> async_pfds and tunnel_pfds  are local to this block and cannot be used outside this block.
> 
> Here and in mod_proxy_wstunnel, the goal is that we have a dedicated
> pfds array for ap_mpm_register_poll_callback_timeout() which modifies
> the array in-place and uses pfds->pool for its own allocations (see
> event_register_poll_callback_ex() in MPM event).
> 
> async_pfds and tunnel_pfds are local but point to req->pfds and
> req->tunnel->pfd, this is just to avoid ugly one-line casting.
> I could use the APR_ARRAY_IDX() macro though..

Thanks for explaining. I guess a comment about this in the code could prevent confusion for future readers :-)

> 
>>
>>> +            }
>>> +            else {
>>> +                req->pfds = apr_array_copy(req->p, req->tunnel->pfds);
>>> +                apr_pool_create(&req->pfds->pool, req->p);
>>
>> Why first using baton->r->pool to create the copy and then setting the pool of the array to the new pool?
> 
> Since event_register_poll_callback_ex() uses pfds->pool for mpm_event
> "internal" allocations, creating and clearing subpool req->pfds->pool
> avoids leaks when proxy_http_async_cb() (and thus
> ap_mpm_register_poll_callback_timeout() below) is called multiple
> times, each time connections are idle and need rescheduling through
> the MPM.

I understand why you do this, but I think it is a dangerous approach. If the array would do resize operations it would allocate
from a different pool then. I think it is a design flaw of event_register_poll_callback_ex to allocate stuff from pfds->pool.
It should have a separate pool argument and should use this pool for these allocations. If callers want to have the same
lifetime of these objects as the array they could simply supply pfds->pool. I would be even fine if
event_register_poll_callback_ex would accept NULL for this parameter and use pfds->pool in this case.
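
For illustration, the call-site change being proposed might look roughly like
this (a hypothetical sketch: the extra leading pool parameter is the
suggestion above, not the current API; the other arguments are those used by
the committed mod_proxy_http code):

    /* hypothetical: pass an explicit pool for the MPM's internal
     * allocations instead of relying on pfds->pool */
    ap_mpm_register_poll_callback_timeout(req->p, req->pfds,
                                          proxy_http_async_cb,
                                          proxy_http_async_cancel_cb,
                                          req, req->idle_timeout);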

Regards

Rüdiger


Re: svn commit: r1879419 - in /httpd/httpd/trunk/modules/proxy: mod_proxy.c mod_proxy.h mod_proxy_http.c

Posted by Yann Ylavic <yl...@gmail.com>.
On Thu, Jul 2, 2020 at 10:17 AM Ruediger Pluem <rp...@apache.org> wrote:
>
> On 7/2/20 2:14 AM, ylavic@apache.org wrote:
> >
> > +/* Invoked by the event loop when data is ready on either end.
> > + * We don't need the invoke_mtx, since we never put multiple callback events
> > + * in the queue.
> > + */
> > +static void proxy_http_async_cb(void *baton)
> > +{
> > +    proxy_http_req_t *req = (proxy_http_req_t *)baton;
> > +    int status;
> > +
> > +    if (req->pfds) {
> > +        apr_pool_clear(req->pfds->pool);
> > +    }
> > +
> > +    switch (req->state) {
> > +    case PROXY_HTTP_TUNNELING:
> > +        /* Pump both ends until they'd block and then start over again */
> > +        status = ap_proxy_tunnel_run(req->tunnel);
> > +        if (status == HTTP_GATEWAY_TIME_OUT) {
> > +            if (req->pfds) {
> > +                apr_pollfd_t *async_pfds = (void *)req->pfds->elts;
> > +                apr_pollfd_t *tunnel_pfds = (void *)req->tunnel->pfds->elts;
> > +                async_pfds[0].reqevents = tunnel_pfds[0].reqevents;
> > +                async_pfds[1].reqevents = tunnel_pfds[1].reqevents;
>
> What is the purpose of this?
> async_pfds and tunnel_pfds  are local to this block and cannot be used outside this block.

Here and in mod_proxy_wstunnel, the goal is that we have a dedicated
pfds array for ap_mpm_register_poll_callback_timeout() which modifies
the array in-place and uses pfds->pool for its own allocations (see
event_register_poll_callback_ex() in MPM event).

async_pfds and tunnel_pfds are local but point into req->pfds and
req->tunnel->pfds; this is just to avoid ugly one-line casting.
I could use the APR_ARRAY_IDX() macro, though.
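
For illustration, that APR_ARRAY_IDX() form would read roughly like this
(same fields as in the committed diff):

    APR_ARRAY_IDX(req->pfds, 0, apr_pollfd_t).reqevents =
        APR_ARRAY_IDX(req->tunnel->pfds, 0, apr_pollfd_t).reqevents;
    APR_ARRAY_IDX(req->pfds, 1, apr_pollfd_t).reqevents =
        APR_ARRAY_IDX(req->tunnel->pfds, 1, apr_pollfd_t).reqevents;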

>
> > +            }
> > +            else {
> > +                req->pfds = apr_array_copy(req->p, req->tunnel->pfds);
> > +                apr_pool_create(&req->pfds->pool, req->p);
>
> Why first using baton->r->pool to create the copy and then setting the pool of the array to the new pool?

Since event_register_poll_callback_ex() uses pfds->pool for mpm_event
"internal" allocations, creating and clearing subpool req->pfds->pool
avoids leaks when proxy_http_async_cb() (and thus
ap_mpm_register_poll_callback_timeout() below) is called multiple
times, each time connections are idle and need rescheduling through
the MPM.
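
Condensed, that lifetime pattern is (same code as in the commit, simplified
to a single decision point, with the reasoning spelled out as comments):

    if (!req->pfds) {
        /* first suspension: give the MPM a dedicated copy of the tunnel's
         * pollfds, backed by a subpool of r->pool */
        req->pfds = apr_array_copy(req->p, req->tunnel->pfds);
        apr_pool_create(&req->pfds->pool, req->p);
    }
    else {
        /* subsequent rounds: release whatever the MPM allocated from
         * pfds->pool during the previous registration before registering
         * again, so repeated rescheduling does not accumulate allocations
         * on the long-lived request pool */
        apr_pool_clear(req->pfds->pool);
    }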

>
> > +            }
> > +            status = SUSPENDED;
> > +        }
> > +        break;
> > +
> > +    default:
> > +        ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, req->r,
> > +                      "proxy %s: unexpected async state (%i)",
> > +                      req->proto, (int)req->state);
> > +        status = HTTP_INTERNAL_SERVER_ERROR;
> > +        break;
> > +    }
> > +
> > +    if (status == SUSPENDED) {
> > +        ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, req->r,
> > +                      "proxy %s: suspended, going async",
> > +                      req->proto);
> > +
> > +        ap_mpm_register_poll_callback_timeout(req->pfds,
> > +                                              proxy_http_async_cb,
> > +                                              proxy_http_async_cancel_cb,
> > +                                              req, req->idle_timeout);
> > +    }
> > +    else if (status != OK) {
> > +        proxy_http_async_cancel_cb(req);
> > +    }
> > +    else {
> > +        proxy_http_async_finish(req);
> > +    }
> > +}


Regards;
Yann.

Re: svn commit: r1879419 - in /httpd/httpd/trunk/modules/proxy: mod_proxy.c mod_proxy.h mod_proxy_http.c

Posted by Ruediger Pluem <rp...@apache.org>.

On 7/2/20 2:14 AM, ylavic@apache.org wrote:
> Author: ylavic
> Date: Thu Jul  2 00:14:26 2020
> New Revision: 1879419
> 
> URL: http://svn.apache.org/viewvc?rev=1879419&view=rev
> Log:
> mod_proxy_http: handle async tunneling of Upgrade(d) protocols.
> 
> When supported by the MPM (i.e. "event"), provide async callbacks and let
> them be scheduled by ap_mpm_register_poll_callback_timeout(), while the
> handler returns SUSPENDED.
> 
> The new ProxyAsyncDelay directive (if positive) enables async handling,
> while ProxyAsyncIdleTimeout determines the timeout applied on both ends
> while tunneling.
> 
> Github: closes #126
> 
> 
> Modified:
>     httpd/httpd/trunk/modules/proxy/mod_proxy.c
>     httpd/httpd/trunk/modules/proxy/mod_proxy.h
>     httpd/httpd/trunk/modules/proxy/mod_proxy_http.c
> 

> Modified: httpd/httpd/trunk/modules/proxy/mod_proxy_http.c
> URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/proxy/mod_proxy_http.c?rev=1879419&r1=1879418&r2=1879419&view=diff
> ==============================================================================
> --- httpd/httpd/trunk/modules/proxy/mod_proxy_http.c (original)
> +++ httpd/httpd/trunk/modules/proxy/mod_proxy_http.c Thu Jul  2 00:14:26 2020

> @@ -229,29 +233,129 @@ typedef enum {
>  typedef struct {
>      apr_pool_t *p;
>      request_rec *r;
> +    const char *proto;
>      proxy_worker *worker;
> +    proxy_dir_conf *dconf;
>      proxy_server_conf *sconf;
> -
>      char server_portstr[32];
> +
>      proxy_conn_rec *backend;
>      conn_rec *origin;
>  
>      apr_bucket_alloc_t *bucket_alloc;
>      apr_bucket_brigade *header_brigade;
>      apr_bucket_brigade *input_brigade;
> +
>      char *old_cl_val, *old_te_val;
>      apr_off_t cl_val;
>  
> +    proxy_http_state state;
>      rb_methods rb_method;
>  
> -    int force10;
>      const char *upgrade;
> -
> -    int expecting_100;
> -    unsigned int do_100_continue:1,
> -                 prefetch_nonblocking:1;
> +    proxy_tunnel_rec *tunnel;
> +    apr_array_header_t *pfds;
> +    apr_interval_time_t idle_timeout;
> +
> +    unsigned int can_go_async           :1,
> +                 expecting_100          :1,
> +                 do_100_continue        :1,
> +                 prefetch_nonblocking   :1,
> +                 force10                :1;
>  } proxy_http_req_t;
>  
> +static void proxy_http_async_finish(proxy_http_req_t *req)
> +{ 
> +    conn_rec *c = req->r->connection;
> +
> +    ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, req->r,
> +                  "proxy %s: finish async", req->proto);
> +
> +    proxy_run_detach_backend(req->r, req->backend);
> +    ap_proxy_release_connection(req->proto, req->backend, req->r->server);
> +
> +    ap_finalize_request_protocol(req->r);
> +    ap_process_request_after_handler(req->r);
> +    /* don't touch req or req->r from here */
> +
> +    c->cs->state = CONN_STATE_LINGER;
> +    ap_mpm_resume_suspended(c);
> +}
> +
> +/* If neither socket becomes readable in the specified timeout,
> + * this callback will kill the request.
> + * We do not have to worry about having a cancel and a IO both queued.
> + */
> +static void proxy_http_async_cancel_cb(void *baton)
> +{ 
> +    proxy_http_req_t *req = (proxy_http_req_t *)baton;
> +
> +    ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, req->r,
> +                  "proxy %s: cancel async", req->proto);
> +
> +    req->r->connection->keepalive = AP_CONN_CLOSE;
> +    req->backend->close = 1;
> +    proxy_http_async_finish(req);
> +}
> +
> +/* Invoked by the event loop when data is ready on either end. 
> + * We don't need the invoke_mtx, since we never put multiple callback events
> + * in the queue.
> + */
> +static void proxy_http_async_cb(void *baton)
> +{ 
> +    proxy_http_req_t *req = (proxy_http_req_t *)baton;
> +    int status;
> +
> +    if (req->pfds) {
> +        apr_pool_clear(req->pfds->pool);
> +    }
> +
> +    switch (req->state) {
> +    case PROXY_HTTP_TUNNELING:
> +        /* Pump both ends until they'd block and then start over again */
> +        status = ap_proxy_tunnel_run(req->tunnel);
> +        if (status == HTTP_GATEWAY_TIME_OUT) {
> +            if (req->pfds) {
> +                apr_pollfd_t *async_pfds = (void *)req->pfds->elts;
> +                apr_pollfd_t *tunnel_pfds = (void *)req->tunnel->pfds->elts;
> +                async_pfds[0].reqevents = tunnel_pfds[0].reqevents;
> +                async_pfds[1].reqevents = tunnel_pfds[1].reqevents;

What is the purpose of this?
async_pfds and tunnel_pfds  are local to this block and cannot be used outside this block.

> +            }
> +            else {
> +                req->pfds = apr_array_copy(req->p, req->tunnel->pfds);
> +                apr_pool_create(&req->pfds->pool, req->p);

Why first using baton->r->pool to create the copy and then setting the pool of the array to the new pool?

> +            }
> +            status = SUSPENDED;
> +        }
> +        break;
> +
> +    default:
> +        ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, req->r,
> +                      "proxy %s: unexpected async state (%i)",
> +                      req->proto, (int)req->state);
> +        status = HTTP_INTERNAL_SERVER_ERROR;
> +        break;
> +    }
> +
> +    if (status == SUSPENDED) {
> +        ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, req->r,
> +                      "proxy %s: suspended, going async",
> +                      req->proto);
> +
> +        ap_mpm_register_poll_callback_timeout(req->pfds,
> +                                              proxy_http_async_cb, 
> +                                              proxy_http_async_cancel_cb, 
> +                                              req, req->idle_timeout);
> +    }
> +    else if (status != OK) {
> +        proxy_http_async_cancel_cb(req);
> +    }
> +    else {
> +        proxy_http_async_finish(req);
> +    }
> +}
> +
>  /* Read what's in the client pipe. If nonblocking is set and read is EAGAIN,
>   * pass a FLUSH bucket to the backend and read again in blocking mode.
>   */

Regards

Rüdiger