You are viewing a plain text version of this content. The canonical link for it is here.
Posted to cvs@httpd.apache.org by ic...@apache.org on 2016/02/15 18:10:55 UTC

svn commit: r1730572 - /httpd/httpd/trunk/modules/http2/

Author: icing
Date: Mon Feb 15 17:10:54 2016
New Revision: 1730572

URL: http://svn.apache.org/viewvc?rev=1730572&view=rev
Log:
first working h2 request engine implementation that does serial processing of proxy requests

Modified:
    httpd/httpd/trunk/modules/http2/h2_from_h1.c
    httpd/httpd/trunk/modules/http2/h2_from_h1.h
    httpd/httpd/trunk/modules/http2/h2_io.h
    httpd/httpd/trunk/modules/http2/h2_mplx.c
    httpd/httpd/trunk/modules/http2/h2_mplx.h
    httpd/httpd/trunk/modules/http2/h2_proxy_session.c
    httpd/httpd/trunk/modules/http2/h2_proxy_session.h
    httpd/httpd/trunk/modules/http2/h2_task.c
    httpd/httpd/trunk/modules/http2/h2_task.h
    httpd/httpd/trunk/modules/http2/h2_task_input.c
    httpd/httpd/trunk/modules/http2/h2_task_input.h
    httpd/httpd/trunk/modules/http2/h2_task_output.c
    httpd/httpd/trunk/modules/http2/h2_task_output.h
    httpd/httpd/trunk/modules/http2/h2_worker.c
    httpd/httpd/trunk/modules/http2/mod_http2.c
    httpd/httpd/trunk/modules/http2/mod_http2.h
    httpd/httpd/trunk/modules/http2/mod_proxy_http2.c

Modified: httpd/httpd/trunk/modules/http2/h2_from_h1.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_from_h1.c?rev=1730572&r1=1730571&r2=1730572&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_from_h1.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_from_h1.c Mon Feb 15 17:10:54 2016
@@ -49,12 +49,6 @@ h2_from_h1 *h2_from_h1_create(int stream
     return from_h1;
 }
 
-apr_status_t h2_from_h1_destroy(h2_from_h1 *from_h1)
-{
-    from_h1->bb = NULL;
-    return APR_SUCCESS;
-}
-
 static void set_state(h2_from_h1 *from_h1, h2_from_h1_state_t state)
 {
     if (from_h1->state != state) {

Modified: httpd/httpd/trunk/modules/http2/h2_from_h1.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_from_h1.h?rev=1730572&r1=1730571&r2=1730572&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_from_h1.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_from_h1.h Mon Feb 15 17:10:54 2016
@@ -60,8 +60,6 @@ struct h2_from_h1 {
 
 h2_from_h1 *h2_from_h1_create(int stream_id, apr_pool_t *pool);
 
-apr_status_t h2_from_h1_destroy(h2_from_h1 *response);
-
 apr_status_t h2_from_h1_read_response(h2_from_h1 *from_h1,
                                       ap_filter_t* f, apr_bucket_brigade* bb);
 

Modified: httpd/httpd/trunk/modules/http2/h2_io.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_io.h?rev=1730572&r1=1730571&r2=1730572&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_io.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_io.h Mon Feb 15 17:10:54 2016
@@ -20,6 +20,7 @@ struct h2_response;
 struct apr_thread_cond_t;
 struct h2_mplx;
 struct h2_request;
+struct h2_task;
 
 
 typedef apr_status_t h2_io_data_cb(void *ctx, const char *data, apr_off_t len);
@@ -62,6 +63,9 @@ struct h2_io {
     apr_size_t input_consumed;       /* how many bytes have been read */
         
     int files_handles_owned;
+    
+    struct h2_task *task;            /* parked task */
+    request_rec *r;                  /* parked request */
 };
 
 /*******************************************************************************

Modified: httpd/httpd/trunk/modules/http2/h2_mplx.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_mplx.c?rev=1730572&r1=1730571&r2=1730572&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_mplx.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_mplx.c Mon Feb 15 17:10:54 2016
@@ -17,7 +17,7 @@
 #include <stddef.h>
 #include <stdlib.h>
 
-#include <apr_atomic.h>
+#include <apr_queue.h>
 #include <apr_thread_mutex.h>
 #include <apr_thread_cond.h>
 #include <apr_strings.h>
@@ -27,9 +27,12 @@
 #include <http_core.h>
 #include <http_log.h>
 
+#include "mod_http2.h"
+
 #include "h2_private.h"
 #include "h2_config.h"
 #include "h2_conn.h"
+#include "h2_ctx.h"
 #include "h2_h2.h"
 #include "h2_io.h"
 #include "h2_io_set.h"
@@ -390,6 +393,9 @@ apr_status_t h2_mplx_stream_done(h2_mplx
          * for processing, e.g. when we received all HEADERs. But when
          * a stream is cancelled very early, it will not exist. */
         if (io) {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, 
+                          "h2_mplx(%ld-%d): marking stream as done.", 
+                          m->id, stream_id);
             io_stream_done(m, io, rst_error);
         }
 
@@ -415,25 +421,56 @@ static const h2_request *pop_request(h2_
     return req;
 }
 
+static apr_status_t h2_mplx_engine_schedule(h2_mplx *m, request_rec *r)
+{
+    /* Push r on the engine queue (made on first use); report create errors. */
+    apr_status_t rv = m->engine_queue? APR_SUCCESS
+                      : apr_queue_create(&m->engine_queue, 200, m->pool);
+    return (rv == APR_SUCCESS)? apr_queue_trypush(m->engine_queue, r) : rv;
+}
+
 void h2_mplx_request_done(h2_mplx **pm, int stream_id, const h2_request **preq)
 {
     h2_mplx *m = *pm;
     int acquired;
     
     if (enter_mutex(m, &acquired) == APR_SUCCESS) {
-        h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
-                      "h2_mplx(%ld): request(%d) done", m->id, stream_id);
-        if (io) {
-            io->processing_done = 1;
-            if (io->orphaned) {
-                io_destroy(m, io, 0);
-                if (m->join_wait) {
-                    apr_thread_cond_signal(m->join_wait);
+        if (stream_id) {
+            h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+                          "h2_mplx(%ld): request(%d) done", m->id, stream_id);
+            if (io) {
+                request_rec *r = io->r;
+                
+                if (io->orphaned) {
+                    io->processing_done = 1;
+                }
+                else if (r) {
+                    /* A parked request which is being transferred from
+                     * one worker thread to another. This request_done call
+                     * was from the initial thread and now it is safe to
+                     * schedule it for further processing. */
+                    h2_task_thaw(io->task);
+                    io->task = NULL;
+                    io->r = NULL;
+                    h2_mplx_engine_schedule(*pm, r);
+                }
+                else {
+                    io->processing_done = 1;
+                }
+                
+                if (io->processing_done) {
+                    h2_io_out_close(io, NULL);
+                    if (io->orphaned) {
+                        io_destroy(m, io, 0);
+                        if (m->join_wait) {
+                            apr_thread_cond_signal(m->join_wait);
+                        }
+                    }
+                    else {
+                        /* hang around until the stream deregisters */
+                    }
                 }
-            }
-            else {
-                /* hang around until the stream deregisteres */
             }
         }
         
@@ -800,7 +837,7 @@ apr_status_t h2_mplx_out_write(h2_mplx *
         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
         if (io && !io->orphaned) {
             status = out_write(m, io, f, bb, trailers, iowait);
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
                           "h2_mplx(%ld-%d): write with trailers=%s", 
                           m->id, io->id, trailers? "yes" : "no");
             H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_write");
@@ -1049,3 +1086,107 @@ const h2_request *h2_mplx_pop_request(h2
     return req;
 }
 
+apr_status_t h2_mplx_engine_push(h2_mplx *m, h2_task *task,
+                                 const char *engine_type, 
+                                 request_rec *r, h2_mplx_engine_init *einit)
+{
+    apr_status_t status;
+    int acquired;
+    /* Start m's engine through einit, or park task for a same-type engine. */
+    AP_DEBUG_ASSERT(m);
+    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+        h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id);
+        if (!io || io->orphaned) {
+            status = APR_ECONNABORTED;
+        }
+        else {
+            h2_req_engine *engine;
+            /* note the task id on r's connection; APR_EOF unless changed below */
+            apr_table_set(r->connection->notes, H2_TASK_ID_NOTE, task->id);
+            status = APR_EOF;
+            engine = m->engine; /* just a single one for now */
+            if (task->ser_headers) {
+                /* Max compatibility, deny processing of this */
+            }
+            else if (!engine && einit) {
+                engine = apr_pcalloc(r->connection->pool, sizeof(*engine));
+                engine->id = 1;
+                engine->c = r->connection;
+                engine->pool = r->connection->pool;
+                engine->type = apr_pstrdup(engine->pool, engine_type);
+                /* the new engine is stored in m only when einit() succeeds */
+                status = einit(engine, r);
+                if (status == APR_SUCCESS) {
+                    m->engine = engine;
+                    ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,
+                                  "h2_mplx(%ld): init engine %d (%s)", 
+                                  m->c->id, engine->id, engine->type);
+                }
+            }
+            else if (engine && !strcmp(engine->type, engine_type)) {
+                if (status == APR_SUCCESS) {
+                    /* task moves to another thread: freeze its I/O for now.
+                     * NOTE(review): status is APR_EOF here, is this reached? */
+                    h2_task_freeze(task, r);
+                    io->task = task;
+                    io->r = r;
+                }
+                ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, r,
+                              "h2_mplx(%ld): push request %s", 
+                              m->c->id, r->the_request);
+            }
+        }
+        /* status: APR_ECONNABORTED (no usable io), einit() result or APR_EOF */
+        leave_mutex(m, acquired);
+    }
+    return status;
+}
+                                 
+apr_status_t h2_mplx_engine_pull(h2_mplx *m, h2_task *task,
+                                 struct h2_req_engine *engine, 
+                                 apr_time_t timeout, request_rec **pr)
+{   
+    apr_status_t status;
+    int acquired;
+    /* Non-blocking pop of a queued request into *pr; task/timeout unused. */
+    AP_DEBUG_ASSERT(m);
+    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+        status = APR_ECONNABORTED; /* kept if engine is not m's or no queue */
+        if (m->engine == engine && m->engine_queue) {
+            void *elem;
+            status = apr_queue_trypop(m->engine_queue, &elem);
+            if (status == APR_SUCCESS) {
+                *pr = elem;
+                ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, *pr,
+                              "h2_mplx(%ld): request %s pulled by engine %d", 
+                              m->c->id, (*pr)->the_request, engine->id);
+            }
+        }
+        leave_mutex(m, acquired);
+    }
+    return status;
+}
+ 
+void h2_mplx_engine_done(h2_mplx *m, struct h2_task *task, conn_rec *r_conn)
+{   /* Close task's output, report its stream done, destroy r_conn's pool. */
+    int stream_id = task->stream_id;
+    h2_task_output_close(task->output);
+    h2_mplx_request_done(&m, stream_id, NULL);
+    apr_pool_destroy(r_conn->pool);
+}
+                                
+void h2_mplx_engine_exit(h2_mplx *m, struct h2_task *task, 
+                         struct h2_req_engine *engine)
+{
+    int acquired;
+    /* Log the exit and drop m's engine pointer; task is not used here. */
+    AP_DEBUG_ASSERT(m);
+    if (enter_mutex(m, &acquired) == APR_SUCCESS) {
+        /* TODO: shutdown of engine->c */
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                      "h2_mplx(%ld): exit engine %d (%s)", 
+                      m->c->id, engine->id, engine->type);
+        m->engine = NULL; /* NOTE(review): cleared even if m->engine != engine */
+        leave_mutex(m, acquired);
+    }
+}

Modified: httpd/httpd/trunk/modules/http2/h2_mplx.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_mplx.h?rev=1730572&r1=1730571&r2=1730572&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_mplx.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_mplx.h Mon Feb 15 17:10:54 2016
@@ -47,7 +47,9 @@ struct apr_thread_cond_t;
 struct h2_workers;
 struct h2_stream_set;
 struct h2_task_queue;
+struct h2_req_engine;
 
+#include <apr_queue.h>
 #include "h2_io.h"
 
 typedef struct h2_mplx h2_mplx;
@@ -87,6 +89,9 @@ struct h2_mplx {
     
     h2_mplx_consumed_cb *input_consumed;
     void *input_consumed_ctx;
+    
+    struct h2_req_engine *engine;
+    apr_queue_t *engine_queue;
 };
 
 
@@ -373,4 +378,24 @@ APR_RING_INSERT_TAIL((b), ap__b, h2_mplx
 #define H2_MPLX_REMOVE(e)	APR_RING_REMOVE((e), link)
 
 
+/*******************************************************************************
+ * h2_mplx h2_req_engine handling.
+ ******************************************************************************/
+ 
+typedef apr_status_t h2_mplx_engine_init(struct h2_req_engine *engine, 
+                                         request_rec *r);
+
+apr_status_t h2_mplx_engine_push(h2_mplx *m, struct h2_task *task, 
+                                 const char *engine_type, 
+                                 request_rec *r, h2_mplx_engine_init *einit);
+                                 
+apr_status_t h2_mplx_engine_pull(h2_mplx *m, struct h2_task *task, 
+                                 struct h2_req_engine *engine, 
+                                 apr_time_t timeout, request_rec **pr);
+
+void h2_mplx_engine_done(h2_mplx *m, struct h2_task *task, conn_rec *r_conn);
+                                 
+void h2_mplx_engine_exit(h2_mplx *m, struct h2_task *task, 
+                         struct h2_req_engine *engine);
+
 #endif /* defined(__mod_h2__h2_mplx__) */

Modified: httpd/httpd/trunk/modules/http2/h2_proxy_session.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_proxy_session.c?rev=1730572&r1=1730571&r2=1730572&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_proxy_session.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_proxy_session.c Mon Feb 15 17:10:54 2016
@@ -18,6 +18,7 @@
 
 #include <httpd.h>
 #include <mod_proxy.h>
+#include <mod_http2.h>
 
 #include "h2.h"
 #include "h2_request.h"
@@ -60,6 +61,45 @@ static apr_status_t proxy_session_shutdo
     return APR_SUCCESS;
 }
 
+static int proxy_pass_brigade(apr_bucket_alloc_t *bucket_alloc,
+                              proxy_conn_rec *p_conn,
+                              conn_rec *origin, apr_bucket_brigade *bb,
+                              int flush)
+{   /* Pass bb to origin's output filters, with a FLUSH bucket if flush. */
+    apr_status_t status;
+    apr_off_t transferred;
+    /* Returns OK, or an HTTP_* status chosen below when the pass fails. */
+    if (flush) {
+        apr_bucket *e = apr_bucket_flush_create(bucket_alloc);
+        APR_BRIGADE_INSERT_TAIL(bb, e);
+    }
+    apr_brigade_length(bb, 0, &transferred);
+    if (transferred != -1)
+        p_conn->worker->s->transferred += transferred;
+    status = ap_pass_brigade(origin->output_filters, bb);
+    /* Cleanup the brigade now to avoid buckets lifetime
+     * issues in case of error returned below. */
+    apr_brigade_cleanup(bb); /* NOTE(review): confirm tag 01084 below is unique */
+    if (status != APR_SUCCESS) {
+        ap_log_cerror(APLOG_MARK, APLOG_ERR, status, origin, APLOGNO(01084)
+                      "pass request body failed to %pI (%s)",
+                      p_conn->addr, p_conn->hostname);
+        if (origin->aborted) {
+            const char *ssl_note;
+            /* note SSL_connect_rv == "err": server error, else gateway timeout */
+            if (((ssl_note = apr_table_get(origin->notes, "SSL_connect_rv"))
+                 != NULL) && (strcmp(ssl_note, "err") == 0)) {
+                return HTTP_INTERNAL_SERVER_ERROR;
+            }
+            return HTTP_GATEWAY_TIME_OUT;
+        }
+        else {
+            return HTTP_BAD_REQUEST;
+        }
+    }
+    return OK;
+}
+
 static ssize_t raw_send(nghttp2_session *ngh2, const uint8_t *data,
                         size_t length, int flags, void *user_data)
 {
@@ -75,9 +115,9 @@ static ssize_t raw_send(nghttp2_session
                                     session->c->bucket_alloc);
     APR_BRIGADE_INSERT_TAIL(session->output, b);
 
-    status = ap_proxy_pass_brigade(session->c->bucket_alloc, session->r, 
-                                   session->p_conn, session->c, 
-                                   session->output, flush);
+    status = proxy_pass_brigade(session->c->bucket_alloc,  
+                                session->p_conn, session->c, 
+                                session->output, flush);
     if (status != APR_SUCCESS) {
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, 
                       "h2_proxy_sesssion(%ld): sending", session->c->id);
@@ -203,6 +243,16 @@ static apr_status_t h2_proxy_stream_add_
     return APR_SUCCESS;
 }
 
+static int log_header(void *ctx, const char *key, const char *value)
+{
+    h2_proxy_stream *stream = ctx;
+    /* apr_table_do() callback: trace one header_out entry, then return 1. */
+    ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, stream->r, 
+                  "h2_proxy_stream(%ld-%d), header_out %s: %s", 
+                  stream->session->c->id, stream->id, key, value);
+    return 1;
+}
+
 static void h2_proxy_stream_end_headers_out(h2_proxy_stream *stream) 
 {
     h2_proxy_session *session = stream->session;
@@ -254,6 +304,13 @@ static void h2_proxy_stream_end_headers_
                                       server_name, portstr)
                        );
     }
+    
+    if (APLOGrtrace2(stream->r)) {
+        ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, stream->r, 
+                      "h2_proxy_stream(%ld-%d), header_out after merging", 
+                      stream->session->c->id, stream->id);
+        apr_table_do(log_header, stream, stream->r->headers_out, NULL);
+    }
 }
 
 static int on_data_chunk_recv(nghttp2_session *ngh2, uint8_t flags,
@@ -278,7 +335,8 @@ static int on_data_chunk_recv(nghttp2_se
         stream->data_received = 1;
     }
     
-    b = apr_bucket_transient_create((const char*)data, len, session->c->bucket_alloc);
+    b = apr_bucket_transient_create((const char*)data, len, 
+                                    stream->r->connection->bucket_alloc);
     APR_BRIGADE_INSERT_TAIL(stream->output, b);
     status = ap_pass_brigade(stream->r->output_filters, stream->output);
     if (status != APR_SUCCESS) {
@@ -344,7 +402,6 @@ static ssize_t stream_data_read(nghttp2_
                                 uint32_t *data_flags, 
                                 nghttp2_data_source *source, void *user_data)
 {
-    h2_proxy_session *session = user_data;
     h2_proxy_stream *stream;
     apr_status_t status = APR_SUCCESS;
     
@@ -358,9 +415,9 @@ static ssize_t stream_data_read(nghttp2_
         status = ap_get_brigade(stream->r->input_filters, stream->input,
                                 AP_MODE_READBYTES, APR_BLOCK_READ,
                                 H2MIN(APR_BUCKET_BUFF_SIZE, length));
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c, 
-                      "h2_proxy_stream(%ld-%d): request body read", 
-                      session->c->id, stream->id);
+        ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, stream->r, 
+                      "h2_proxy_stream(%d): request body read", 
+                      stream->id);
     }
 
     if (status == APR_SUCCESS) {
@@ -396,9 +453,9 @@ static ssize_t stream_data_read(nghttp2_
             apr_bucket_delete(b);
         }
 
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c, 
-                      "h2_proxy_stream(%ld-%d): request body read %ld bytes, flags=%d", 
-                      session->c->id, stream->id, (long)readlen, (int)*data_flags);
+        ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, stream->r, 
+                      "h2_proxy_stream(%d): request body read %ld bytes, flags=%d", 
+                      stream->id, (long)readlen, (int)*data_flags);
         return readlen;
     }
     else if (APR_STATUS_IS_EAGAIN(status)) {
@@ -424,7 +481,6 @@ h2_proxy_session *h2_proxy_session_setup
         session->c = p_conn->connection;
         session->p_conn = p_conn;
         session->conf = conf;
-        session->r = r;
         session->pool = p_conn->scpool;
         session->window_bits_default    = 30;
         session->window_bits_connection = 30;
@@ -471,7 +527,7 @@ apr_status_t h2_proxy_session_open_strea
     h2_proxy_stream *stream;
     apr_uri_t puri;
     const char *authority, *scheme, *path;
-    
+
     stream = apr_pcalloc(r->pool, sizeof(*stream));
 
     stream->pool = r->pool;
@@ -625,9 +681,14 @@ apr_status_t h2_proxy_stream_process(h2_
     rv = nghttp2_submit_request(session->ngh2, NULL, 
                                 hd->nv, hd->nvlen, pp, stream);
                                 
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, 
-                  "h2_session(%ld): submit request -> %d", 
-                  session->c->id, rv);
+    if (APLOGcdebug(session->c)) {
+        const char *task_id = apr_table_get(stream->r->connection->notes, 
+                                            H2_TASK_ID_NOTE);
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
+                      "h2_session(%ld): submit %s%s -> %d (task %s)", 
+                      session->c->id, stream->req->authority, stream->req->path,
+                      rv, task_id);
+    }
     if (rv > 0) {
         stream->id = rv;
         stream->state = H2_STREAM_ST_OPEN;

Modified: httpd/httpd/trunk/modules/http2/h2_proxy_session.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_proxy_session.h?rev=1730572&r1=1730571&r2=1730572&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_proxy_session.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_proxy_session.h Mon Feb 15 17:10:54 2016
@@ -24,7 +24,6 @@ typedef struct h2_proxy_session {
     conn_rec *c;
     proxy_conn_rec *p_conn;
     proxy_server_conf *conf;
-    request_rec *r;
     apr_pool_t *pool;
     nghttp2_session *ngh2;   /* the nghttp2 session itself */
     
@@ -66,4 +65,6 @@ apr_status_t h2_proxy_session_open_strea
                                           request_rec *r, h2_proxy_stream **pstream);
 apr_status_t h2_proxy_stream_process(h2_proxy_stream *stream);
 
+#define H2_PROXY_REQ_URL_NOTE   "h2-proxy-req-url"
+
 #endif /* h2_proxy_session_h */

Modified: httpd/httpd/trunk/modules/http2/h2_task.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_task.c?rev=1730572&r1=1730571&r2=1730572&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_task.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_task.c Mon Feb 15 17:10:54 2016
@@ -85,6 +85,28 @@ static apr_status_t h2_filter_read_respo
     return h2_from_h1_read_response(task->output->from_h1, f, bb);
 }
 
+static apr_status_t h2_response_freeze_filter(ap_filter_t* f,
+                                              apr_bucket_brigade* bb)
+{
+    h2_task *task = f->ctx;
+    AP_DEBUG_ASSERT(task);
+    /* While the task is frozen, move bb into task->frozen_out and stop. */
+    if (task->frozen) {
+        ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, f->r,
+                      "h2_response_freeze_filter, saving");
+        APR_BRIGADE_CONCAT(task->frozen_out, bb);
+        return APR_SUCCESS;
+    }
+    /* not frozen: an empty bb is dropped, anything else goes to f->next */
+    if (APR_BRIGADE_EMPTY(bb)) {
+        return APR_SUCCESS;
+    }
+    /* NOTE(review): frozen_out is not replayed here once thawed - confirm */
+    ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, f->r,
+                  "h2_response_freeze_filter, passing");
+    return ap_pass_brigade(f->next, bb);
+}
+
 /*******************************************************************************
  * Register various hooks
  */
@@ -119,6 +141,8 @@ void h2_task_register_hooks(void)
                               NULL, AP_FTYPE_PROTOCOL);
     ap_register_output_filter("H2_TRAILERS", h2_response_trailers_filter,
                               NULL, AP_FTYPE_PROTOCOL);
+    ap_register_output_filter("H2_RESPONSE_FREEZE", h2_response_freeze_filter,
+                              NULL, AP_FTYPE_RESOURCE);
 }
 
 /* post config init */
@@ -168,6 +192,7 @@ h2_task *h2_task_create(long session_id,
     
     task->id          = apr_psprintf(pool, "%ld-%d", session_id, req->id);
     task->stream_id   = req->id;
+    task->pool        = pool;
     task->mplx        = mplx;
     task->request     = req;
     task->input_eos   = !req->body;
@@ -179,6 +204,8 @@ h2_task *h2_task_create(long session_id,
 apr_status_t h2_task_do(h2_task *task, conn_rec *c, apr_thread_cond_t *cond, 
                         apr_socket_t *socket)
 {
+    apr_status_t status;
+    
     AP_DEBUG_ASSERT(task);
     task->io = cond;
     task->input = h2_task_input_create(task, c);
@@ -186,21 +213,27 @@ apr_status_t h2_task_do(h2_task *task, c
     
     ap_process_connection(c, socket);
     
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
-                  "h2_task(%s): processing done", task->id);
-    
-    h2_task_input_destroy(task->input);
-    h2_task_output_close(task->output);
-    h2_task_output_destroy(task->output);
-    task->io = NULL;
+    if (task->frozen) {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
+                      "h2_task(%s): process_conn returned frozen task", 
+                      task->id);
+        /* cleanup delayed */
+        status = APR_EAGAIN;
+    }
+    else {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
+                      "h2_task(%s): processing done", task->id);
+        status = APR_SUCCESS;
+    }
     
-    return APR_SUCCESS;
+    return status;
 }
 
-static apr_status_t h2_task_process_request(const h2_request *req, conn_rec *c)
+static apr_status_t h2_task_process_request(h2_task *task, conn_rec *c)
 {
-    request_rec *r;
+    const h2_request *req = task->request;
     conn_state_t *cs = c->cs;
+    request_rec *r;
 
     r = h2_request_create_rec(req, c);
     if (r && (r->status == HTTP_OK)) {
@@ -210,10 +243,15 @@ static apr_status_t h2_task_process_requ
             cs->state = CONN_STATE_HANDLER;
         }
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
-                      "h2_task(%ld-%d): start process_request", c->id, req->id);
+                      "h2_task(%s): start process_request", task->id);
         ap_process_request(r);
+        if (task->frozen) {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
+                          "h2_task(%s): process_request frozen", task->id);
+        }
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
-                      "h2_task(%ld-%d): process_request done", c->id, req->id);
+                      "h2_task(%s): process_request done", task->id);
+        
         /* After the call to ap_process_request, the
          * request pool will have been deleted.  We set
          * r=NULL here to ensure that any dereference
@@ -221,11 +259,10 @@ static apr_status_t h2_task_process_requ
          * will result in a segfault immediately instead
          * of nondeterministic failures later.
          */
-        if (cs)
+        if (cs) 
             cs->state = CONN_STATE_WRITE_COMPLETION;
         r = NULL;
     }
-    ap_update_child_status_from_conn(c->sbh, SERVER_BUSY_WRITE, c);
     c->sbh = NULL;
 
     return APR_SUCCESS;
@@ -244,7 +281,7 @@ static int h2_task_process_conn(conn_rec
         if (!ctx->task->ser_headers) {
             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c, 
                           "h2_h2, processing request directly");
-            h2_task_process_request(ctx->task->request, c);
+            h2_task_process_request(ctx->task, c);
             return DONE;
         }
         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c, 
@@ -252,3 +289,24 @@ static int h2_task_process_conn(conn_rec
     }
     return DECLINED;
 }
+
+apr_status_t h2_task_freeze(h2_task *task, request_rec *r)
+{   /* Freeze task's output for r; does nothing if already frozen. */
+    if (!task->frozen) {
+        conn_rec *c = task->output->c;
+        /* buffer brigade uses the output connection's pool and allocator */
+        task->frozen = 1;
+        task->frozen_out = apr_brigade_create(c->pool, c->bucket_alloc);
+        ap_add_output_filter("H2_RESPONSE_FREEZE", task, r, r->connection);
+    }
+    return APR_SUCCESS;
+}
+
+apr_status_t h2_task_thaw(h2_task *task)
+{   /* Clear task->frozen if it is set; always returns APR_SUCCESS. */
+    if (task->frozen) {
+        task->frozen = 0;
+    }
+    return APR_SUCCESS;
+}
+

Modified: httpd/httpd/trunk/modules/http2/h2_task.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_task.h?rev=1730572&r1=1730571&r2=1730572&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_task.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_task.h Mon Feb 15 17:10:54 2016
@@ -50,16 +50,20 @@ typedef struct h2_task h2_task;
 struct h2_task {
     const char *id;
     int stream_id;
+    apr_pool_t *pool;
     struct h2_mplx *mplx;    
     const struct h2_request *request;
     
     unsigned int filters_set : 1;
     unsigned int input_eos   : 1;
     unsigned int ser_headers : 1;
+    unsigned int frozen      : 1;
     
     struct h2_task_input *input;
     struct h2_task_output *output;
     struct apr_thread_cond_t *io;   /* used to wait for events on */
+
+    apr_bucket_brigade *frozen_out;
 };
 
 h2_task *h2_task_create(long session_id, const struct h2_request *req, 
@@ -77,4 +81,7 @@ apr_status_t h2_task_init(apr_pool_t *po
 extern APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_in) *h2_task_logio_add_bytes_in;
 extern APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_out) *h2_task_logio_add_bytes_out;
 
+apr_status_t h2_task_freeze(h2_task *task, request_rec *r);
+apr_status_t h2_task_thaw(h2_task *task);
+
 #endif /* defined(__mod_h2__h2_task__) */

Modified: httpd/httpd/trunk/modules/http2/h2_task_input.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_task_input.c?rev=1730572&r1=1730571&r2=1730572&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_task_input.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_task_input.c Mon Feb 15 17:10:54 2016
@@ -75,11 +75,6 @@ h2_task_input *h2_task_input_create(h2_t
     return input;
 }
 
-void h2_task_input_destroy(h2_task_input *input)
-{
-    input->bb = NULL;
-}
-
 apr_status_t h2_task_input_read(h2_task_input *input,
                                 ap_filter_t* f,
                                 apr_bucket_brigade* bb,

Modified: httpd/httpd/trunk/modules/http2/h2_task_input.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_task_input.h?rev=1730572&r1=1730571&r2=1730572&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_task_input.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_task_input.h Mon Feb 15 17:10:54 2016
@@ -34,8 +34,6 @@ struct h2_task_input {
 
 h2_task_input *h2_task_input_create(struct h2_task *task, conn_rec *c);
 
-void h2_task_input_destroy(h2_task_input *input);
-
 apr_status_t h2_task_input_read(h2_task_input *input,
                                   ap_filter_t* filter,
                                   apr_bucket_brigade* brigade,

Modified: httpd/httpd/trunk/modules/http2/h2_task_output.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_task_output.c?rev=1730572&r1=1730571&r2=1730572&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_task_output.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_task_output.c Mon Feb 15 17:10:54 2016
@@ -49,14 +49,6 @@ h2_task_output *h2_task_output_create(h2
     return output;
 }
 
-void h2_task_output_destroy(h2_task_output *output)
-{
-    if (output->from_h1) {
-        h2_from_h1_destroy(output->from_h1);
-        output->from_h1 = NULL;
-    }
-}
-
 static apr_table_t *get_trailers(h2_task_output *output)
 {
     if (!output->trailers_passed) {
@@ -75,7 +67,7 @@ static apr_table_t *get_trailers(h2_task
 }
 
 static apr_status_t open_if_needed(h2_task_output *output, ap_filter_t *f,
-                                   apr_bucket_brigade *bb)
+                                   apr_bucket_brigade *bb, const char *caller)
 {
     if (output->state == H2_TASK_OUT_INIT) {
         h2_response *response;
@@ -86,9 +78,10 @@ static apr_status_t open_if_needed(h2_ta
                 /* This happens currently when ap_die(status, r) is invoked
                  * by a read request filter. */
                 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c, APLOGNO(03204)
-                              "h2_task_output(%s): write without response "
+                              "h2_task_output(%s): write without response by %s "
                               "for %s %s %s",
-                              output->task->id, output->task->request->method, 
+                              output->task->id, caller, 
+                              output->task->request->method, 
                               output->task->request->authority, 
                               output->task->request->path);
                 f->c->aborted = 1;
@@ -108,6 +101,11 @@ static apr_status_t open_if_needed(h2_ta
             h2_task_logio_add_bytes_out(f->c, bytes_written);
         }
         get_trailers(output);
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c, APLOGNO(03204)
+                      "h2_task_output(%s): open as needed %s %s %s",
+                      output->task->id, output->task->request->method, 
+                      output->task->request->authority, 
+                      output->task->request->path);
         return h2_mplx_out_open(output->task->mplx, output->task->stream_id, 
                                 response, f, bb, output->task->io);
     }
@@ -116,19 +114,19 @@ static apr_status_t open_if_needed(h2_ta
 
 void h2_task_output_close(h2_task_output *output)
 {
-    open_if_needed(output, NULL, NULL);
+    open_if_needed(output, NULL, NULL, "close");
     if (output->state != H2_TASK_OUT_DONE) {
+        if (output->task->frozen_out 
+            && !APR_BRIGADE_EMPTY(output->task->frozen_out)) {
+            h2_mplx_out_write(output->task->mplx, output->task->stream_id, 
+                NULL, output->task->frozen_out, NULL, NULL);
+        }
         h2_mplx_out_close(output->task->mplx, output->task->stream_id, 
                           get_trailers(output));
         output->state = H2_TASK_OUT_DONE;
     }
 }
 
-int h2_task_output_has_started(h2_task_output *output)
-{
-    return output->state >= H2_TASK_OUT_STARTED;
-}
-
 /* Bring the data from the brigade (which represents the result of the
  * request_rec out filter chain) into the h2_mplx for further sending
  * on the master connection. 
@@ -144,7 +142,14 @@ apr_status_t h2_task_output_write(h2_tas
         return APR_SUCCESS;
     }
     
-    status = open_if_needed(output, f, bb);
+    if (output->task->frozen) {
+        h2_util_bb_log(output->c, output->task->stream_id, APLOG_TRACE2,
+                       "frozen task output write", bb);
+        APR_BRIGADE_CONCAT(output->task->frozen_out, bb);
+        return APR_SUCCESS;
+    }
+    
+    status = open_if_needed(output, f, bb, "write");
     if (status != APR_EOF) {
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
                       "h2_task_output(%s): opened and passed brigade", 

Modified: httpd/httpd/trunk/modules/http2/h2_task_output.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_task_output.h?rev=1730572&r1=1730571&r2=1730572&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_task_output.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_task_output.h Mon Feb 15 17:10:54 2016
@@ -44,14 +44,13 @@ struct h2_task_output {
 
 h2_task_output *h2_task_output_create(struct h2_task *task, conn_rec *c);
 
-void h2_task_output_destroy(h2_task_output *output);
-
 apr_status_t h2_task_output_write(h2_task_output *output,
                                   ap_filter_t* filter,
                                   apr_bucket_brigade* brigade);
 
 void h2_task_output_close(h2_task_output *output);
 
-int h2_task_output_has_started(h2_task_output *output);
+apr_status_t h2_task_output_freeze(h2_task_output *output);
+apr_status_t h2_task_output_thaw(h2_task_output *output);
 
 #endif /* defined(__mod_h2__h2_task_output__) */

Modified: httpd/httpd/trunk/modules/http2/h2_worker.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_worker.c?rev=1730572&r1=1730571&r2=1730572&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_worker.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_worker.c Mon Feb 15 17:10:54 2016
@@ -35,24 +35,9 @@ static void* APR_THREAD_FUNC execute(apr
 {
     h2_worker *worker = (h2_worker *)wctx;
     apr_status_t status;
-    apr_allocator_t *task_allocator = NULL;
-    apr_pool_t *task_pool;
+    apr_pool_t *task_pool = NULL;
     
     (void)thread;
-    
-    /* We create a root pool with its own allocator to be used for
-     * processing a request. This is the only way to have the processing
-     * independant of the worker pool as the h2_mplx pool as well as
-     * not sensitive to which thread it is in.
-     * In that sense, memory allocation and lifetime is similar to a master
-     * connection.
-     * The main goal in this is that slave connections and requests will
-     * - one day - be suspended and resumed in different threads.
-     */
-    apr_allocator_create(&task_allocator);
-    apr_pool_create_ex(&task_pool, NULL, NULL, task_allocator);
-    apr_allocator_owner_set(task_allocator, task_pool);
-
     /* Other code might want to see a socket for this connection this
      * worker processes. Allocate one without further function...
      */
@@ -78,6 +63,22 @@ static void* APR_THREAD_FUNC execute(apr
             conn_rec *c, *master = m->c;
             int stream_id = req->id;
             
+            if (!task_pool) {
+                apr_allocator_t *task_allocator = NULL;
+                /* We create a root pool with its own allocator to be used for
+                 * processing a request. This is the only way to have the processing
+                 * independent of the worker pool and the h2_mplx pool, as well as
+                 * not sensitive to which thread it is in.
+                 * In that sense, memory allocation and lifetime is similar to a master
+                 * connection.
+                 * The main goal in this is that slave connections and requests will
+                 * - one day - be suspended and resumed in different threads.
+                 */
+                apr_allocator_create(&task_allocator);
+                apr_pool_create_ex(&task_pool, NULL, NULL, task_allocator);
+                apr_allocator_owner_set(task_allocator, task_pool);
+            }
+            
             c = h2_slave_create(master, task_pool, 
                                 worker->thread, worker->socket);
             if (!c) {
@@ -92,8 +93,13 @@ static void* APR_THREAD_FUNC execute(apr
                 task = h2_task_create(m->id, req, task_pool, m);
                 h2_ctx_create_for(c, task);
                 h2_task_do(task, c, worker->io, worker->socket);
-                task = NULL;
                 
+                if (task->frozen) {
+                    /* this task was handed over to someone else for
+                     * processing */
+                    task_pool = NULL;
+                }
+                task = NULL;
                 apr_thread_cond_signal(worker->io);
             }
             
@@ -103,7 +109,9 @@ static void* APR_THREAD_FUNC execute(apr
              * long as it has requests to handle. Might no be fair to
              * other mplx's. Perhaps leave after n requests? */
             req = NULL;
-            apr_pool_clear(task_pool);
+            if (task_pool) {
+                apr_pool_clear(task_pool);
+            }
             h2_mplx_request_done(&m, stream_id, worker->aborted? NULL : &req);
         }
     }

Modified: httpd/httpd/trunk/modules/http2/mod_http2.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/mod_http2.c?rev=1730572&r1=1730571&r2=1730572&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/mod_http2.c (original)
+++ httpd/httpd/trunk/modules/http2/mod_http2.c Mon Feb 15 17:10:54 2016
@@ -35,6 +35,7 @@
 #include "h2_config.h"
 #include "h2_ctx.h"
 #include "h2_h2.h"
+#include "h2_mplx.h"
 #include "h2_push.h"
 #include "h2_request.h"
 #include "h2_switch.h"
@@ -121,6 +122,57 @@ static char *http2_var_lookup(apr_pool_t
                          conn_rec *, request_rec *, char *name);
 static int http2_is_h2(conn_rec *);
 
+static apr_status_t http2_req_engine_push(const char *engine_type, 
+                                          request_rec *r, 
+                                          h2_req_engine_init *einit)
+{
+    h2_ctx *ctx = h2_ctx_rget(r);
+    if (ctx) {
+        h2_task *task = h2_ctx_get_task(ctx);
+        if (task) {
+            return h2_mplx_engine_push(task->mplx, task, engine_type, r, einit);
+        }
+    }
+    return APR_EINVAL;
+}
+
+static apr_status_t http2_req_engine_pull(h2_req_engine *engine, 
+                                          apr_time_t timeout, request_rec **pr)
+{
+    h2_ctx *ctx = h2_ctx_get(engine->c, 0);
+    if (ctx) {
+        h2_task *task = h2_ctx_get_task(ctx);
+        if (task) {
+            return h2_mplx_engine_pull(task->mplx, task, engine, timeout, pr);
+        }
+    }
+    return APR_ECONNABORTED;
+}
+
+static void http2_req_engine_done(h2_req_engine *engine, conn_rec *r_conn)
+{
+    h2_ctx *ctx = h2_ctx_get(r_conn, 0);
+    if (ctx) {
+        h2_task *task = h2_ctx_get_task(ctx);
+        if (task) {
+            h2_mplx_engine_done(task->mplx, task, r_conn);
+            /* task is destroyed */
+        }
+    }
+}
+
+static void http2_req_engine_exit(h2_req_engine *engine)
+{
+    h2_ctx *ctx = h2_ctx_get(engine->c, 0);
+    if (ctx) {
+        h2_task *task = h2_ctx_get_task(ctx);
+        if (task) {
+            h2_mplx_engine_exit(task->mplx, task, engine);
+        }
+    }
+}
+
+
 /* Runs once per created child process. Perform any process 
  * related initionalization here.
  */
@@ -143,6 +195,10 @@ static void h2_hooks(apr_pool_t *pool)
     
     APR_REGISTER_OPTIONAL_FN(http2_is_h2);
     APR_REGISTER_OPTIONAL_FN(http2_var_lookup);
+    APR_REGISTER_OPTIONAL_FN(http2_req_engine_push);
+    APR_REGISTER_OPTIONAL_FN(http2_req_engine_pull);
+    APR_REGISTER_OPTIONAL_FN(http2_req_engine_done);
+    APR_REGISTER_OPTIONAL_FN(http2_req_engine_exit);
 
     ap_log_perror(APLOG_MARK, APLOG_TRACE1, 0, pool, "installing hooks");
     

Modified: httpd/httpd/trunk/modules/http2/mod_http2.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/mod_http2.h?rev=1730572&r1=1730571&r2=1730572&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/mod_http2.h (original)
+++ httpd/httpd/trunk/modules/http2/mod_http2.h Mon Feb 15 17:10:54 2016
@@ -18,13 +18,97 @@
 
 /** The http2_var_lookup() optional function retrieves HTTP2 environment
  * variables. */
-APR_DECLARE_OPTIONAL_FN(char *, http2_var_lookup,
-                        (apr_pool_t *, server_rec *,
-                         conn_rec *, request_rec *,
-                         char *));
+APR_DECLARE_OPTIONAL_FN(char *, 
+                        http2_var_lookup, (apr_pool_t *, server_rec *,
+                                           conn_rec *, request_rec *,  char *));
 
 /** An optional function which returns non-zero if the given connection
  * or its master connection is using HTTP/2. */
-APR_DECLARE_OPTIONAL_FN(int, http2_is_h2, (conn_rec *));
+APR_DECLARE_OPTIONAL_FN(int, 
+                        http2_is_h2, (conn_rec *));
+
+
+/*******************************************************************************
+ * HTTP/2 slave engines
+ ******************************************************************************/
+ 
+typedef struct h2_req_engine h2_req_engine;
+
+/**
+ * Initialize a h2_req_engine. The structure will be passed in but
+ * only the name and master are set. The function should initialize
+ * all fields.
+ * @param engine the allocated, partially filled structure
+ * @param r      the first request to process, or NULL
+ */
+typedef apr_status_t h2_req_engine_init(h2_req_engine *engine, request_rec *r);
+
+/**
+ * The public structure of a h2_req_engine. It gets allocated by the http2
+ * infrastructure, assigned id, type, pool and connection and passed to the
+ * h2_req_engine_init() callback to complete initialization.
+ * This happens whenever a new request gets "push"ed for an engine type and
+ * no instance, or no free instance, for the type is available.
+ */
+struct h2_req_engine {
+    int id;                /* identifier, unique for a master connection */
+    const char *type;      /* name of the engine type */
+    apr_pool_t *pool;      /* pool for engine specific allocations */
+    conn_rec *c;           /* connection this engine is assigned to */
+    apr_size_t r_capacity; /* request capacity engine is willing to handle,
+                              may change between invocations. If the engine
+                              sets this to 0, it signals that it no longer
+                              wants more requests. New requests, already 
+                              scheduled for this engine might still arrive for
+                              a time. */
+    apr_size_t r_count;    /* number of requests currently assigned, it is the
+                              responsibility of the engine to update this. */
+    void *data;            /* engine specific data */
+};
+
+/**
+ * Push a request to an engine with the specified name for further processing.
+ * If no such engine is available and einit is not NULL, einit is called
+ * with a new engine record and the caller is responsible for running the
+ * new engine instance.
+ * @param engine_type the type of the engine to add the request to
+ * @param r           the request to push to an engine for processing
+ * @param einit       an optional initialization callback for a new engine 
+ *                    of the requested type, should no instance be available.
+ *                    By passing a non-NULL callback, the caller is willing
+ *                    to init and run a new engine itself.
+ * @return APR_SUCCESS iff the request was successfully added to an engine
+ */
+APR_DECLARE_OPTIONAL_FN(apr_status_t, 
+                        http2_req_engine_push, (const char *engine_type, 
+                                                request_rec *r,
+                                                h2_req_engine_init *einit));
+
+/**
+ * Get a new request for processing in this engine.
+ * @param engine      the engine that wants a new request to process
+ * @param timeout     wait a maximum amount of time for a new request, 0 will not wait
+ * @param pr          receives the request that needs processing, or NULL
+ * @return APR_SUCCESS if new request was assigned
+ *         APR_EAGAIN/APR_TIMEUP if no new request is available
+ *         APR_ECONNABORTED if the engine needs to shut down
+ */
+APR_DECLARE_OPTIONAL_FN(apr_status_t, 
+                        http2_req_engine_pull, (h2_req_engine *engine, 
+                                                apr_time_t timeout,
+                                                request_rec **pr));
+APR_DECLARE_OPTIONAL_FN(void, 
+                        http2_req_engine_done, (h2_req_engine *engine, 
+                                                conn_rec *rconn));
+/**
+ * The given request engine is done processing and needs to be excluded
+ * from further handling. 
+ * @param engine      the engine to exit
+ */
+APR_DECLARE_OPTIONAL_FN(void,
+                        http2_req_engine_exit, (h2_req_engine *engine));
+
+
+#define H2_TASK_ID_NOTE     "http2-task-id"
 
 #endif

Modified: httpd/httpd/trunk/modules/http2/mod_proxy_http2.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/mod_proxy_http2.c?rev=1730572&r1=1730571&r2=1730572&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/mod_proxy_http2.c (original)
+++ httpd/httpd/trunk/modules/http2/mod_proxy_http2.c Mon Feb 15 17:10:54 2016
@@ -17,6 +17,7 @@
 
 #include <httpd.h>
 #include <mod_proxy.h>
+#include <mod_http2.h>
 
 
 #include "mod_proxy_http2.h"
@@ -37,6 +38,30 @@ AP_DECLARE_MODULE(proxy_http2) = {
     register_hook      /* register hooks */
 };
 
+/* Optional functions from mod_http2 */
+static int (*is_h2)(conn_rec *c);
+static apr_status_t (*req_engine_push)(const char *name, request_rec *r, 
+                                       h2_req_engine_init *einit);
+static apr_status_t (*req_engine_pull)(h2_req_engine *engine, 
+                                       apr_time_t timeout, request_rec **pr);
+static void (*req_engine_done)(h2_req_engine *engine, conn_rec *r_conn);
+static void (*req_engine_exit)(h2_req_engine *engine);
+                                       
+typedef struct h2_proxy_ctx {
+    conn_rec *owner;
+    server_rec *server;
+    const char *proxy_func;
+    char server_portstr[32];
+    proxy_conn_rec *p_conn;
+    proxy_worker *worker;
+    proxy_server_conf *conf;
+    
+    h2_req_engine *engine;
+    unsigned standalone : 1;
+    unsigned is_ssl : 1;
+    unsigned flushall : 1;
+} h2_proxy_ctx;
+
 static int h2_proxy_post_config(apr_pool_t *p, apr_pool_t *plog,
                                 apr_pool_t *ptemp, server_rec *s)
 {
@@ -58,6 +83,21 @@ static int h2_proxy_post_config(apr_pool
                  "mod_proxy_http2 (v%s, nghttp2 %s), initializing...",
                  MOD_HTTP2_VERSION, ngh2? ngh2->version_str : "unknown");
     
+    is_h2 = APR_RETRIEVE_OPTIONAL_FN(http2_is_h2);
+    req_engine_push = APR_RETRIEVE_OPTIONAL_FN(http2_req_engine_push);
+    req_engine_pull = APR_RETRIEVE_OPTIONAL_FN(http2_req_engine_pull);
+    req_engine_done = APR_RETRIEVE_OPTIONAL_FN(http2_req_engine_done);
+    req_engine_exit = APR_RETRIEVE_OPTIONAL_FN(http2_req_engine_exit);
+    
+    /* we need all of them */
+    if (!req_engine_push || !req_engine_pull 
+        || !req_engine_done || !req_engine_exit) {
+        req_engine_push = NULL;
+        req_engine_pull = NULL;
+        req_engine_done = NULL;
+        req_engine_exit = NULL;
+    }
+    
     return status;
 }
 
@@ -147,59 +187,111 @@ static int proxy_http2_canon(request_rec
     return OK;
 }
 
-static apr_status_t proxy_http2_cleanup(const char *scheme, request_rec *r,
-                                        proxy_conn_rec *backend)
+static apr_status_t proxy_engine_init(h2_req_engine *engine, request_rec *r)
 {
-    ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, r, "cleanup, releasing connection");
-    ap_proxy_release_connection(scheme, backend, r->server);
-    return OK;
+    h2_proxy_ctx *ctx = ap_get_module_config(engine->c->conn_config, 
+                                             &proxy_http2_module);
+    if (ctx) {
+        ctx->engine = engine;
+        return APR_SUCCESS;
+    }
+    return APR_ENOTIMPL;
 }
 
-static
-int proxy_http2_process_stream(apr_pool_t *p, const char *url, request_rec *r,
-                               proxy_conn_rec **pp_conn, proxy_worker *worker,
-                               proxy_server_conf *conf, char *server_portstr,
-                               int flushall)
-{
-    int rv = APR_ENOTIMPL;
-    proxy_conn_rec *p_conn = *pp_conn;
+static int proxy_engine_run(h2_proxy_ctx *ctx, request_rec *r) {
+    int status = OK;
     h2_proxy_session *session;
     h2_proxy_stream *stream;
     
-    session = h2_proxy_session_setup(r, *pp_conn, conf);
+    /* Step Two: Make the Connection (or check that an already existing
+     * socket is still usable). On success, we have a socket connected to
+     * backend->hostname. */
+    if (ap_proxy_connect_backend(ctx->proxy_func, ctx->p_conn, ctx->worker, 
+                                 ctx->server)) {
+        ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, ctx->owner, APLOGNO()
+                      "H2: failed to make connection to backend: %s",
+                      ctx->p_conn->hostname);
+        return HTTP_SERVICE_UNAVAILABLE;
+    }
+    
+    /* Step Three: Create conn_rec for the socket we have open now. */
+    if (!ctx->p_conn->connection) {
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, ctx->owner, APLOGNO()
+                      "setup new connection: is_ssl=%d %s %s %s", 
+                      ctx->p_conn->is_ssl, ctx->p_conn->ssl_hostname, 
+                      r->hostname, ctx->p_conn->hostname);
+        if ((status = ap_proxy_connection_create(ctx->proxy_func, ctx->p_conn,
+                                                 ctx->owner, 
+                                                 ctx->server)) != OK) {
+            return status;
+        }
+        
+        /*
+         * On SSL connections set a note on the connection what CN is
+         * requested, such that mod_ssl can check if it is requested to do
+         * so.
+         */
+        if (ctx->p_conn->ssl_hostname) {
+            apr_table_setn(ctx->p_conn->connection->notes,
+                           "proxy-request-hostname", ctx->p_conn->ssl_hostname);
+        }
+        
+        if (ctx->is_ssl) {
+            apr_table_setn(ctx->p_conn->connection->notes,
+                           "proxy-request-alpn-protos", "h2");
+        }
+    }
+
+    /* Step Four: Send the Request in a new HTTP/2 stream and
+     * loop until we got the response or encounter errors.
+     */
+    status = APR_ENOTIMPL;
+    session = h2_proxy_session_setup(r, ctx->p_conn, ctx->conf);
     if (!session) {
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, p_conn->connection, 
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->p_conn->connection, 
                       "session unavailable");
         return HTTP_SERVICE_UNAVAILABLE;
     }
     
-    /* TODO
-     * - enter http2 client processing loop:
-     *   - send any input in datasource callback from r->input_filters
-     *   - await response HEADERs
-     *   - send any DATA to r->output_filters
-     * - on stream close, check for missing response
-     * - on certain errors, mark connection for close
-     */ 
-    rv = h2_proxy_session_open_stream(session, url, r, &stream);
-    if (rv == OK) {
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
-                      "process stream(%d): %s %s%s, original: %s", 
-                      stream->id, stream->req->method, 
-                      stream->req->authority, stream->req->path, 
-                      r->the_request);
-        rv = h2_proxy_stream_process(stream);
-    }
-    
-    if (rv != OK) {
-        conn_rec *c = r->connection;
-        ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO()
-                      "pass request body failed to %pI (%s) from %s (%s)",
-                      p_conn->addr, p_conn->hostname ? p_conn->hostname: "",
-                      c->client_ip, c->remote_host ? c->remote_host: "");
+    while (r) {
+        conn_rec *r_conn = r->connection;
+        const char *url;
+        
+        url = apr_table_get(r->notes, H2_PROXY_REQ_URL_NOTE);
+        status = h2_proxy_session_open_stream(session, url, r, &stream);
+        if (status == OK) {
+            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, r_conn, 
+                          "process stream(%d): %s %s%s, original: %s", 
+                          stream->id, stream->req->method, 
+                          stream->req->authority, stream->req->path, 
+                          r->the_request);
+            status = h2_proxy_stream_process(stream);
+        }
+        r = NULL;
+        
+        if (status != OK) {
+            ap_log_cerror(APLOG_MARK, APLOG_ERR, status, r_conn, APLOGNO()
+                          "pass request body failed to %pI (%s) from %s (%s)",
+                          ctx->p_conn->addr, ctx->p_conn->hostname ? 
+                          ctx->p_conn->hostname: "", session->c->client_ip, 
+                          session->c->remote_host ? session->c->remote_host: "");
+        }
+        
+        if (!ctx->standalone && req_engine_done && r_conn != ctx->owner) {
+            req_engine_done(ctx->engine, r_conn);
+        }
+        r_conn = NULL;
+        
+        if (!ctx->standalone && req_engine_pull) {
+            status = req_engine_pull(ctx->engine, ctx->server->timeout, &r);
+            if (status != APR_SUCCESS) {
+                status = APR_SUCCESS;
+                break;
+            }
+        }
     }
-
-    return rv;
+    
+    return status;
 }
 
 static int proxy_http2_handler(request_rec *r, 
@@ -209,18 +301,17 @@ static int proxy_http2_handler(request_r
                                const char *proxyname,
                                apr_port_t proxyport)
 {
-    const char *proxy_function;
-    proxy_conn_rec *backend;
+    const char *proxy_func;
     char *locurl = url, *u;
     apr_size_t slen;
     int is_ssl = 0;
-    int flushall = 0;
-    int status;
-    char server_portstr[32];
+    apr_status_t status;
     conn_rec *c = r->connection;
-    apr_pool_t *p = r->pool;
+    server_rec *s = r->server;
+    apr_pool_t *p = c->pool;
     apr_uri_t *uri = apr_palloc(p, sizeof(*uri));
-    conn_rec *backconn;
+    h2_proxy_ctx *ctx;
+    const char *engine_type, *hostname;
 
     /* find the scheme */
     if ((url[0] != 'h' && url[0] != 'H') || url[1] != '2') {
@@ -233,22 +324,30 @@ static int proxy_http2_handler(request_r
     slen = (u - url);
     switch(slen) {
         case 2:
-            proxy_function = "H2";
+            proxy_func = "H2";
             is_ssl = 1;
             break;
         case 3:
             if (url[2] != 'c' && url[2] != 'C') {
                 return DECLINED;
             }
-            proxy_function = "H2C";
+            proxy_func = "H2C";
             break;
         default:
             return DECLINED;
     }
 
-    if (apr_table_get(r->subprocess_env, "proxy-flushall")) {
-        flushall = 1;
-    }
+    ctx = apr_pcalloc(p, sizeof(*ctx));
+    ctx->owner      = c;
+    ctx->server     = s;
+    ctx->proxy_func = proxy_func;
+    ctx->is_ssl     = is_ssl;
+    ctx->worker     = worker;
+    ctx->conf       = conf;
+    ctx->flushall   = apr_table_get(r->subprocess_env, "proxy-flushall")? 1 : 0;
+    
+    ap_set_module_config(c->conn_config, &proxy_http2_module, ctx);
+    apr_table_setn(r->notes, H2_PROXY_REQ_URL_NOTE, url);
 
     /* scheme says, this is for us. */
     ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, r, "H2: serving URL %s", url);
@@ -256,89 +355,82 @@ static int proxy_http2_handler(request_r
     /* Get a proxy_conn_rec from the worker, might be a new one, might
      * be one still open from another request, or it might fail if the
      * worker is stopped or in error. */
-    if ((status = ap_proxy_acquire_connection(proxy_function, &backend,
-                                              worker, r->server)) != OK) {
+    if ((status = ap_proxy_acquire_connection(ctx->proxy_func, &ctx->p_conn,
+                                              ctx->worker, s)) != OK) {
         goto cleanup;
     }
 
-    backend->is_ssl = is_ssl;
-    if (is_ssl) {
+    ctx->p_conn->is_ssl = ctx->is_ssl;
+    if (ctx->is_ssl) {
         /* If there is still some data on an existing ssl connection, now
          * would be a good timne to get rid of it. */
-        ap_proxy_ssl_connection_cleanup(backend, r);
+        ap_proxy_ssl_connection_cleanup(ctx->p_conn, r);
     }
 
     /* Step One: Determine the URL to connect to (might be a proxy),
      * initialize the backend accordingly and determine the server 
      * port string we can expect in responses. */
-    if ((status = ap_proxy_determine_connection(p, r, conf, worker, backend,
+    if ((status = ap_proxy_determine_connection(p, r, conf, worker, ctx->p_conn,
                                                 uri, &locurl, proxyname,
-                                                proxyport, server_portstr,
-                                                sizeof(server_portstr))) != OK) {
+                                                proxyport, ctx->server_portstr,
+                                                sizeof(ctx->server_portstr))) != OK) {
         goto cleanup;
     }
     
-    /* Step Two: Make the Connection (or check that an already existing
-     * socket is still usable). On success, we have a socket connected to
-     * backend->hostname. */
-    if (ap_proxy_connect_backend(proxy_function, backend, worker, r->server)) {
-        ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO()
-                      "H2: failed to make connection to backend: %s",
-                      backend->hostname);
-        status = HTTP_SERVICE_UNAVAILABLE;
-        goto cleanup;
-    }
+    hostname = (ctx->p_conn->ssl_hostname? 
+                ctx->p_conn->ssl_hostname : ctx->p_conn->hostname);
+    engine_type = apr_psprintf(p, "proxy_http2 %s%s", hostname, ctx->server_portstr);
     
-    /* Step Three: Create conn_rec for the socket we have open now. */
-    backconn = backend->connection;
-    if (!backconn) {
-        ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r, APLOGNO()
-                      "setup new connection: is_ssl=%d %s %s %s", 
-                      backend->is_ssl, 
-                      backend->ssl_hostname, r->hostname, backend->hostname);
-        if ((status = ap_proxy_connection_create(proxy_function, backend,
-                                                 c, r->server)) != OK) {
-            goto cleanup;
-        }
-        backconn = backend->connection;
-        
-        /*
-         * On SSL connections set a note on the connection what CN is
-         * requested, such that mod_ssl can check if it is requested to do
-         * so.
+    if (c->master && req_engine_push && is_h2 && is_h2(ctx->owner)) {
+        /* If we have req_engine capabilities, push the handling of this
+         * request (e.g. slave connection) to a proxy_http2 engine which uses
+         * the same backend. We may be called to create an engine ourselves.
          */
-        if (backend->ssl_hostname) {
-            apr_table_setn(backend->connection->notes,
-                           "proxy-request-hostname", backend->ssl_hostname);
-        }
-        
-        if (backend->is_ssl) {
-            apr_table_setn(backend->connection->notes,
-                           "proxy-request-alpn-protos", "h2");
+        status = req_engine_push(engine_type, r, proxy_engine_init);
+        ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r, 
+                      "H2: pushing request %s to engine type %s", 
+                      url, engine_type);
+        if (status == APR_SUCCESS && ctx->engine == NULL) {
+            /* Another engine instance has taken over processing of this
+             * request. */
+            goto cleanup;
         }
     }
-
-    /* Step Four: Send the Request in a new HTTP/2 stream and
-     * loop until we got the response or encounter errors.
-     */
-    if ((status = proxy_http2_process_stream(p, url, r, &backend, worker,
-                                             conf, server_portstr, 
-                                             flushall)) != OK) {
-        ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r, APLOGNO()
-                      "H2: failed to process request: %s",
-                      r->the_request);
+    
+    if (!ctx->engine) {
+        /* No engine was available or has been initialized, handle this
+         * request just by ourselves. */
+        h2_req_engine *engine = apr_pcalloc(p, sizeof(*engine));
+        engine->id = 0;
+        engine->type = engine_type;
+        engine->pool = p;
+        engine->c = c;
+        ctx->engine = engine;
+        ctx->standalone = 1;
     }
+    
+    status = proxy_engine_run(ctx, r);    
 
-    /* clean up before return */
 cleanup:
-    if (backend) {
-        if (status != OK) {
-            backend->close = 1;
+    if (ctx->engine && !ctx->standalone && req_engine_exit) {
+        req_engine_exit(ctx->engine);
+    }
+    ctx->engine = NULL;
+    
+    if (ctx) {
+        if (ctx->p_conn) {
+            if (status != OK) {
+                ctx->p_conn->close = 1;
+            }
+            proxy_run_detach_backend(r, ctx->p_conn);
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, "cleanup, releasing connection");
+            ap_proxy_release_connection(ctx->proxy_func, ctx->p_conn, ctx->server);
         }
-        proxy_run_detach_backend(r, backend);
-        proxy_http2_cleanup(proxy_function, r, backend);
+        ctx->worker = NULL;
+        ctx->conf = NULL;
+        ctx->p_conn = NULL;
     }
-    ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r, "leaving handler");
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c, "leaving handler");
     return status;
 }