You are viewing a plain text version of this content. The canonical link for it is here.
Posted to cvs@httpd.apache.org by ic...@apache.org on 2016/03/02 12:21:29 UTC

svn commit: r1733259 [2/4] - in /httpd/httpd/branches/2.4.x: ./ modules/http2/

Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_io.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_io.c?rev=1733259&r1=1733258&r2=1733259&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_io.c (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_io.c Wed Mar  2 11:21:28 2016
@@ -33,23 +33,45 @@
 #include "h2_task.h"
 #include "h2_util.h"
 
-h2_io *h2_io_create(int id, apr_pool_t *pool)
+h2_io *h2_io_create(int id, apr_pool_t *pool, const h2_request *request)
 {
     h2_io *io = apr_pcalloc(pool, sizeof(*io));
     if (io) {
         io->id = id;
         io->pool = pool;
         io->bucket_alloc = apr_bucket_alloc_create(pool);
+        io->request = h2_request_clone(pool, request);
     }
     return io;
 }
 
-void h2_io_destroy(h2_io *io)
+void h2_io_redo(h2_io *io)
 {
-    if (io->pool) {
-        apr_pool_destroy(io->pool);
-        /* gone */
+    io->worker_started = 0;
+    io->response = NULL;
+    io->rst_error = 0;
+    if (io->bbin) {
+        apr_brigade_cleanup(io->bbin);
+    }
+    if (io->bbout) {
+        apr_brigade_cleanup(io->bbout);
+    }
+    if (io->tmp) {
+        apr_brigade_cleanup(io->tmp);
+    }
+    io->started_at = io->done_at = 0;
+}
+
+int h2_io_is_repeatable(h2_io *io) {
+    if (io->submitted
+        || io->input_consumed > 0 
+        || !io->request) {
+        /* cannot repeat that. */
+        return 0;
     }
+    return (!strcmp("GET", io->request->method)
+            || !strcmp("HEAD", io->request->method)
+            || !strcmp("OPTIONS", io->request->method));
 }
 
 void h2_io_set_response(h2_io *io, h2_response *response) 
@@ -75,6 +97,11 @@ int h2_io_in_has_eos_for(h2_io *io)
     return io->eos_in || (io->bbin && h2_util_has_eos(io->bbin, -1));
 }
 
+int h2_io_in_has_data(h2_io *io)
+{
+    return io->bbin && h2_util_bb_has_data_or_eos(io->bbin);
+}
+
 int h2_io_out_has_data(h2_io *io)
 {
     return io->bbout && h2_util_bb_has_data_or_eos(io->bbout);
@@ -102,12 +129,13 @@ apr_status_t h2_io_in_shutdown(h2_io *io
 }
 
 
-void h2_io_signal_init(h2_io *io, h2_io_op op, int timeout_secs, apr_thread_cond_t *cond)
+void h2_io_signal_init(h2_io *io, h2_io_op op, apr_interval_time_t timeout, 
+                       apr_thread_cond_t *cond)
 {
     io->timed_op = op;
     io->timed_cond = cond;
-    if (timeout_secs > 0) {
-        io->timeout_at = apr_time_now() + apr_time_from_sec(timeout_secs);
+    if (timeout > 0) {
+        io->timeout_at = apr_time_now() + timeout;
     }
     else {
         io->timeout_at = 0; 
@@ -255,6 +283,18 @@ apr_status_t h2_io_in_read(h2_io *io, ap
         }
     }
     
+    if (status == APR_SUCCESS && (!io->bbin || APR_BRIGADE_EMPTY(io->bbin))) {
+        if (io->eos_in) {
+            if (!io->eos_in_written) {
+                status = append_eos(io, bb, trailers);
+                io->eos_in_written = 1;
+            }
+        }
+    }
+    
+    if (status == APR_SUCCESS && APR_BRIGADE_EMPTY(bb)) {
+        return APR_EAGAIN;
+    }
     return status;
 }
 
@@ -298,7 +338,7 @@ apr_status_t h2_io_out_readx(h2_io *io,
         return APR_ECONNABORTED;
     }
     
-    if (io->eos_out) {
+    if (io->eos_out_read) {
         *plen = 0;
         *peos = 1;
         return APR_SUCCESS;
@@ -316,7 +356,7 @@ apr_status_t h2_io_out_readx(h2_io *io,
     else {
         status = h2_util_bb_readx(io->bbout, cb, ctx, plen, peos);
         if (status == APR_SUCCESS) {
-            io->eos_out = *peos;
+            io->eos_out_read = *peos;
         }
     }
     
@@ -330,7 +370,7 @@ apr_status_t h2_io_out_read_to(h2_io *io
         return APR_ECONNABORTED;
     }
     
-    if (io->eos_out) {
+    if (io->eos_out_read) {
         *plen = 0;
         *peos = 1;
         return APR_SUCCESS;
@@ -341,7 +381,7 @@ apr_status_t h2_io_out_read_to(h2_io *io
         return APR_EAGAIN;
     }
 
-    io->eos_out = *peos = h2_util_has_eos(io->bbout, *plen);
+    io->eos_out_read = *peos = h2_util_has_eos(io->bbout, *plen);
     return h2_util_move(bb, io->bbout, *plen, NULL, "h2_io_read_to");
 }
 
@@ -413,14 +453,17 @@ apr_status_t h2_io_out_close(h2_io *io,
     if (io->rst_error) {
         return APR_ECONNABORTED;
     }
-    if (!io->eos_out) { /* EOS has not been read yet */
+    if (!io->eos_out_read) { /* EOS has not been read yet */
         process_trailers(io, trailers);
         if (!io->bbout) {
             io->bbout = apr_brigade_create(io->pool, io->bucket_alloc);
         }
-        if (!h2_util_has_eos(io->bbout, -1)) {
-            APR_BRIGADE_INSERT_TAIL(io->bbout, 
-                                    apr_bucket_eos_create(io->bucket_alloc));
+        if (!io->eos_out) {
+            io->eos_out = 1;
+            if (!h2_util_has_eos(io->bbout, -1)) {
+                APR_BRIGADE_INSERT_TAIL(io->bbout, 
+                                        apr_bucket_eos_create(io->bucket_alloc));
+            }
         }
     }
     return APR_SUCCESS;

Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_io.h
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_io.h?rev=1733259&r1=1733258&r2=1733259&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_io.h (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_io.h Wed Mar  2 11:21:28 2016
@@ -20,6 +20,7 @@ struct h2_response;
 struct apr_thread_cond_t;
 struct h2_mplx;
 struct h2_request;
+struct h2_task;
 
 
 typedef apr_status_t h2_io_data_cb(void *ctx, const char *data, apr_off_t len);
@@ -30,8 +31,7 @@ typedef enum {
     H2_IO_READ,
     H2_IO_WRITE,
     H2_IO_ANY,
-}
-h2_io_op;
+} h2_io_op;
 
 typedef struct h2_io h2_io;
 
@@ -51,15 +51,19 @@ struct h2_io {
     unsigned int orphaned       : 1; /* h2_stream is gone for this io */    
     unsigned int worker_started : 1; /* h2_worker started processing for this io */
     unsigned int worker_done    : 1; /* h2_worker finished for this io */
+    unsigned int submitted      : 1; /* response has been submitted to client */
     unsigned int request_body   : 1; /* iff request has body */
     unsigned int eos_in         : 1; /* input eos has been seen */
     unsigned int eos_in_written : 1; /* input eos has been forwarded */
-    unsigned int eos_out        : 1; /* output eos has been seen */
+    unsigned int eos_out        : 1; /* output eos is present */
+    unsigned int eos_out_read   : 1; /* output eos has been forwarded */
     
     h2_io_op timed_op;               /* which operation is waited on, if any */
     struct apr_thread_cond_t *timed_cond; /* condition to wait on, maybe NULL */
     apr_time_t timeout_at;           /* when IO wait will time out */
     
+    apr_time_t started_at;           /* when processing started */
+    apr_time_t done_at;              /* when processing was done */
     apr_size_t input_consumed;       /* how many bytes have been read */
         
     int files_handles_owned;
@@ -72,12 +76,7 @@ struct h2_io {
 /**
  * Creates a new h2_io for the given stream id. 
  */
-h2_io *h2_io_create(int id, apr_pool_t *pool);
-
-/**
- * Frees any resources hold by the h2_io instance. 
- */
-void h2_io_destroy(h2_io *io);
+h2_io *h2_io_create(int id, apr_pool_t *pool, const struct h2_request *request);
 
 /**
  * Set the response of this stream.
@@ -89,6 +88,9 @@ void h2_io_set_response(h2_io *io, struc
  */
 void h2_io_rst(h2_io *io, int error);
 
+int h2_io_is_repeatable(h2_io *io);
+void h2_io_redo(h2_io *io);
+
 /**
  * The input data is completely queued. Blocked reads will return immediately
  * and give either data or EOF.
@@ -98,9 +100,13 @@ int h2_io_in_has_eos_for(h2_io *io);
  * Output data is available.
  */
 int h2_io_out_has_data(h2_io *io);
+/**
+ * Input data is available.
+ */
+int h2_io_in_has_data(h2_io *io);
 
 void h2_io_signal(h2_io *io, h2_io_op op);
-void h2_io_signal_init(h2_io *io, h2_io_op op, int timeout_secs, 
+void h2_io_signal_init(h2_io *io, h2_io_op op, apr_interval_time_t timeout, 
                        struct apr_thread_cond_t *cond);
 void h2_io_signal_exit(h2_io *io);
 apr_status_t h2_io_signal_wait(struct h2_mplx *m, h2_io *io);

Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_io_set.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_io_set.c?rev=1733259&r1=1733258&r2=1733259&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_io_set.c (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_io_set.c Wed Mar  2 11:21:28 2016
@@ -45,16 +45,6 @@ h2_io_set *h2_io_set_create(apr_pool_t *
     return sp;
 }
 
-void h2_io_set_destroy(h2_io_set *sp)
-{
-    int i;
-    for (i = 0; i < sp->list->nelts; ++i) {
-        h2_io *io = h2_io_IDX(sp->list, i);
-        h2_io_destroy(io);
-    }
-    sp->list->nelts = 0;
-}
-
 static int h2_stream_id_cmp(const void *s1, const void *s2)
 {
     h2_io **pio1 = (h2_io **)s1;
@@ -91,7 +81,7 @@ apr_status_t h2_io_set_add(h2_io_set *sp
         int last;
         APR_ARRAY_PUSH(sp->list, h2_io*) = io;
         /* Normally, streams get added in ascending order if id. We
-         * keep the array sorted, so we just need to check of the newly
+         * keep the array sorted, so we just need to check if the newly
          * appended stream has a lower id than the last one. if not,
          * sorting is not necessary.
          */
@@ -111,9 +101,7 @@ static void remove_idx(h2_io_set *sp, in
     --sp->list->nelts;
     n = sp->list->nelts - idx;
     if (n > 0) {
-        /* Close the hole in the array by moving the upper
-         * parts down one step.
-         */
+        /* There are n h2_io* behind idx. Move the rest down */
         h2_io **selts = (h2_io**)sp->list->elts;
         memmove(selts + idx, selts + idx + 1, n * sizeof(h2_io*));
     }
@@ -124,7 +112,7 @@ h2_io *h2_io_set_remove(h2_io_set *sp, h
     int i;
     for (i = 0; i < sp->list->nelts; ++i) {
         h2_io *e = h2_io_IDX(sp->list, i);
-        if (e == io) {
+        if (e->id == io->id) {
             remove_idx(sp, i);
             return e;
         }
@@ -132,7 +120,7 @@ h2_io *h2_io_set_remove(h2_io_set *sp, h
     return NULL;
 }
 
-h2_io *h2_io_set_pop_highest_prio(h2_io_set *set)
+h2_io *h2_io_set_shift(h2_io_set *set)
 {
     /* For now, this just removes the first element in the set.
      * the name is misleading...

Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_io_set.h
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_io_set.h?rev=1733259&r1=1733258&r2=1733259&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_io_set.h (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_io_set.h Wed Mar  2 11:21:28 2016
@@ -26,8 +26,6 @@ typedef struct h2_io_set h2_io_set;
 
 h2_io_set *h2_io_set_create(apr_pool_t *pool);
 
-void h2_io_set_destroy(h2_io_set *set);
-
 apr_status_t h2_io_set_add(h2_io_set *set, struct h2_io *io);
 h2_io *h2_io_set_get(h2_io_set *set, int stream_id);
 h2_io *h2_io_set_remove(h2_io_set *set, struct h2_io *io);
@@ -48,9 +46,8 @@ typedef int h2_io_set_iter_fn(void *ctx,
  * @param ctx user data for the callback
  * @return 1 iff iteration completed for all members
  */
-int h2_io_set_iter(h2_io_set *set,
-                   h2_io_set_iter_fn *iter, void *ctx);
+int h2_io_set_iter(h2_io_set *set, h2_io_set_iter_fn *iter, void *ctx);
 
-h2_io *h2_io_set_pop_highest_prio(h2_io_set *set);
+h2_io *h2_io_set_shift(h2_io_set *set);
 
 #endif /* defined(__mod_h2__h2_io_set__) */

Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.c?rev=1733259&r1=1733258&r2=1733259&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.c (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.c Wed Mar  2 11:21:28 2016
@@ -17,7 +17,6 @@
 #include <stddef.h>
 #include <stdlib.h>
 
-#include <apr_atomic.h>
 #include <apr_thread_mutex.h>
 #include <apr_thread_cond.h>
 #include <apr_strings.h>
@@ -27,21 +26,23 @@
 #include <http_core.h>
 #include <http_log.h>
 
+#include "mod_http2.h"
+
 #include "h2_private.h"
 #include "h2_config.h"
 #include "h2_conn.h"
+#include "h2_ctx.h"
 #include "h2_h2.h"
+#include "h2_int_queue.h"
 #include "h2_io.h"
 #include "h2_io_set.h"
 #include "h2_response.h"
 #include "h2_mplx.h"
 #include "h2_request.h"
 #include "h2_stream.h"
-#include "h2_stream_set.h"
 #include "h2_task.h"
 #include "h2_task_input.h"
 #include "h2_task_output.h"
-#include "h2_task_queue.h"
 #include "h2_worker.h"
 #include "h2_workers.h"
 #include "h2_util.h"
@@ -60,6 +61,48 @@
     } while(0)
 
 
+/* NULL or the mutex hold by this thread, used for recursive calls
+ */
+static apr_threadkey_t *thread_lock;
+
+apr_status_t h2_mplx_child_init(apr_pool_t *pool, server_rec *s)
+{
+    return apr_threadkey_private_create(&thread_lock, NULL, pool);
+}
+
+static apr_status_t enter_mutex(h2_mplx *m, int *pacquired)
+{
+    apr_status_t status;
+    void *mutex = NULL;
+    
+    /* Enter the mutex if this thread already holds the lock or
+     * if we can acquire it. Only on the later case do we unlock
+     * onleaving the mutex.
+     * This allow recursive entering of the mutex from the saem thread,
+     * which is what we need in certain situations involving callbacks
+     */
+    apr_threadkey_private_get(&mutex, thread_lock);
+    if (mutex == m->lock) {
+        *pacquired = 0;
+        return APR_SUCCESS;
+    }
+        
+    status = apr_thread_mutex_lock(m->lock);
+    *pacquired = (status == APR_SUCCESS);
+    if (*pacquired) {
+        apr_threadkey_private_set(m->lock, thread_lock);
+    }
+    return status;
+}
+
+static void leave_mutex(h2_mplx *m, int acquired)
+{
+    if (acquired) {
+        apr_threadkey_private_set(NULL, thread_lock);
+        apr_thread_mutex_unlock(m->lock);
+    }
+}
+
 static int is_aborted(h2_mplx *m, apr_status_t *pstatus)
 {
     AP_DEBUG_ASSERT(m);
@@ -101,14 +144,6 @@ static void h2_mplx_destroy(h2_mplx *m)
                   "h2_mplx(%ld): destroy, ios=%d", 
                   m->id, (int)h2_io_set_size(m->stream_ios));
     m->aborted = 1;
-    if (m->ready_ios) {
-        h2_io_set_destroy(m->ready_ios);
-        m->ready_ios = NULL;
-    }
-    if (m->stream_ios) {
-        h2_io_set_destroy(m->stream_ios);
-        m->stream_ios = NULL;
-    }
     
     check_tx_free(m);
     
@@ -129,7 +164,8 @@ static void h2_mplx_destroy(h2_mplx *m)
  *   than protecting a shared h2_session one with an own lock.
  */
 h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, 
-                        const h2_config *conf,
+                        const h2_config *conf, 
+                        apr_interval_time_t stream_timeout,
                         h2_workers *workers)
 {
     apr_status_t status = APR_SUCCESS;
@@ -151,6 +187,7 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr
         if (!m->pool) {
             return NULL;
         }
+        apr_pool_tag(m->pool, "h2_mplx");
         apr_allocator_owner_set(allocator, m->pool);
         
         status = apr_thread_mutex_create(&m->lock, APR_THREAD_MUTEX_DEFAULT,
@@ -160,16 +197,26 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr
             return NULL;
         }
         
-        m->q = h2_tq_create(m->pool, h2_config_geti(conf, H2_CONF_MAX_STREAMS));
+        status = apr_thread_cond_create(&m->task_done, m->pool);
+        if (status != APR_SUCCESS) {
+            h2_mplx_destroy(m);
+            return NULL;
+        }
+    
+        m->q = h2_iq_create(m->pool, h2_config_geti(conf, H2_CONF_MAX_STREAMS));
         m->stream_ios = h2_io_set_create(m->pool);
         m->ready_ios = h2_io_set_create(m->pool);
         m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
+        m->stream_timeout = stream_timeout;
         m->workers = workers;
+        m->workers_max = h2_config_geti(conf, H2_CONF_MAX_WORKERS);
+        m->workers_def_limit = 4;
+        m->workers_limit = m->workers_def_limit;
+        m->last_limit_change = m->last_idle_block = apr_time_now();
+        m->limit_change_interval = apr_time_from_msec(200);
         
         m->tx_handles_reserved = 0;
         m->tx_chunk_size = 4;
-        
-        m->stream_timeout_secs = h2_config_geti(conf, H2_CONF_STREAM_TIMEOUT_SECS);
     }
     return m;
 }
@@ -177,10 +224,11 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr
 int h2_mplx_get_max_stream_started(h2_mplx *m)
 {
     int stream_id = 0;
+    int acquired;
     
-    apr_thread_mutex_lock(m->lock);
+    enter_mutex(m, &acquired);
     stream_id = m->max_stream_started;
-    apr_thread_mutex_unlock(m->lock);
+    leave_mutex(m, acquired);
     
     return stream_id;
 }
@@ -198,6 +246,7 @@ static void workers_register(h2_mplx *m)
      * Therefore: ref counting for h2_workers in not needed, ref counting
      * for h2_worker using this is critical.
      */
+    m->need_registration = 0;
     h2_workers_register(m->workers, m);
 }
 
@@ -231,7 +280,9 @@ static void io_destroy(h2_mplx *m, h2_io
 
     h2_io_set_remove(m->stream_ios, io);
     h2_io_set_remove(m->ready_ios, io);
-    h2_io_destroy(io);
+    if (m->redo_ios) {
+        h2_io_set_remove(m->redo_ios, io);
+    }
     
     if (pool) {
         apr_pool_clear(pool);
@@ -250,7 +301,7 @@ static int io_stream_done(h2_mplx *m, h2
     h2_io_set_remove(m->ready_ios, io);
     if (!io->worker_started || io->worker_done) {
         /* already finished or not even started yet */
-        h2_tq_remove(m->q, io->id);
+        h2_iq_remove(m->q, io->id);
         io_destroy(m, io, 1);
         return 0;
     }
@@ -266,27 +317,64 @@ static int stream_done_iter(void *ctx, h
     return io_stream_done((h2_mplx*)ctx, io, 0);
 }
 
+static int stream_print(void *ctx, h2_io *io)
+{
+    h2_mplx *m = ctx;
+    if (io && io->request) {
+        ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
+                      "->03198: h2_stream(%ld-%d): %s %s %s -> %s %d"
+                      "[orph=%d/started=%d/done=%d/eos_in=%d/eos_out=%d]", 
+                      m->id, io->id, 
+                      io->request->method, io->request->authority, io->request->path,
+                      io->response? "http" : (io->rst_error? "reset" : "?"),
+                      io->response? io->response->http_status : io->rst_error,
+                      io->orphaned, io->worker_started, io->worker_done,
+                      io->eos_in, io->eos_out);
+    }
+    else if (io) {
+        ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
+                      "->03198: h2_stream(%ld-%d): NULL -> %s %d"
+                      "[orph=%d/started=%d/done=%d/eos_in=%d/eos_out=%d]", 
+                      m->id, io->id, 
+                      io->response? "http" : (io->rst_error? "reset" : "?"),
+                      io->response? io->response->http_status : io->rst_error,
+                      io->orphaned, io->worker_started, io->worker_done,
+                      io->eos_in, io->eos_out);
+    }
+    else {
+        ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
+                      "->03198: h2_stream(%ld-NULL): NULL", m->id);
+    }
+    return 1;
+}
+
 apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
 {
     apr_status_t status;
-    
+    int acquired;
+
     h2_workers_unregister(m->workers, m);
-    status = apr_thread_mutex_lock(m->lock);
-    if (APR_SUCCESS == status) {
+    
+    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
         int i, wait_secs = 5;
         
         /* disable WINDOW_UPDATE callbacks */
         h2_mplx_set_consumed_cb(m, NULL, NULL);
         
+        h2_iq_clear(m->q);
+        apr_thread_cond_broadcast(m->task_done);
         while (!h2_io_set_iter(m->stream_ios, stream_done_iter, m)) {
             /* iterate until all ios have been orphaned or destroyed */
         }
     
-        /* Any remaining ios have handed out requests to workers that are
-         * not done yet. Any operation they do on their assigned stream ios will
-         * be errored ECONNRESET/ABORTED, so that should find out pretty soon.
+        /* If we still have busy workers, we cannot release our memory
+         * pool yet, as slave connections have child pools of their respective
+         * h2_io's.
+         * Any remaining ios are processed in these workers. Any operation 
+         * they do on their input/outputs will be errored ECONNRESET/ABORTED, 
+         * so processing them should fail and workers *should* return.
          */
-        for (i = 0; h2_io_set_size(m->stream_ios) > 0; ++i) {
+        for (i = 0; m->workers_busy > 0; ++i) {
             m->join_wait = wait;
             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                           "h2_mplx(%ld): release_join, waiting on %d worker to report back", 
@@ -302,14 +390,20 @@ apr_status_t h2_mplx_release_and_join(h2
                      */
                     ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03198)
                                   "h2_mplx(%ld): release, waiting for %d seconds now for "
-                                  "all h2_workers to return, have still %d requests outstanding", 
-                                  m->id, i*wait_secs, (int)h2_io_set_size(m->stream_ios));
+                                  "%d h2_workers to return, have still %d requests outstanding", 
+                                  m->id, i*wait_secs, m->workers_busy,
+                                  (int)h2_io_set_size(m->stream_ios));
+                    if (i == 1) {
+                        h2_io_set_iter(m->stream_ios, stream_print, m);
+                    }
                 }
+                m->aborted = 1;
+                apr_thread_cond_broadcast(m->task_done);
             }
         }
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
                       "h2_mplx(%ld): release_join -> destroy", m->id);
-        apr_thread_mutex_unlock(m->lock);
+        leave_mutex(m, acquired);
         h2_mplx_destroy(m);
         /* all gone */
     }
@@ -319,88 +413,43 @@ apr_status_t h2_mplx_release_and_join(h2
 void h2_mplx_abort(h2_mplx *m)
 {
     apr_status_t status;
+    int acquired;
     
     AP_DEBUG_ASSERT(m);
     if (!m->aborted) {
-        status = apr_thread_mutex_lock(m->lock);
-        if (APR_SUCCESS == status) {
+        if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
             m->aborted = 1;
-            apr_thread_mutex_unlock(m->lock);
+            leave_mutex(m, acquired);
         }
     }
 }
 
 apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error)
 {
-    apr_status_t status;
+    apr_status_t status = APR_SUCCESS;
+    int acquired;
     
+    /* This maybe called from inside callbacks that already hold the lock.
+     * E.g. when we are streaming out DATA and the EOF triggers the stream
+     * release.
+     */
     AP_DEBUG_ASSERT(m);
-    status = apr_thread_mutex_lock(m->lock);
-    if (APR_SUCCESS == status) {
+    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
 
         /* there should be an h2_io, once the stream has been scheduled
          * for processing, e.g. when we received all HEADERs. But when
          * a stream is cancelled very early, it will not exist. */
         if (io) {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, 
+                          "h2_mplx(%ld-%d): marking stream as done.", 
+                          m->id, stream_id);
             io_stream_done(m, io, rst_error);
         }
-        
-        apr_thread_mutex_unlock(m->lock);
-    }
-    return status;
-}
 
-static const h2_request *pop_request(h2_mplx *m)
-{
-    const h2_request *req = NULL;
-    int sid;
-    while (!m->aborted && !req && (sid = h2_tq_shift(m->q)) > 0) {
-        h2_io *io = h2_io_set_get(m->stream_ios, sid);
-        if (io) {
-            req = io->request;
-            io->worker_started = 1;
-            if (sid > m->max_stream_started) {
-                m->max_stream_started = sid;
-            }
-        }
-    }
-    return req;
-}
-
-void h2_mplx_request_done(h2_mplx **pm, int stream_id, const h2_request **preq)
-{
-    h2_mplx *m = *pm;
-    
-    apr_status_t status = apr_thread_mutex_lock(m->lock);
-    if (APR_SUCCESS == status) {
-        h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
-                      "h2_mplx(%ld): request(%d) done", m->id, stream_id);
-        if (io) {
-            io->worker_done = 1;
-            if (io->orphaned) {
-                io_destroy(m, io, 0);
-                if (m->join_wait) {
-                    apr_thread_cond_signal(m->join_wait);
-                }
-            }
-            else {
-                /* hang around until the stream deregisteres */
-            }
-        }
-        
-        if (preq) {
-            /* someone wants another request, if we have */
-            *preq = pop_request(m);
-        }
-        if (!preq || !*preq) {
-            /* No request to hand back to the worker, NULLify reference
-             * and decrement count */
-            *pm = NULL;
-        }
-        apr_thread_mutex_unlock(m->lock);
+        leave_mutex(m, acquired);
     }
+    return status;
 }
 
 apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block,
@@ -409,14 +458,15 @@ apr_status_t h2_mplx_in_read(h2_mplx *m,
                              struct apr_thread_cond_t *iowait)
 {
     apr_status_t status; 
+    int acquired;
+    
     AP_DEBUG_ASSERT(m);
-    status = apr_thread_mutex_lock(m->lock);
-    if (APR_SUCCESS == status) {
+    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
         if (io && !io->orphaned) {
             H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_read_pre");
             
-            h2_io_signal_init(io, H2_IO_READ, m->stream_timeout_secs, iowait);
+            h2_io_signal_init(io, H2_IO_READ, m->stream_timeout, iowait);
             status = h2_io_in_read(io, bb, -1, trailers);
             while (APR_STATUS_IS_EAGAIN(status) 
                    && !is_aborted(m, &status)
@@ -435,7 +485,7 @@ apr_status_t h2_mplx_in_read(h2_mplx *m,
         else {
             status = APR_EOF;
         }
-        apr_thread_mutex_unlock(m->lock);
+        leave_mutex(m, acquired);
     }
     return status;
 }
@@ -444,9 +494,10 @@ apr_status_t h2_mplx_in_write(h2_mplx *m
                               apr_bucket_brigade *bb)
 {
     apr_status_t status;
+    int acquired;
+    
     AP_DEBUG_ASSERT(m);
-    status = apr_thread_mutex_lock(m->lock);
-    if (APR_SUCCESS == status) {
+    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
         if (io && !io->orphaned) {
             H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_write_pre");
@@ -458,7 +509,7 @@ apr_status_t h2_mplx_in_write(h2_mplx *m
         else {
             status = APR_ECONNABORTED;
         }
-        apr_thread_mutex_unlock(m->lock);
+        leave_mutex(m, acquired);
     }
     return status;
 }
@@ -466,9 +517,10 @@ apr_status_t h2_mplx_in_write(h2_mplx *m
 apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id)
 {
     apr_status_t status;
+    int acquired;
+    
     AP_DEBUG_ASSERT(m);
-    status = apr_thread_mutex_lock(m->lock);
-    if (APR_SUCCESS == status) {
+    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
         if (io && !io->orphaned) {
             status = h2_io_in_close(io);
@@ -479,7 +531,7 @@ apr_status_t h2_mplx_in_close(h2_mplx *m
         else {
             status = APR_ECONNABORTED;
         }
-        apr_thread_mutex_unlock(m->lock);
+        leave_mutex(m, acquired);
     }
     return status;
 }
@@ -507,12 +559,13 @@ void h2_mplx_set_consumed_cb(h2_mplx *m,
 apr_status_t h2_mplx_in_update_windows(h2_mplx *m)
 {
     apr_status_t status;
+    int acquired;
+    
     AP_DEBUG_ASSERT(m);
     if (m->aborted) {
         return APR_ECONNABORTED;
     }
-    status = apr_thread_mutex_lock(m->lock);
-    if (APR_SUCCESS == status) {
+    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
         update_ctx ctx;
         
         ctx.m               = m;
@@ -524,7 +577,7 @@ apr_status_t h2_mplx_in_update_windows(h
         if (ctx.streams_updated) {
             status = APR_SUCCESS;
         }
-        apr_thread_mutex_unlock(m->lock);
+        leave_mutex(m, acquired);
     }
     return status;
 }
@@ -535,9 +588,10 @@ apr_status_t h2_mplx_out_readx(h2_mplx *
                                apr_table_t **ptrailers)
 {
     apr_status_t status;
+    int acquired;
+    
     AP_DEBUG_ASSERT(m);
-    status = apr_thread_mutex_lock(m->lock);
-    if (APR_SUCCESS == status) {
+    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
         if (io && !io->orphaned) {
             H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_readx_pre");
@@ -553,7 +607,7 @@ apr_status_t h2_mplx_out_readx(h2_mplx *
         }
         
         *ptrailers = (*peos && io->response)? io->response->trailers : NULL;
-        apr_thread_mutex_unlock(m->lock);
+        leave_mutex(m, acquired);
     }
     return status;
 }
@@ -564,9 +618,10 @@ apr_status_t h2_mplx_out_read_to(h2_mplx
                                  apr_table_t **ptrailers)
 {
     apr_status_t status;
+    int acquired;
+
     AP_DEBUG_ASSERT(m);
-    status = apr_thread_mutex_lock(m->lock);
-    if (APR_SUCCESS == status) {
+    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
         if (io && !io->orphaned) {
             H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_read_to_pre");
@@ -582,23 +637,24 @@ apr_status_t h2_mplx_out_read_to(h2_mplx
             status = APR_ECONNABORTED;
         }
         *ptrailers = (*peos && io->response)? io->response->trailers : NULL;
-        apr_thread_mutex_unlock(m->lock);
+        leave_mutex(m, acquired);
     }
     return status;
 }
 
-h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_stream_set *streams)
+h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams)
 {
     apr_status_t status;
     h2_stream *stream = NULL;
+    int acquired;
 
     AP_DEBUG_ASSERT(m);
-    status = apr_thread_mutex_lock(m->lock);
-    if (APR_SUCCESS == status) {
-        h2_io *io = h2_io_set_pop_highest_prio(m->ready_ios);
+    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+        h2_io *io = h2_io_set_shift(m->ready_ios);
         if (io && !m->aborted) {
-            stream = h2_stream_set_get(streams, io->id);
+            stream = h2_ihash_get(streams, io->id);
             if (stream) {
+                io->submitted = 1;
                 if (io->rst_error) {
                     h2_stream_rst(stream, io->rst_error);
                 }
@@ -614,7 +670,7 @@ h2_stream *h2_mplx_next_submit(h2_mplx *
                  * reset by the client. Should no longer happen since such
                  * streams should clear io's from the ready queue.
                  */
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,  
+                ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03347)
                               "h2_mplx(%ld): stream for response %d closed, "
                               "resetting io to close request processing",
                               m->id, io->id);
@@ -633,7 +689,7 @@ h2_stream *h2_mplx_next_submit(h2_mplx *
             
             h2_io_signal(io, H2_IO_WRITE);
         }
-        apr_thread_mutex_unlock(m->lock);
+        leave_mutex(m, acquired);
     }
     return stream;
 }
@@ -657,7 +713,7 @@ static apr_status_t out_write(h2_mplx *m
                                  &m->tx_handles_reserved);
         /* Wait for data to drain until there is room again or
          * stream timeout expires */
-        h2_io_signal_init(io, H2_IO_WRITE, m->stream_timeout_secs, iowait);
+        h2_io_signal_init(io, H2_IO_WRITE, m->stream_timeout, iowait);
         while (status == APR_SUCCESS
                && !APR_BRIGADE_EMPTY(bb) 
                && iowait
@@ -716,9 +772,10 @@ apr_status_t h2_mplx_out_open(h2_mplx *m
                               struct apr_thread_cond_t *iowait)
 {
     apr_status_t status;
+    int acquired;
+    
     AP_DEBUG_ASSERT(m);
-    status = apr_thread_mutex_lock(m->lock);
-    if (APR_SUCCESS == status) {
+    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
         if (m->aborted) {
             status = APR_ECONNABORTED;
         }
@@ -728,7 +785,7 @@ apr_status_t h2_mplx_out_open(h2_mplx *m
                 h2_util_bb_log(m->c, stream_id, APLOG_TRACE1, "h2_mplx_out_open", bb);
             }
         }
-        apr_thread_mutex_unlock(m->lock);
+        leave_mutex(m, acquired);
     }
     return status;
 }
@@ -739,13 +796,14 @@ apr_status_t h2_mplx_out_write(h2_mplx *
                                struct apr_thread_cond_t *iowait)
 {
     apr_status_t status;
+    int acquired;
+    
     AP_DEBUG_ASSERT(m);
-    status = apr_thread_mutex_lock(m->lock);
-    if (APR_SUCCESS == status) {
+    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
         if (io && !io->orphaned) {
             status = out_write(m, io, f, bb, trailers, iowait);
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
                           "h2_mplx(%ld-%d): write with trailers=%s", 
                           m->id, io->id, trailers? "yes" : "no");
             H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_write");
@@ -755,7 +813,7 @@ apr_status_t h2_mplx_out_write(h2_mplx *
         else {
             status = APR_ECONNABORTED;
         }
-        apr_thread_mutex_unlock(m->lock);
+        leave_mutex(m, acquired);
     }
     return status;
 }
@@ -763,9 +821,10 @@ apr_status_t h2_mplx_out_write(h2_mplx *
 apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id, apr_table_t *trailers)
 {
     apr_status_t status;
+    int acquired;
+    
     AP_DEBUG_ASSERT(m);
-    status = apr_thread_mutex_lock(m->lock);
-    if (APR_SUCCESS == status) {
+    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
         if (io && !io->orphaned) {
             if (!io->response && !io->rst_error) {
@@ -791,7 +850,7 @@ apr_status_t h2_mplx_out_close(h2_mplx *
         else {
             status = APR_ECONNABORTED;
         }
-        apr_thread_mutex_unlock(m->lock);
+        leave_mutex(m, acquired);
     }
     return status;
 }
@@ -799,9 +858,10 @@ apr_status_t h2_mplx_out_close(h2_mplx *
 apr_status_t h2_mplx_out_rst(h2_mplx *m, int stream_id, int error)
 {
     apr_status_t status;
+    int acquired;
+    
     AP_DEBUG_ASSERT(m);
-    status = apr_thread_mutex_lock(m->lock);
-    if (APR_SUCCESS == status) {
+    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
         if (io && !io->rst_error && !io->orphaned) {
             h2_io_rst(io, error);
@@ -816,7 +876,7 @@ apr_status_t h2_mplx_out_rst(h2_mplx *m,
         else {
             status = APR_ECONNABORTED;
         }
-        apr_thread_mutex_unlock(m->lock);
+        leave_mutex(m, acquired);
     }
     return status;
 }
@@ -824,10 +884,11 @@ apr_status_t h2_mplx_out_rst(h2_mplx *m,
 int h2_mplx_in_has_eos_for(h2_mplx *m, int stream_id)
 {
     int has_eos = 0;
+    int acquired;
+    
     apr_status_t status;
     AP_DEBUG_ASSERT(m);
-    status = apr_thread_mutex_lock(m->lock);
-    if (APR_SUCCESS == status) {
+    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
         if (io && !io->orphaned) {
             has_eos = h2_io_in_has_eos_for(io);
@@ -835,18 +896,39 @@ int h2_mplx_in_has_eos_for(h2_mplx *m, i
         else {
             has_eos = 1;
         }
-        apr_thread_mutex_unlock(m->lock);
+        leave_mutex(m, acquired);
     }
     return has_eos;
 }
 
+int h2_mplx_in_has_data_for(h2_mplx *m, int stream_id)
+{
+    apr_status_t status;
+    int has_data = 0;
+    int acquired;
+    
+    AP_DEBUG_ASSERT(m);
+    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+        h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+        if (io && !io->orphaned) {
+            has_data = h2_io_in_has_data(io);
+        }
+        else {
+            has_data = 0;
+        }
+        leave_mutex(m, acquired);
+    }
+    return has_data;
+}
+
 int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id)
 {
     apr_status_t status;
     int has_data = 0;
+    int acquired;
+    
     AP_DEBUG_ASSERT(m);
-    status = apr_thread_mutex_lock(m->lock);
-    if (APR_SUCCESS == status) {
+    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
         if (io && !io->orphaned) {
             has_data = h2_io_out_has_data(io);
@@ -854,7 +936,7 @@ int h2_mplx_out_has_data_for(h2_mplx *m,
         else {
             has_data = 0;
         }
-        apr_thread_mutex_unlock(m->lock);
+        leave_mutex(m, acquired);
     }
     return has_data;
 }
@@ -863,9 +945,10 @@ apr_status_t h2_mplx_out_trywait(h2_mplx
                                  apr_thread_cond_t *iowait)
 {
     apr_status_t status;
+    int acquired;
+    
     AP_DEBUG_ASSERT(m);
-    status = apr_thread_mutex_lock(m->lock);
-    if (APR_SUCCESS == status) {
+    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
         if (m->aborted) {
             status = APR_ECONNABORTED;
         }
@@ -879,7 +962,7 @@ apr_status_t h2_mplx_out_trywait(h2_mplx
             }
             m->added_output = NULL;
         }
-        apr_thread_mutex_unlock(m->lock);
+        leave_mutex(m, acquired);
     }
     return status;
 }
@@ -896,37 +979,38 @@ static void have_out_data_for(h2_mplx *m
 apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx)
 {
     apr_status_t status;
+    int acquired;
     
     AP_DEBUG_ASSERT(m);
-    status = apr_thread_mutex_lock(m->lock);
-    if (APR_SUCCESS == status) {
+    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
         if (m->aborted) {
             status = APR_ECONNABORTED;
         }
         else {
-            h2_tq_sort(m->q, cmp, ctx);
+            h2_iq_sort(m->q, cmp, ctx);
             
             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                           "h2_mplx(%ld): reprioritize tasks", m->id);
         }
-        apr_thread_mutex_unlock(m->lock);
+        leave_mutex(m, acquired);
     }
     return status;
 }
 
-static h2_io *open_io(h2_mplx *m, int stream_id)
+static h2_io *open_io(h2_mplx *m, int stream_id, const h2_request *request)
 {
     apr_pool_t *io_pool = m->spare_pool;
     h2_io *io;
     
     if (!io_pool) {
         apr_pool_create(&io_pool, m->pool);
+        apr_pool_tag(io_pool, "h2_io");
     }
     else {
         m->spare_pool = NULL;
     }
     
-    io = h2_io_create(stream_id, io_pool);
+    io = h2_io_create(stream_id, io_pool, request);
     h2_io_set_add(m->stream_ios, io);
     
     return io;
@@ -937,55 +1021,615 @@ apr_status_t h2_mplx_process(h2_mplx *m,
                              h2_stream_pri_cmp *cmp, void *ctx)
 {
     apr_status_t status;
-    int was_empty = 0;
+    int do_registration = 0;
+    int acquired;
     
     AP_DEBUG_ASSERT(m);
-    status = apr_thread_mutex_lock(m->lock);
-    if (APR_SUCCESS == status) {
+    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
         if (m->aborted) {
             status = APR_ECONNABORTED;
         }
         else {
-            h2_io *io = open_io(m, stream_id);
-            io->request = req;
+            h2_io *io = open_io(m, stream_id, req);
             
             if (!io->request->body) {
                 status = h2_io_in_close(io);
             }
             
-            was_empty = h2_tq_empty(m->q);
-            h2_tq_add(m->q, io->id, cmp, ctx);
+            m->need_registration = m->need_registration || h2_iq_empty(m->q);
+            do_registration = (m->need_registration && m->workers_busy < m->workers_max);
+            h2_iq_add(m->q, io->id, cmp, ctx);
             
             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
                           "h2_mplx(%ld-%d): process", m->c->id, stream_id);
             H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_process");
         }
-        apr_thread_mutex_unlock(m->lock);
+        leave_mutex(m, acquired);
     }
-    if (status == APR_SUCCESS && was_empty) {
+    if (status == APR_SUCCESS && do_registration) {
         workers_register(m);
     }
     return status;
 }
 
-const h2_request *h2_mplx_pop_request(h2_mplx *m, int *has_more)
+static h2_task *pop_task(h2_mplx *m)
 {
-    const h2_request *req = NULL;
+    h2_task *task = NULL;
+    int sid;
+    while (!m->aborted && !task 
+        && (m->workers_busy < m->workers_limit)
+        && (sid = h2_iq_shift(m->q)) > 0) {
+        h2_io *io = h2_io_set_get(m->stream_ios, sid);
+        if (io && io->orphaned) {
+            io_destroy(m, io, 0);
+            if (m->join_wait) {
+                apr_thread_cond_signal(m->join_wait);
+            }
+        }
+        else if (io) {
+            conn_rec *slave = h2_slave_create(m->c, m->pool, m->spare_allocator);
+            m->spare_allocator = NULL;
+            task = h2_task_create(m->id, io->request, slave, m);
+            io->worker_started = 1;
+            io->started_at = apr_time_now();
+            if (sid > m->max_stream_started) {
+                m->max_stream_started = sid;
+            }
+            ++m->workers_busy;
+        }
+    }
+    return task;
+}
+
+h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more)
+{
+    h2_task *task = NULL;
     apr_status_t status;
+    int acquired;
     
     AP_DEBUG_ASSERT(m);
-    status = apr_thread_mutex_lock(m->lock);
-    if (APR_SUCCESS == status) {
+    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
         if (m->aborted) {
-            req = NULL;
             *has_more = 0;
         }
         else {
-            req = pop_request(m);
-            *has_more = !h2_tq_empty(m->q);
+            task = pop_task(m);
+            *has_more = !h2_iq_empty(m->q);
+        }
+        
+        if (has_more && !task) {
+            m->need_registration = 1;
+        }
+        leave_mutex(m, acquired);
+    }
+    return task;
+}
+
+static void task_done(h2_mplx *m, h2_task *task)
+{
+    if (task) {
+        if (task->frozen) {
+            /* this task was handed over to an engine for processing */
+            h2_task_thaw(task);
+            /* TODO: can we signal an engine that it can now start on this? */
+        }
+        else {
+            h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id);
+            
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+                          "h2_mplx(%ld): task(%s) done", m->id, task->id);
+            /* clean our references and report request as done. Signal
+             * that we want another unless we have been aborted */
+            /* TODO: this will keep a worker attached to this h2_mplx as
+             * long as it has requests to handle. Might no be fair to
+             * other mplx's. Perhaps leave after n requests? */
+            h2_mplx_out_close(m, task->stream_id, NULL);
+            if (m->spare_allocator) {
+                apr_allocator_destroy(m->spare_allocator);
+                m->spare_allocator = NULL;
+            }
+            h2_slave_destroy(task->c, &m->spare_allocator);
+            task = NULL;
+            if (io) {
+                apr_time_t now = apr_time_now();
+                if (!io->orphaned && m->redo_ios
+                    && h2_io_set_get(m->redo_ios, io->id)) {
+                    /* reset and schedule again */
+                    h2_io_redo(io);
+                    h2_io_set_remove(m->redo_ios, io);
+                    h2_iq_add(m->q, io->id, NULL, NULL);
+                }
+                else {
+                    io->worker_done = 1;
+                    io->done_at = now;
+                    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                                  "h2_mplx(%ld): request(%d) done, %f ms"
+                                  " elapsed", m->id, io->id, 
+                                  (io->done_at - io->started_at) / 1000.0);
+                    if (io->started_at > m->last_idle_block) {
+                        /* this task finished without causing an 'idle block', e.g.
+                         * a block by flow control.
+                         */
+                        if (now - m->last_limit_change >= m->limit_change_interval
+                            && m->workers_limit < m->workers_max) {
+                            /* Well behaving stream, allow it more workers */
+                            m->workers_limit = H2MIN(m->workers_limit * 2, 
+                                                     m->workers_max);
+                            m->last_limit_change = now;
+                            m->need_registration = 1;
+                            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                                          "h2_mplx(%ld): increase worker limit to %d",
+                                          m->id, m->workers_limit);
+                        }
+                    }
+                }
+                
+                if (io->orphaned) {
+                    io_destroy(m, io, 0);
+                    if (m->join_wait) {
+                        apr_thread_cond_signal(m->join_wait);
+                    }
+                }
+                else {
+                    /* hang around until the stream deregisteres */
+                }
+            }
+            apr_thread_cond_broadcast(m->task_done);
+        }
+    }
+}
+
+void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask)
+{
+    int acquired;
+    
+    if (enter_mutex(m, &acquired) == APR_SUCCESS) {
+        task_done(m, task);
+        --m->workers_busy;
+        if (ptask) {
+            /* caller wants another task */
+            *ptask = pop_task(m);
+        }
+        leave_mutex(m, acquired);
+    }
+}
+
+/*******************************************************************************
+ * h2_mplx DoS protection
+ ******************************************************************************/
+
+typedef struct {
+    h2_mplx *m;
+    h2_io *io;
+    apr_time_t now;
+} io_iter_ctx;
+
+static int latest_repeatable_busy_unsubmitted_iter(void *data, h2_io *io)
+{
+    io_iter_ctx *ctx = data;
+    if (io->worker_started && !io->worker_done
+        && h2_io_is_repeatable(io)
+        && !h2_io_set_get(ctx->m->redo_ios, io->id)) {
+        /* this io occupies a worker, the response has not been submitted yet,
+         * not been cancelled and it is a repeatable request
+         * -> it can be re-scheduled later */
+        if (!ctx->io || ctx->io->started_at < io->started_at) {
+            /* we did not have one or this one was started later */
+            ctx->io = io;
+        }
+    }
+    return 1;
+}
+
+static h2_io *get_latest_repeatable_busy_unsubmitted_io(h2_mplx *m) 
+{
+    io_iter_ctx ctx;
+    ctx.m = m;
+    ctx.io = NULL;
+    h2_io_set_iter(m->stream_ios, latest_repeatable_busy_unsubmitted_iter, &ctx);
+    return ctx.io;
+}
+
+static int timed_out_busy_iter(void *data, h2_io *io)
+{
+    io_iter_ctx *ctx = data;
+    if (io->worker_started && !io->worker_done
+        && (ctx->now - io->started_at) > ctx->m->stream_timeout) {
+        /* timed out stream occupying a worker, found */
+        ctx->io = io;
+        return 0;
+    }
+    return 1;
+}
+static h2_io *get_timed_out_busy_stream(h2_mplx *m) 
+{
+    io_iter_ctx ctx;
+    ctx.m = m;
+    ctx.io = NULL;
+    ctx.now = apr_time_now();
+    h2_io_set_iter(m->stream_ios, timed_out_busy_iter, &ctx);
+    return ctx.io;
+}
+
+static apr_status_t unschedule_slow_ios(h2_mplx *m) 
+{
+    h2_io *io;
+    int n;
+    
+    if (!m->redo_ios) {
+        m->redo_ios = h2_io_set_create(m->pool);
+    }
+    /* Try to get rid of streams that occupy workers. Look for safe requests
+     * that are repeatable. If none found, fail the connection.
+     */
+    n = (m->workers_busy - m->workers_limit - h2_io_set_size(m->redo_ios));
+    while (n > 0 && (io = get_latest_repeatable_busy_unsubmitted_io(m))) {
+        h2_io_set_add(m->redo_ios, io);
+        h2_io_rst(io, H2_ERR_CANCEL);
+        --n;
+    }
+    
+    if ((m->workers_busy - h2_io_set_size(m->redo_ios)) > m->workers_limit) {
+        io = get_timed_out_busy_stream(m);
+        if (io) {
+            /* Too many busy workers, unable to cancel enough streams
+             * and with a busy, timed out stream, we tell the client
+             * to go away... */
+            return APR_TIMEUP;
+        }
+    }
+    return APR_SUCCESS;
+}
+
+apr_status_t h2_mplx_idle(h2_mplx *m)
+{
+    apr_status_t status = APR_SUCCESS;
+    apr_time_t now;            
+    int acquired;
+    
+    if (enter_mutex(m, &acquired) == APR_SUCCESS) {
+        apr_size_t scount = h2_io_set_size(m->stream_ios);
+        if (scount > 0 && m->workers_busy) {
+            /* If we have streams in connection state 'IDLE', meaning
+             * all streams are ready to sent data out, but lack
+             * WINDOW_UPDATEs. 
+             * 
+             * This is ok, unless we have streams that still occupy
+             * h2 workers. As worker threads are a scarce resource, 
+             * we need to take measures that we do not get DoSed.
+             * 
+             * This is what we call an 'idle block'. Limit the amount 
+             * of busy workers we allow for this connection until it
+             * well behaves.
+             */
+            now = apr_time_now();
+            m->last_idle_block = now;
+            if (m->workers_limit > 2 
+                && now - m->last_limit_change >= m->limit_change_interval) {
+                if (m->workers_limit > 16) {
+                    m->workers_limit = 16;
+                }
+                else if (m->workers_limit > 8) {
+                    m->workers_limit = 8;
+                }
+                else if (m->workers_limit > 4) {
+                    m->workers_limit = 4;
+                }
+                else if (m->workers_limit > 2) {
+                    m->workers_limit = 2;
+                }
+                m->last_limit_change = now;
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                              "h2_mplx(%ld): decrease worker limit to %d",
+                              m->id, m->workers_limit);
+            }
+            
+            if (m->workers_busy > m->workers_limit) {
+                status = unschedule_slow_ios(m);
+            }
+        }
+        leave_mutex(m, acquired);
+    }
+    return status;
+}
+
+/*******************************************************************************
+ * HTTP/2 request engines
+ ******************************************************************************/
+
+typedef struct h2_req_entry h2_req_entry;
+struct h2_req_entry {
+    APR_RING_ENTRY(h2_req_entry) link;
+    request_rec *r;
+};
+
+#define H2_REQ_ENTRY_NEXT(e)	APR_RING_NEXT((e), link)
+#define H2_REQ_ENTRY_PREV(e)	APR_RING_PREV((e), link)
+#define H2_REQ_ENTRY_REMOVE(e)	APR_RING_REMOVE((e), link)
+
+typedef struct h2_req_engine_i h2_req_engine_i;
+struct h2_req_engine_i {
+    h2_req_engine pub;
+    conn_rec *c;               /* connection this engine is assigned to */
+    h2_mplx *m;
+    unsigned int shutdown : 1; /* engine is being shut down */
+    apr_thread_cond_t *io;     /* condition var for waiting on data */
+    APR_RING_HEAD(h2_req_entries, h2_req_entry) entries;
+    apr_size_t no_assigned;    /* # of assigned requests */
+    apr_size_t no_live;        /* # of live */
+    apr_size_t no_finished;    /* # of finished */
+};
+
+#define H2_REQ_ENTRIES_SENTINEL(b)	APR_RING_SENTINEL((b), h2_req_entry, link)
+#define H2_REQ_ENTRIES_EMPTY(b)	APR_RING_EMPTY((b), h2_req_entry, link)
+#define H2_REQ_ENTRIES_FIRST(b)	APR_RING_FIRST(b)
+#define H2_REQ_ENTRIES_LAST(b)	APR_RING_LAST(b)
+
+#define H2_REQ_ENTRIES_INSERT_HEAD(b, e) do {				\
+h2_req_entry *ap__b = (e);                                        \
+APR_RING_INSERT_HEAD((b), ap__b, h2_req_entry, link);	\
+} while (0)
+
+#define H2_REQ_ENTRIES_INSERT_TAIL(b, e) do {				\
+h2_req_entry *ap__b = (e);					\
+APR_RING_INSERT_TAIL((b), ap__b, h2_req_entry, link);	\
+} while (0)
+
+static apr_status_t h2_mplx_engine_schedule(h2_mplx *m, 
+                                            h2_req_engine_i *engine, 
+                                            request_rec *r)
+{
+    h2_req_entry *entry = apr_pcalloc(r->pool, sizeof(*entry));
+
+    APR_RING_ELEM_INIT(entry, link);
+    entry->r = r;
+    H2_REQ_ENTRIES_INSERT_TAIL(&engine->entries, entry);
+    return APR_SUCCESS;
+}
+
+
+apr_status_t h2_mplx_engine_push(const char *engine_type, 
+                                 request_rec *r, h2_mplx_engine_init *einit)
+{
+    apr_status_t status;
+    h2_mplx *m;
+    h2_task *task;
+    int acquired;
+    
+    task = h2_ctx_rget_task(r);
+    if (!task) {
+        return APR_ECONNABORTED;
+    }
+    m = task->mplx;
+    AP_DEBUG_ASSERT(m);
+    
+    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+        h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id);
+        if (!io || io->orphaned) {
+            status = APR_ECONNABORTED;
+        }
+        else {
+            h2_req_engine_i *engine = (h2_req_engine_i*)m->engine;
+            
+            apr_table_set(r->connection->notes, H2_TASK_ID_NOTE, task->id);
+            status = APR_EOF;
+            
+            if (task->ser_headers) {
+                /* Max compatibility, deny processing of this */
+            }
+            else if (engine && !strcmp(engine->pub.type, engine_type)) {
+                if (engine->shutdown 
+                    || engine->no_assigned >= H2MIN(engine->pub.capacity, 100)) {
+                    ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,
+                                  "h2_mplx(%ld): engine shutdown or over %s", 
+                                  m->c->id, engine->pub.id);
+                    engine = NULL;
+                }
+                else if (h2_mplx_engine_schedule(m, engine, r) == APR_SUCCESS) {
+                    /* this task will be processed in another thread,
+                     * freeze any I/O for the time being. */
+                    h2_task_freeze(task, r);
+                    engine->no_assigned++;
+                    status = APR_SUCCESS;
+                    ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r,
+                                  "h2_mplx(%ld): push request %s", 
+                                  m->c->id, r->the_request);
+                }
+                else {
+                    ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,
+                                  "h2_mplx(%ld): engine error adding req %s", 
+                                  m->c->id, engine->pub.id);
+                    engine = NULL;
+                }
+            }
+            
+            if (!engine && einit) {
+                engine = apr_pcalloc(task->c->pool, sizeof(*engine));
+                engine->pub.id = apr_psprintf(task->c->pool, "eng-%ld-%d", 
+                                               m->id, m->next_eng_id++);
+                engine->pub.pool = task->c->pool;
+                engine->pub.type = apr_pstrdup(task->c->pool, engine_type);
+                engine->pub.window_bits = 30;
+                engine->pub.req_window_bits = h2_log2(m->stream_max_mem);
+                engine->c = r->connection;
+                APR_RING_INIT(&engine->entries, h2_req_entry, link);
+                engine->m = m;
+                engine->io = task->io;
+                engine->no_assigned = 1;
+                engine->no_live = 1;
+                
+                status = einit(&engine->pub, r);
+                ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,
+                              "h2_mplx(%ld): init engine %s (%s)", 
+                              m->c->id, engine->pub.id, engine->pub.type);
+                if (status == APR_SUCCESS) {
+                    m->engine = &engine->pub;
+                }
+            }
+        }
+        
+        leave_mutex(m, acquired);
+    }
+    return status;
+}
+
+static h2_req_entry *pop_non_frozen(h2_req_engine_i *engine)
+{
+    h2_req_entry *entry;
+    h2_task *task;
+
+    for (entry = H2_REQ_ENTRIES_FIRST(&engine->entries);
+         entry != H2_REQ_ENTRIES_SENTINEL(&engine->entries);
+         entry = H2_REQ_ENTRY_NEXT(entry)) {
+        task = h2_ctx_rget_task(entry->r);
+        AP_DEBUG_ASSERT(task);
+        if (!task->frozen) {
+            H2_REQ_ENTRY_REMOVE(entry);
+            return entry;
         }
-        apr_thread_mutex_unlock(m->lock);
     }
-    return req;
+    return NULL;
 }
 
+static apr_status_t engine_pull(h2_mplx *m, h2_req_engine_i *engine, 
+                                apr_read_type_e block, request_rec **pr)
+{   
+    h2_req_entry *entry;
+    
+    AP_DEBUG_ASSERT(m);
+    AP_DEBUG_ASSERT(engine);
+    while (1) {
+        if (m->aborted) {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+                          "h2_mplx(%ld): mplx abort while pulling requests %s", 
+                          m->id, engine->pub.id);
+            *pr = NULL;
+            return APR_EOF;
+        }
+        
+        if (!H2_REQ_ENTRIES_EMPTY(&engine->entries) 
+            && (entry = pop_non_frozen(engine))) {
+            ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, entry->r,
+                          "h2_mplx(%ld): request %s pulled by engine %s", 
+                          m->c->id, entry->r->the_request, engine->pub.id);
+            engine->no_live++;
+            entry->r->connection->current_thread = engine->c->current_thread;
+            *pr = entry->r;
+            return APR_SUCCESS;
+        }
+        else if (APR_NONBLOCK_READ == block) {
+            *pr = NULL;
+            return APR_EAGAIN;
+        }
+        else if (H2_REQ_ENTRIES_EMPTY(&engine->entries)) {
+            engine->shutdown = 1;
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                          "h2_mplx(%ld): emtpy queue, shutdown engine %s", 
+                          m->id, engine->pub.id);
+            *pr = NULL;
+            return APR_EOF;
+        }
+        apr_thread_cond_timedwait(m->task_done, m->lock, 
+                                  apr_time_from_msec(100));
+    }
+}
+                                 
+apr_status_t h2_mplx_engine_pull(h2_req_engine *pub_engine, 
+                                 apr_read_type_e block, request_rec **pr)
+{   
+    h2_req_engine_i *engine = (h2_req_engine_i*)pub_engine;
+    h2_mplx *m = engine->m;
+    apr_status_t status;
+    int acquired;
+    
+    *pr = NULL;
+    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+        status = engine_pull(m, engine, block, pr);
+        leave_mutex(m, acquired);
+    }
+    return status;
+}
+ 
+static void engine_done(h2_mplx *m, h2_req_engine_i *engine, h2_task *task, 
+                        int waslive, int aborted)
+{
+    int acquired;
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
+                  "h2_mplx(%ld): task %s %s by %s", 
+                  m->id, task->id, aborted? "aborted":"done", 
+                  engine->pub.id);
+    h2_task_output_close(task->output);
+    engine->no_finished++;
+    if (waslive) engine->no_live--;
+    engine->no_assigned--;
+    if (task->c != engine->c) { /* do not release what the engine runs on */
+        if (enter_mutex(m, &acquired) == APR_SUCCESS) {
+            task_done(m, task);
+            leave_mutex(m, acquired);
+        }
+    }
+}
+                                
+void h2_mplx_engine_done(h2_req_engine *pub_engine, conn_rec *r_conn)
+{
+    h2_req_engine_i *engine = (h2_req_engine_i*)pub_engine;
+    h2_mplx *m = engine->m;
+    h2_task *task;
+    int acquired;
+
+    task = h2_ctx_cget_task(r_conn);
+    if (task && (enter_mutex(m, &acquired) == APR_SUCCESS)) {
+        engine_done(m, engine, task, 1, 0);
+        leave_mutex(m, acquired);
+    }
+}
+                                
+void h2_mplx_engine_exit(h2_req_engine *pub_engine)
+{
+    h2_req_engine_i *engine = (h2_req_engine_i*)pub_engine;
+    h2_mplx *m = engine->m;
+    int acquired;
+    
+    if (enter_mutex(m, &acquired) == APR_SUCCESS) {
+        if (!m->aborted 
+            && !H2_REQ_ENTRIES_EMPTY(&engine->entries)) {
+            h2_req_entry *entry;
+            ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
+                          "h2_mplx(%ld): exit engine %s (%s), "
+                          "has still requests queued, shutdown=%d,"
+                          "assigned=%ld, live=%ld, finished=%ld", 
+                          m->c->id, engine->pub.id, engine->pub.type,
+                          engine->shutdown, 
+                          (long)engine->no_assigned, (long)engine->no_live,
+                          (long)engine->no_finished);
+            for (entry = H2_REQ_ENTRIES_FIRST(&engine->entries);
+                 entry != H2_REQ_ENTRIES_SENTINEL(&engine->entries);
+                 entry = H2_REQ_ENTRY_NEXT(entry)) {
+                request_rec *r = entry->r;
+                h2_task *task = h2_ctx_rget_task(r);
+                ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
+                              "h2_mplx(%ld): engine %s has queued task %s, "
+                              "frozen=%d, aborting",
+                              m->c->id, engine->pub.id, task->id, task->frozen);
+                engine_done(m, engine, task, 0, 1);
+            }
+        }
+        if (!m->aborted && (engine->no_assigned > 1 || engine->no_live > 1)) {
+            ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
+                          "h2_mplx(%ld): exit engine %s (%s), "
+                          "assigned=%ld, live=%ld, finished=%ld", 
+                          m->c->id, engine->pub.id, engine->pub.type,
+                          (long)engine->no_assigned, (long)engine->no_live,
+                          (long)engine->no_finished);
+        }
+        else {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                          "h2_mplx(%ld): exit engine %s (%s)", 
+                          m->c->id, engine->pub.id, engine->pub.type);
+        }
+        if (m->engine == &engine->pub) {
+            m->engine = NULL; /* TODO */
+        }
+        leave_mutex(m, acquired);
+    }
+}

Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.h
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.h?rev=1733259&r1=1733258&r2=1733259&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.h (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.h Wed Mar  2 11:21:28 2016
@@ -38,6 +38,7 @@ struct apr_pool_t;
 struct apr_thread_mutex_t;
 struct apr_thread_cond_t;
 struct h2_config;
+struct h2_ihash_t;
 struct h2_response;
 struct h2_task;
 struct h2_stream;
@@ -45,9 +46,10 @@ struct h2_request;
 struct h2_io_set;
 struct apr_thread_cond_t;
 struct h2_workers;
-struct h2_stream_set;
-struct h2_task_queue;
+struct h2_int_queue;
+struct h2_req_engine;
 
+#include <apr_queue.h>
 #include "h2_io.h"
 
 typedef struct h2_mplx h2_mplx;
@@ -66,27 +68,45 @@ struct h2_mplx {
     apr_pool_t *pool;
 
     unsigned int aborted : 1;
+    unsigned int need_registration : 1;
 
-    struct h2_task_queue *q;
+    struct h2_int_queue *q;
     struct h2_io_set *stream_ios;
     struct h2_io_set *ready_ios;
+    struct h2_io_set *redo_ios;
     
     int max_stream_started;      /* highest stream id that started processing */
+    int workers_busy;            /* # of workers processing on this mplx */
+    int workers_limit;           /* current # of workers limit, dynamic */
+    int workers_def_limit;       /* default # of workers limit */
+    int workers_max;             /* max, hard limit # of workers in a process */
+    apr_time_t last_idle_block;  /* last time, this mplx entered IDLE while
+                                  * streams were ready */
+    apr_time_t last_limit_change;/* last time, worker limit changed */
+    apr_interval_time_t limit_change_interval;
 
     apr_thread_mutex_t *lock;
     struct apr_thread_cond_t *added_output;
+    struct apr_thread_cond_t *task_done;
     struct apr_thread_cond_t *join_wait;
     
     apr_size_t stream_max_mem;
-    int stream_timeout_secs;
+    apr_interval_time_t stream_timeout;
     
     apr_pool_t *spare_pool;           /* spare pool, ready for next io */
+    apr_allocator_t *spare_allocator;
+    
     struct h2_workers *workers;
     apr_size_t tx_handles_reserved;
     apr_size_t tx_chunk_size;
     
     h2_mplx_consumed_cb *input_consumed;
     void *input_consumed_ctx;
+    
+    struct h2_req_engine *engine;
+    /* TODO: signal for waiting tasks*/
+    apr_queue_t *engine_queue;
+    int next_eng_id;
 };
 
 
@@ -95,12 +115,15 @@ struct h2_mplx {
  * Object lifecycle and information.
  ******************************************************************************/
 
+apr_status_t h2_mplx_child_init(apr_pool_t *pool, server_rec *s);
+
 /**
  * Create the multiplexer for the given HTTP2 session. 
  * Implicitly has reference count 1.
  */
 h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *master, 
                         const struct h2_config *conf, 
+                        apr_interval_time_t stream_timeout,
                         struct h2_workers *workers);
 
 /**
@@ -119,7 +142,9 @@ apr_status_t h2_mplx_release_and_join(h2
  */
 void h2_mplx_abort(h2_mplx *mplx);
 
-void h2_mplx_request_done(h2_mplx **pm, int stream_id, const struct h2_request **preq);
+struct h2_task *h2_mplx_pop_task(h2_mplx *mplx, int *has_more);
+
+void h2_mplx_task_done(h2_mplx *m, struct h2_task *task, struct h2_task **ptask);
 
 /**
  * Get the highest stream identifier that has been passed on to processing.
@@ -143,10 +168,14 @@ int h2_mplx_get_max_stream_started(h2_mp
  */
 apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error);
 
-/* Return != 0 iff the multiplexer has data for the given stream. 
+/* Return != 0 iff the multiplexer has output data for the given stream. 
  */
 int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id);
 
+/* Return != 0 iff the multiplexer has input data for the given stream. 
+ */
+int h2_mplx_in_has_data_for(h2_mplx *m, int stream_id);
+
 /**
  * Waits on output data from any stream in this session to become available. 
  * Returns APR_TIMEUP if no data arrived in the given time.
@@ -179,8 +208,6 @@ apr_status_t h2_mplx_process(h2_mplx *m,
  */
 apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx);
 
-const struct h2_request *h2_mplx_pop_request(h2_mplx *mplx, int *has_more);
-
 /**
  * Register a callback for the amount of input data consumed per stream. The
  * will only ever be invoked from the thread creating this h2_mplx, e.g. when
@@ -248,7 +275,7 @@ apr_status_t h2_mplx_in_update_windows(h
  * @param bb the brigade to place any existing repsonse body data into
  */
 struct h2_stream *h2_mplx_next_submit(h2_mplx *m, 
-                                      struct h2_stream_set *streams);
+                                      struct h2_ihash_t *streams);
 
 /**
  * Reads output data from the given stream. Will never block, but
@@ -369,5 +396,32 @@ APR_RING_INSERT_TAIL((b), ap__b, h2_mplx
  */
 #define H2_MPLX_REMOVE(e)	APR_RING_REMOVE((e), link)
 
+/*******************************************************************************
+ * h2_mplx DoS protection
+ ******************************************************************************/
+
+/**
+ * Master connection has entered idle mode.
+ * @param m the mplx instance of the master connection
+ * @return != SUCCESS iff connection should be terminated
+ */
+apr_status_t h2_mplx_idle(h2_mplx *m);
+
+/*******************************************************************************
+ * h2_mplx h2_req_engine handling.
+ ******************************************************************************/
+ 
+typedef apr_status_t h2_mplx_engine_init(struct h2_req_engine *engine, 
+                                         request_rec *r);
+
+apr_status_t h2_mplx_engine_push(const char *engine_type, 
+                                 request_rec *r, h2_mplx_engine_init *einit);
+                                 
+apr_status_t h2_mplx_engine_pull(struct h2_req_engine *engine, 
+                                 apr_read_type_e block, request_rec **pr);
+
+void h2_mplx_engine_done(struct h2_req_engine *engine, conn_rec *r_conn);
+                                 
+void h2_mplx_engine_exit(struct h2_req_engine *engine);
 
 #endif /* defined(__mod_h2__h2_mplx__) */

Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_private.h
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_private.h?rev=1733259&r1=1733258&r2=1733259&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_private.h (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_private.h Wed Mar  2 11:21:28 2016
@@ -16,26 +16,12 @@
 #ifndef mod_h2_h2_private_h
 #define mod_h2_h2_private_h
 
+#include <apr_time.h>
+
 #include <nghttp2/nghttp2.h>
 
 extern module AP_MODULE_DECLARE_DATA http2_module;
 
 APLOG_USE_MODULE(http2);
 
-
-#define H2_HEADER_METHOD     ":method"
-#define H2_HEADER_METHOD_LEN 7
-#define H2_HEADER_SCHEME     ":scheme"
-#define H2_HEADER_SCHEME_LEN 7
-#define H2_HEADER_AUTH       ":authority"
-#define H2_HEADER_AUTH_LEN   10
-#define H2_HEADER_PATH       ":path"
-#define H2_HEADER_PATH_LEN   5
-#define H2_CRLF             "\r\n"
-
-#define H2_ALEN(a)          (sizeof(a)/sizeof((a)[0]))
-
-#define H2MAX(x,y) ((x) > (y) ? (x) : (y))
-#define H2MIN(x,y) ((x) < (y) ? (x) : (y))
-
 #endif

Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_push.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_push.c?rev=1733259&r1=1733258&r2=1733259&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_push.c (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_push.c Wed Mar  2 11:21:28 2016
@@ -276,20 +276,49 @@ static int same_authority(const h2_reque
     return 1;
 }
 
-static int set_header(void *ctx, const char *key, const char *value) 
+static int set_push_header(void *ctx, const char *key, const char *value) 
 {
-    apr_table_setn(ctx, key, value);
+    size_t klen = strlen(key);
+    if (H2_HD_MATCH_LIT("User-Agent", key, klen)
+        || H2_HD_MATCH_LIT("Accept", key, klen)
+        || H2_HD_MATCH_LIT("Accept-Encoding", key, klen)
+        || H2_HD_MATCH_LIT("Accept-Language", key, klen)
+        || H2_HD_MATCH_LIT("Cache-Control", key, klen)) {
+        apr_table_setn(ctx, key, value);
+    }
     return 1;
 }
 
+static int has_param(link_ctx *ctx, const char *param)
+{
+    const char *p = apr_table_get(ctx->params, param);
+    return !!p;
+}
+
+static int has_relation(link_ctx *ctx, const char *rel)
+{
+    const char *s, *val = apr_table_get(ctx->params, "rel");
+    if (val) {
+        if (!strcmp(rel, val)) {
+            return 1;
+        }
+        s = ap_strstr_c(val, rel);
+        if (s && (s == val || s[-1] == ' ')) {
+            s += strlen(rel);
+            if (!*s || *s == ' ') {
+                return 1;
+            }
+        }
+    }
+    return 0;
+}
 
 static int add_push(link_ctx *ctx)
 {
     /* so, we have read a Link header and need to decide
      * if we transform it into a push.
      */
-    const char *rel = apr_table_get(ctx->params, "rel");
-    if (rel && !strcmp("preload", rel)) {
+    if (has_relation(ctx, "preload") && !has_param(ctx, "nopush")) {
         apr_uri_t uri;
         if (apr_uri_parse(ctx->pool, ctx->link, &uri) == APR_SUCCESS) {
             if (uri.path && same_authority(ctx->req, &uri)) {
@@ -306,9 +335,7 @@ static int add_push(link_ctx *ctx)
                  * TLS (if any) parameters.
                  */
                 path = apr_uri_unparse(ctx->pool, &uri, APR_URI_UNP_OMITSITEPART);
-                
                 push = apr_pcalloc(ctx->pool, sizeof(*push));
-                
                 switch (ctx->req->push_policy) {
                     case H2_PUSH_HEAD:
                         method = "HEAD";
@@ -318,15 +345,10 @@ static int add_push(link_ctx *ctx)
                         break;
                 }
                 headers = apr_table_make(ctx->pool, 5);
-                apr_table_do(set_header, headers, ctx->req->headers,
-                             "User-Agent",
-                             "Cache-Control",
-                             "Accept-Language",
-                             NULL);
-                req = h2_request_createn(0, ctx->pool, ctx->req->config, 
-                                         method, ctx->req->scheme,
-                                         ctx->req->authority, 
-                                         path, headers);
+                apr_table_do(set_push_header, headers, ctx->req->headers, NULL);
+                req = h2_request_createn(0, ctx->pool, method, ctx->req->scheme,
+                                         ctx->req->authority, path, headers,
+                                         ctx->req->serialize);
                 /* atm, we do not push on pushes */
                 h2_request_end_headers(req, ctx->pool, 1, 0);
                 push->req = req;
@@ -434,38 +456,30 @@ apr_array_header_t *h2_push_collect(apr_
     return NULL;
 }
 
-void h2_push_policy_determine(struct h2_request *req, apr_pool_t *p, int push_enabled)
-{
-    h2_push_policy policy = H2_PUSH_NONE;
-    if (push_enabled) {
-        const char *val = apr_table_get(req->headers, "accept-push-policy");
-        if (val) {
-            if (ap_find_token(p, val, "fast-load")) {
-                policy = H2_PUSH_FAST_LOAD;
-            }
-            else if (ap_find_token(p, val, "head")) {
-                policy = H2_PUSH_HEAD;
-            }
-            else if (ap_find_token(p, val, "default")) {
-                policy = H2_PUSH_DEFAULT;
-            }
-            else if (ap_find_token(p, val, "none")) {
-                policy = H2_PUSH_NONE;
-            }
-            else {
-                /* nothing known found in this header, go by default */
-                policy = H2_PUSH_DEFAULT;
-            }
-        }
-        else {
-            policy = H2_PUSH_DEFAULT;
-        }
-    }
-    req->push_policy = policy;
-}
-
 /*******************************************************************************
  * push diary 
+ *
+ * - The push diary keeps track of resources already PUSHed via HTTP/2 on this
+ *   connection. It records a hash value from the absolute URL of the resource
+ *   pushed.
+ * - Lacking openssl, it uses 'apr_hashfunc_default' for the value
+ * - with openssl, it uses SHA256 to calculate the hash value
+ * - whatever the method to generate the hash, the diary keeps a maximum of 64
+ *   bits per hash, limiting the memory consumption to about 
+ *      H2PushDiarySize * 8 
+ *   bytes. Entries are sorted by most recently used and oldest entries are
+ *   forgotten first.
+ * - Clients can initialize/replace the push diary by sending a 'Cache-Digest'
+ *   header. Currently, this is the base64url encoded value of the cache digest
+ *   as specified in https://datatracker.ietf.org/doc/draft-kazuho-h2-cache-digest/
+ *   This draft can be expected to evolve and the definition of the header
+ *   will be added there and refined.
+ * - The cache digest header is a Golomb Coded Set of hash values, but it may
+ *   limit the amount of bits per hash value even further. For a good description
+ *   of GCS, read here:
+ *      http://giovanni.bajo.it/post/47119962313/golomb-coded-sets-smaller-than-bloom-filters
+ * - The means that the push diary might be initialized with hash values of much
+ *   less than 64 bits, leading to more false positives, but smaller digest size.
  ******************************************************************************/
  
  
@@ -688,36 +702,6 @@ apr_array_header_t *h2_push_collect_upda
     return h2_push_diary_update(stream->session, pushes);
 }
 
-/* h2_log2(n) iff n is a power of 2 */
-static unsigned char h2_log2(apr_uint32_t n)
-{
-    int lz = 0;
-    if (!n) {
-        return 0;
-    }
-    if (!(n & 0xffff0000u)) {
-        lz += 16;
-        n = (n << 16);
-    }
-    if (!(n & 0xff000000u)) {
-        lz += 8;
-        n = (n << 8);
-    }
-    if (!(n & 0xf0000000u)) {
-        lz += 4;
-        n = (n << 4);
-    }
-    if (!(n & 0xc0000000u)) {
-        lz += 2;
-        n = (n << 2);
-    }
-    if (!(n & 0x80000000u)) {
-        lz += 1;
-    }
-    
-    return 31 - lz;
-}
-
 static apr_int32_t h2_log2inv(unsigned char log2)
 {
     return log2? (1 << log2) : 1;
@@ -794,8 +778,8 @@ static apr_status_t gset_encode_next(gse
     /* Intentional no APLOGNO */
     ap_log_perror(APLOG_MARK, GCSLOG_LEVEL, 0, encoder->pool,
                   "h2_push_diary_enc: val=%"APR_UINT64_T_HEX_FMT", delta=%"
-                  APR_UINT64_T_HEX_FMT" flex_bits=%ld, "
-                  "fixed_bits=%d, fixed_val=%"APR_UINT64_T_HEX_FMT, 
+                  APR_UINT64_T_HEX_FMT" flex_bits=%"APR_UINT64_T_FMT", "
+                  ", fixed_bits=%d, fixed_val=%"APR_UINT64_T_HEX_FMT, 
                   pval, delta, flex_bits, encoder->fixed_bits, delta&encoder->fixed_mask);
     for (; flex_bits != 0; --flex_bits) {
         status = gset_encode_bit(encoder, 1);

Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_push.h
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_push.h?rev=1733259&r1=1733258&r2=1733259&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_push.h (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_push.h Wed Mar  2 11:21:28 2016
@@ -15,19 +15,14 @@
 #ifndef __mod_h2__h2_push__
 #define __mod_h2__h2_push__
 
+#include "h2.h"
+
 struct h2_request;
 struct h2_response;
 struct h2_ngheader;
 struct h2_session;
 struct h2_stream;
 
-typedef enum {
-    H2_PUSH_NONE,
-    H2_PUSH_DEFAULT,
-    H2_PUSH_HEAD,
-    H2_PUSH_FAST_LOAD,
-} h2_push_policy;
-
 typedef struct h2_push {
     const struct h2_request *req;
 } h2_push;
@@ -66,17 +61,6 @@ apr_array_header_t *h2_push_collect(apr_
                                     const struct h2_response *res);
 
 /**
- * Set the push policy for the given request. Takes request headers into 
- * account, see draft https://tools.ietf.org/html/draft-ruellan-http-accept-push-policy-00
- * for details.
- * 
- * @param req the request to determine the policy for
- * @param p the pool to use
- * @param push_enabled if HTTP/2 server push is generally enabled for this request
- */
-void h2_push_policy_determine(struct h2_request *req, apr_pool_t *p, int push_enabled);
-
-/**
  * Create a new push diary for the given maximum number of entries.
  * 
  * @oaram p the pool to use

Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_request.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_request.c?rev=1733259&r1=1733258&r2=1733259&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_request.c (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_request.c Wed Mar  2 11:21:28 2016
@@ -30,45 +30,36 @@
 #include <scoreboard.h>
 
 #include "h2_private.h"
-#include "h2_config.h"
-#include "h2_mplx.h"
 #include "h2_push.h"
 #include "h2_request.h"
-#include "h2_task.h"
 #include "h2_util.h"
 
 
-h2_request *h2_request_create(int id, apr_pool_t *pool,
-                              const struct h2_config *config)
+h2_request *h2_request_create(int id, apr_pool_t *pool, int serialize)
 {
-    return h2_request_createn(id, pool, config, 
-                              NULL, NULL, NULL, NULL, NULL);
+    return h2_request_createn(id, pool, NULL, NULL, NULL, NULL, NULL,
+                              serialize);
 }
 
 h2_request *h2_request_createn(int id, apr_pool_t *pool,
-                               const struct h2_config *config, 
                                const char *method, const char *scheme,
                                const char *authority, const char *path,
-                               apr_table_t *header)
+                               apr_table_t *header, int serialize)
 {
     h2_request *req = apr_pcalloc(pool, sizeof(h2_request));
     
     req->id             = id;
-    req->config         = config;
     req->method         = method;
     req->scheme         = scheme;
     req->authority      = authority;
     req->path           = path;
     req->headers        = header? header : apr_table_make(pool, 10);
     req->request_time   = apr_time_now();
-
+    req->serialize      = serialize;
+    
     return req;
 }
 
-void h2_request_destroy(h2_request *req)
-{
-}
-
 static apr_status_t inspect_clen(h2_request *req, const char *s)
 {
     char *end;
@@ -139,38 +130,48 @@ static apr_status_t add_all_h1_header(h2
 }
 
 
+apr_status_t h2_request_make(h2_request *req, apr_pool_t *pool,
+                             const char *method, const char *scheme, 
+                             const char *authority, const char *path, 
+                             apr_table_t *headers)
+{
+    req->method    = method;
+    req->scheme    = scheme;
+    req->authority = authority;
+    req->path      = path;
+
+    AP_DEBUG_ASSERT(req->scheme);
+    AP_DEBUG_ASSERT(req->authority);
+    AP_DEBUG_ASSERT(req->path);
+    AP_DEBUG_ASSERT(req->method);
+
+    return add_all_h1_header(req, pool, headers);
+}
+
 apr_status_t h2_request_rwrite(h2_request *req, request_rec *r)
 {
     apr_status_t status;
+    const char *scheme, *authority;
     
-    req->config    = h2_config_rget(r);
-    req->method    = r->method;
-    req->scheme    = (r->parsed_uri.scheme? r->parsed_uri.scheme
-                      : ap_http_scheme(r));
-    req->authority = r->hostname;
-    req->path      = apr_uri_unparse(r->pool, &r->parsed_uri, 
-                                     APR_URI_UNP_OMITSITEPART);
-
-    if (!ap_strchr_c(req->authority, ':') && r->server && r->server->port) {
-        apr_port_t defport = apr_uri_port_of_scheme(req->scheme);
+    scheme = (r->parsed_uri.scheme? r->parsed_uri.scheme
+              : ap_http_scheme(r));
+    authority = r->hostname;
+    if (!ap_strchr_c(authority, ':') && r->server && r->server->port) {
+        apr_port_t defport = apr_uri_port_of_scheme(scheme);
         if (defport != r->server->port) {
             /* port info missing and port is not default for scheme: append */
-            req->authority = apr_psprintf(r->pool, "%s:%d", req->authority,
-                                          (int)r->server->port);
+            authority = apr_psprintf(r->pool, "%s:%d", authority,
+                                     (int)r->server->port);
         }
     }
     
-    AP_DEBUG_ASSERT(req->scheme);
-    AP_DEBUG_ASSERT(req->authority);
-    AP_DEBUG_ASSERT(req->path);
-    AP_DEBUG_ASSERT(req->method);
-
-    status = add_all_h1_header(req, r->pool, r->headers_in);
-
+    status = h2_request_make(req, r->pool,  r->method, scheme, authority,
+                             apr_uri_unparse(r->pool, &r->parsed_uri, 
+                                             APR_URI_UNP_OMITSITEPART),
+                             r->headers_in);
     ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r, APLOGNO(03058)
                   "h2_request(%d): rwrite %s host=%s://%s%s",
                   req->id, req->method, req->scheme, req->authority, req->path);
-                  
     return status;
 }
 
@@ -337,11 +338,22 @@ void h2_request_copy(apr_pool_t *p, h2_r
     dst->authority      = OPT_COPY(p, src->authority);
     dst->path           = OPT_COPY(p, src->path);
     dst->headers        = apr_table_clone(p, src->headers);
+    if (src->trailers) {
+        dst->trailers   = apr_table_clone(p, src->trailers);
+    }
     dst->content_length = src->content_length;
     dst->chunked        = src->chunked;
     dst->eoh            = src->eoh;
 }
 
+h2_request *h2_request_clone(apr_pool_t *p, const h2_request *src)
+{
+    h2_request *nreq = apr_pcalloc(p, sizeof(*nreq));
+    memcpy(nreq, src, sizeof(*nreq));
+    h2_request_copy(p, nreq, src);
+    return nreq;
+}
+
 request_rec *h2_request_create_rec(const h2_request *req, conn_rec *conn)
 {
     request_rec *r;
@@ -361,7 +373,7 @@ request_rec *h2_request_create_rec(const
     
     r->allowed_methods = ap_make_method_list(p, 2);
     
-    r->headers_in      = apr_table_copy(r->pool, req->headers);
+    r->headers_in      = apr_table_clone(r->pool, req->headers);
     r->trailers_in     = apr_table_make(r->pool, 5);
     r->subprocess_env  = apr_table_make(r->pool, 25);
     r->headers_out     = apr_table_make(r->pool, 12);
@@ -408,7 +420,7 @@ request_rec *h2_request_create_rec(const
     }
 
     ap_parse_uri(r, req->path);
-    r->protocol = (char*)"HTTP/2";
+    r->protocol = "HTTP/2";
     r->proto_num = HTTP_VERSION(2, 0);
 
     r->the_request = apr_psprintf(r->pool, "%s %s %s", 

Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_request.h
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_request.h?rev=1733259&r1=1733258&r2=1733259&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_request.h (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_request.h Wed Mar  2 11:21:28 2016
@@ -16,48 +16,19 @@
 #ifndef __mod_h2__h2_request__
 #define __mod_h2__h2_request__
 
-/* h2_request is the transformer of HTTP2 streams into HTTP/1.1 internal
- * format that will be fed to various httpd input filters to finally
- * become a request_rec to be handled by soemone.
- */
-struct h2_config;
-struct h2_to_h1;
-struct h2_mplx;
-struct h2_task;
-
-typedef struct h2_request h2_request;
-
-struct h2_request {
-    int id;             /* stream id */
-
-    const char *method; /* pseudo header values, see ch. 8.1.2.3 */
-    const char *scheme;
-    const char *authority;
-    const char *path;
-    
-    apr_table_t *headers;
-    apr_table_t *trailers;
-
-    apr_time_t request_time;
-    apr_off_t content_length;
-    
-    unsigned int chunked : 1; /* iff requst body needs to be forwarded as chunked */
-    unsigned int eoh     : 1; /* iff end-of-headers has been seen and request is complete */
-    unsigned int body    : 1; /* iff this request has a body */
-    unsigned int push_policy; /* which push policy to use for this request */
-    const struct h2_config *config;
-};
+#include "h2.h"
 
-h2_request *h2_request_create(int id, apr_pool_t *pool, 
-                              const struct h2_config *config);
+h2_request *h2_request_create(int id, apr_pool_t *pool, int serialize);
 
 h2_request *h2_request_createn(int id, apr_pool_t *pool,
-                               const struct h2_config *config, 
                                const char *method, const char *scheme,
                                const char *authority, const char *path,
-                               apr_table_t *headers);
+                               apr_table_t *headers, int serialize);
 
-void h2_request_destroy(h2_request *req);
+apr_status_t h2_request_make(h2_request *req, apr_pool_t *pool,
+                             const char *method, const char *scheme, 
+                             const char *authority, const char *path, 
+                             apr_table_t *headers);
 
 apr_status_t h2_request_rwrite(h2_request *req, request_rec *r);
 
@@ -74,6 +45,8 @@ apr_status_t h2_request_end_headers(h2_r
 
 void h2_request_copy(apr_pool_t *p, h2_request *dst, const h2_request *src);
 
+h2_request *h2_request_clone(apr_pool_t *p, const h2_request *src);
+
 /**
  * Create a request_rec representing the h2_request to be
  * processed on the given connection.