You are viewing a plain text version of this content. The canonical link for it is here.
Posted to cvs@httpd.apache.org by ic...@apache.org on 2021/10/12 13:34:01 UTC

svn commit: r1894163 [4/8] - in /httpd/httpd/trunk: ./ changes-entries/ modules/http2/ test/modules/http2/

Modified: httpd/httpd/trunk/modules/http2/h2_mplx.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_mplx.c?rev=1894163&r1=1894162&r2=1894163&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_mplx.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_mplx.c Tue Oct 12 13:34:01 2021
@@ -36,14 +36,14 @@
 #include "h2_private.h"
 #include "h2_bucket_beam.h"
 #include "h2_config.h"
-#include "h2_conn.h"
-#include "h2_ctx.h"
-#include "h2_h2.h"
+#include "h2_c1.h"
+#include "h2_conn_ctx.h"
+#include "h2_protocol.h"
 #include "h2_mplx.h"
 #include "h2_request.h"
 #include "h2_stream.h"
 #include "h2_session.h"
-#include "h2_task.h"
+#include "h2_c2.h"
 #include "h2_workers.h"
 #include "h2_util.h"
 
@@ -56,25 +56,28 @@ typedef struct {
     apr_size_t count;
 } stream_iter_ctx;
 
-/**
- * Naming convention for static functions:
- * - m_*: function only called from the master connection
- * - s_*: function only called from a secondary connection
- * - t_*: function only called from a h2_task holder
- * - mst_*: function called from everyone
- */
-
-static apr_status_t s_mplx_be_happy(h2_mplx *m, h2_task *task);
+static apr_status_t s_mplx_be_happy(h2_mplx *m, conn_rec *c, h2_conn_ctx_t *conn_ctx);
 static apr_status_t m_be_annoyed(h2_mplx *m);
 
-apr_status_t h2_mplx_m_child_init(apr_pool_t *pool, server_rec *s)
+static apr_status_t mplx_pollset_create(h2_mplx *m);
+static apr_status_t mplx_pollset_add(h2_mplx *m, h2_conn_ctx_t *conn_ctx);
+static apr_status_t mplx_pollset_remove(h2_mplx *m, h2_conn_ctx_t *conn_ctx);
+static apr_status_t mplx_pollset_poll(h2_mplx *m, apr_interval_time_t timeout,
+                            stream_ev_callback *on_stream_input,
+                            stream_ev_callback *on_stream_output,
+                            void *on_ctx);
+
+static apr_pool_t *pchild;
+
+apr_status_t h2_mplx_c1_child_init(apr_pool_t *pool, server_rec *s)
 {
+    pchild = pool;
     return APR_SUCCESS;
 }
 
 #define H2_MPLX_ENTER(m)    \
-    do { apr_status_t rv; if ((rv = apr_thread_mutex_lock(m->lock)) != APR_SUCCESS) {\
-        return rv;\
+    do { apr_status_t rv_lock; if ((rv_lock = apr_thread_mutex_lock(m->lock)) != APR_SUCCESS) {\
+        return rv_lock;\
     } } while(0)
 
 #define H2_MPLX_LEAVE(m)    \
@@ -89,56 +92,84 @@ apr_status_t h2_mplx_m_child_init(apr_po
 #define H2_MPLX_LEAVE_MAYBE(m, dolock)    \
     if (dolock) apr_thread_mutex_unlock(m->lock)
 
-static void mst_check_data_for(h2_mplx *m, int stream_id, int mplx_is_locked);
+static void c1_input_consumed(void *ctx, h2_bucket_beam *beam, apr_off_t length)
+{
+    h2_stream_in_consumed(ctx, length);
+}
 
-static void mst_stream_input_ev(void *ctx, h2_bucket_beam *beam)
+static int stream_is_running(h2_stream *stream)
 {
-    h2_stream *stream = ctx;
-    h2_mplx *m = stream->session->mplx;
-    apr_atomic_set32(&m->event_pending, 1); 
+    h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(stream->c2);
+    return conn_ctx && conn_ctx->started_at != 0 && !conn_ctx->done;
 }
 
-static void m_stream_input_consumed(void *ctx, h2_bucket_beam *beam, apr_off_t length)
+int h2_mplx_c1_stream_is_running(h2_mplx *m, h2_stream *stream)
 {
-    h2_stream_in_consumed(ctx, length);
+    int rv;
+
+    H2_MPLX_ENTER(m);
+    rv = stream_is_running(stream);
+    H2_MPLX_LEAVE(m);
+    return rv;
 }
 
-static void ms_stream_joined(h2_mplx *m, h2_stream *stream)
+static void c1c2_stream_joined(h2_mplx *m, h2_stream *stream)
 {
-    ap_assert(!h2_task_has_started(stream->task) || stream->task->worker_done);
+    ap_assert(!stream_is_running(stream));
     
-    h2_ififo_remove(m->readyq, stream->id);
     h2_ihash_remove(m->shold, stream->id);
-    h2_ihash_add(m->spurge, stream);
+    APR_ARRAY_PUSH(m->spurge, h2_stream *) = stream;
 }
 
 static void m_stream_cleanup(h2_mplx *m, h2_stream *stream)
 {
-    ap_assert(stream->state == H2_SS_CLEANUP);
+    h2_conn_ctx_t *c2_ctx = stream->c2? h2_conn_ctx_get(stream->c2) : NULL;
 
-    if (stream->input) {
-        h2_beam_on_consumed(stream->input, NULL, NULL, NULL);
-        h2_beam_abort(stream->input);
-    }
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
+                  H2_STRM_MSG(stream, "cleanup, unsubscribing from beam events"));
     if (stream->output) {
-        h2_beam_on_produced(stream->output, NULL, NULL);
-        h2_beam_leave(stream->output);
+        h2_beam_on_was_empty(stream->output, NULL, NULL);
+    }
+    if (stream->input) {
+        h2_beam_on_received(stream->input, NULL, NULL);
+        h2_beam_on_consumed(stream->input, NULL, NULL);
     }
-    
-    h2_stream_cleanup(stream);
 
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
+                  H2_STRM_MSG(stream, "cleanup, removing from registries"));
+    ap_assert(stream->state == H2_SS_CLEANUP);
+    h2_stream_cleanup(stream);
     h2_ihash_remove(m->streams, stream->id);
     h2_iq_remove(m->q, stream->id);
-    
-    if (!h2_task_has_started(stream->task) || stream->task->done_done) {
-        ms_stream_joined(m, stream);
+
+    if (c2_ctx) {
+        if (!stream_is_running(stream)) {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
+                          H2_STRM_MSG(stream, "cleanup, c2 is done, move to spurge"));
+            /* processing has finished */
+            APR_ARRAY_PUSH(m->spurge, h2_stream *) = stream;
+        }
+        else {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
+                          H2_STRM_MSG(stream, "cleanup, c2 is running, abort"));
+            /* c2 is still running */
+            stream->c2->aborted = 1;
+            if (stream->input) {
+                h2_beam_abort(stream->input, m->c1);
+            }
+            if (stream->output) {
+                h2_beam_abort(stream->output, m->c1);
+            }
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
+                          H2_STRM_MSG(stream, "cleanup, c2 is done, move to shold"));
+            h2_ihash_add(m->shold, stream);
+        }
     }
     else {
-        h2_ififo_remove(m->readyq, stream->id);
-        h2_ihash_add(m->shold, stream);
-        if (stream->task) {
-            stream->task->c->aborted = 1;
-        }
+        /* never started */
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
+                      H2_STRM_MSG(stream, "cleanup, never started, move to spurge"));
+        APR_ARRAY_PUSH(m->spurge, h2_stream *) = stream;
     }
 }
 
@@ -153,179 +184,111 @@ static void m_stream_cleanup(h2_mplx *m,
  *   their HTTP/1 cousins, the separate allocator seems to work better
  *   than protecting a shared h2_session one with an own lock.
  */
-h2_mplx *h2_mplx_m_create(conn_rec *c, server_rec *s, apr_pool_t *parent, 
+h2_mplx *h2_mplx_c1_create(h2_stream *stream0, server_rec *s, apr_pool_t *parent,
                           h2_workers *workers)
 {
+    h2_conn_ctx_t *conn_ctx;
     apr_status_t status = APR_SUCCESS;
     apr_allocator_t *allocator;
-    apr_thread_mutex_t *mutex;
-    h2_mplx *m;
+    apr_thread_mutex_t *mutex = NULL;
+    h2_mplx *m = NULL;
     
     m = apr_pcalloc(parent, sizeof(h2_mplx));
-    if (m) {
-        m->id = c->id;
-        m->c = c;
-        m->s = s;
-        
-        /* We create a pool with its own allocator to be used for
-         * processing secondary connections. This is the only way to have the
-         * processing independent of its parent pool in the sense that it
-         * can work in another thread. Also, the new allocator needs its own
-         * mutex to synchronize sub-pools.
-         */
-        status = apr_allocator_create(&allocator);
-        if (status != APR_SUCCESS) {
-            return NULL;
-        }
-        apr_allocator_max_free_set(allocator, ap_max_mem_free);
-        apr_pool_create_ex(&m->pool, parent, NULL, allocator);
-        if (!m->pool) {
-            apr_allocator_destroy(allocator);
-            return NULL;
-        }
-        apr_pool_tag(m->pool, "h2_mplx");
-        apr_allocator_owner_set(allocator, m->pool);
-        status = apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_DEFAULT,
-                                         m->pool);
-        if (status != APR_SUCCESS) {
-            apr_pool_destroy(m->pool);
-            return NULL;
-        }
-        apr_allocator_mutex_set(allocator, mutex);
-
-        status = apr_thread_mutex_create(&m->lock, APR_THREAD_MUTEX_DEFAULT,
-                                         m->pool);
-        if (status != APR_SUCCESS) {
-            apr_pool_destroy(m->pool);
-            return NULL;
-        }
-        
-        m->max_streams = h2_config_sgeti(s, H2_CONF_MAX_STREAMS);
-        m->stream_max_mem = h2_config_sgeti(s, H2_CONF_STREAM_MAX_MEM);
+    m->stream0 = stream0;
+    m->c1 = stream0->c2;
+    m->s = s;
+    m->id = m->c1->id;
+
+    /* We create a pool with its own allocator to be used for
+     * processing secondary connections. This is the only way to have the
+     * processing independent of its parent pool in the sense that it
+     * can work in another thread. Also, the new allocator needs its own
+     * mutex to synchronize sub-pools.
+     */
+    status = apr_allocator_create(&allocator);
+    if (status != APR_SUCCESS) {
+        allocator = NULL;
+        goto failure;
+    }
+
+    apr_allocator_max_free_set(allocator, ap_max_mem_free);
+    apr_pool_create_ex(&m->pool, parent, NULL, allocator);
+    if (!m->pool) goto failure;
+
+    apr_pool_tag(m->pool, "h2_mplx");
+    apr_allocator_owner_set(allocator, m->pool);
+
+    status = apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_DEFAULT,
+                                     m->pool);
+    if (APR_SUCCESS != status) goto failure;
+    apr_allocator_mutex_set(allocator, mutex);
+
+    status = apr_thread_mutex_create(&m->lock, APR_THREAD_MUTEX_DEFAULT,
+                                     m->pool);
+    if (APR_SUCCESS != status) goto failure;
+
+    status = apr_thread_cond_create(&m->join_wait, m->pool);
+    if (APR_SUCCESS != status) goto failure;
+
+    m->max_streams = h2_config_sgeti(s, H2_CONF_MAX_STREAMS);
+    m->stream_max_mem = h2_config_sgeti(s, H2_CONF_STREAM_MAX_MEM);
+
+    m->streams = h2_ihash_create(m->pool, offsetof(h2_stream,id));
+    m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id));
+    m->spurge = apr_array_make(m->pool, 10, sizeof(h2_stream*));
+    m->q = h2_iq_create(m->pool, m->max_streams);
+
+    m->workers = workers;
+    m->processing_max = workers->max_workers;
+    m->processing_limit = 6; /* the original h1 max parallel connections */
+    m->last_mood_change = apr_time_now();
+    m->mood_update_interval = apr_time_from_msec(100);
+
+    status = mplx_pollset_create(m);
+    if (APR_SUCCESS != status) {
+        ap_log_cerror(APLOG_MARK, APLOG_ERR, status, m->c1, APLOGNO(10308)
+                      "nghttp2: could not create pollset");
+        goto failure;
+    }
+    m->streams_to_poll = apr_array_make(m->pool, 10, sizeof(h2_stream*));
+    m->streams_ev_in = apr_array_make(m->pool, 10, sizeof(h2_stream*));
+    m->streams_ev_out = apr_array_make(m->pool, 10, sizeof(h2_stream*));
+
+#if !H2_POLL_STREAMS
+    status = apr_thread_mutex_create(&m->poll_lock, APR_THREAD_MUTEX_DEFAULT,
+                                     m->pool);
+    if (APR_SUCCESS != status) goto failure;
+    m->streams_input_read = h2_iq_create(m->pool, 10);
+    m->streams_output_written = h2_iq_create(m->pool, 10);
+#endif
+
+    conn_ctx = h2_conn_ctx_get(m->c1);
+    mplx_pollset_add(m, conn_ctx);
 
-        m->streams = h2_ihash_create(m->pool, offsetof(h2_stream,id));
-        m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id));
-        m->spurge = h2_ihash_create(m->pool, offsetof(h2_stream,id));
-        m->q = h2_iq_create(m->pool, m->max_streams);
-
-        status = h2_ififo_set_create(&m->readyq, m->pool, m->max_streams);
-        if (status != APR_SUCCESS) {
-            apr_pool_destroy(m->pool);
-            return NULL;
-        }
-
-        m->workers = workers;
-        m->max_active = workers->max_workers;
-        m->limit_active = 6; /* the original h1 max parallel connections */
-        m->last_mood_change = apr_time_now();
-        m->mood_update_interval = apr_time_from_msec(100);
-        
-        m->spare_secondary = apr_array_make(m->pool, 10, sizeof(conn_rec*));
-    }
     return m;
+
+failure:
+    if (m->pool) {
+        apr_pool_destroy(m->pool);
+    }
+    else if (allocator) {
+        apr_allocator_destroy(allocator);
+    }
+    return NULL;
 }
 
-int h2_mplx_m_shutdown(h2_mplx *m)
+int h2_mplx_c1_shutdown(h2_mplx *m)
 {
-    int max_stream_started = 0;
+    int max_stream_id_started = 0;
     
     H2_MPLX_ENTER(m);
 
-    max_stream_started = m->max_stream_started;
+    max_stream_id_started = m->max_stream_id_started;
     /* Clear schedule queue, disabling existing streams from starting */ 
     h2_iq_clear(m->q);
 
     H2_MPLX_LEAVE(m);
-    return max_stream_started;
-}
-
-static int m_input_consumed_signal(h2_mplx *m, h2_stream *stream)
-{
-    if (stream->input) {
-        return h2_beam_report_consumption(stream->input);
-    }
-    return 0;
-}
-
-static int m_report_consumption_iter(void *ctx, void *val)
-{
-    h2_stream *stream = val;
-    h2_mplx *m = ctx;
-    
-    m_input_consumed_signal(m, stream);
-    if (stream->state == H2_SS_CLOSED_L
-        && (!stream->task || stream->task->worker_done)) {
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, 
-                      H2_STRM_LOG(APLOGNO(10026), stream, "remote close missing")); 
-        nghttp2_submit_rst_stream(stream->session->ngh2, NGHTTP2_FLAG_NONE, 
-                                  stream->id, NGHTTP2_NO_ERROR);
-    }
-    return 1;
-}
-
-static int s_output_consumed_signal(h2_mplx *m, h2_task *task)
-{
-    if (task->output.beam) {
-        return h2_beam_report_consumption(task->output.beam);
-    }
-    return 0;
-}
-
-static int m_stream_destroy_iter(void *ctx, void *val) 
-{   
-    h2_mplx *m = ctx;
-    h2_stream *stream = val;
-
-    h2_ihash_remove(m->spurge, stream->id);
-    ap_assert(stream->state == H2_SS_CLEANUP);
-    
-    if (stream->input) {
-        /* Process outstanding events before destruction */
-        m_input_consumed_signal(m, stream);    
-        h2_beam_log(stream->input, m->c, APLOG_TRACE2, "stream_destroy");
-        h2_beam_destroy(stream->input);
-        stream->input = NULL;
-    }
-
-    if (stream->task) {
-        h2_task *task = stream->task;
-        conn_rec *secondary;
-        int reuse_secondary = 0;
-        
-        stream->task = NULL;
-        secondary = task->c;
-        if (secondary) {
-            if (m->s->keep_alive_max == 0 || secondary->keepalives < m->s->keep_alive_max) {
-                reuse_secondary = ((m->spare_secondary->nelts < (m->limit_active * 3 / 2))
-                                   && !task->rst_error);
-            }
-            
-            if (reuse_secondary) {
-                h2_beam_log(task->output.beam, m->c, APLOG_DEBUG, 
-                            APLOGNO(03385) "h2_task_destroy, reuse secondary");    
-                h2_task_destroy(task);
-                APR_ARRAY_PUSH(m->spare_secondary, conn_rec*) = secondary;
-            }
-            else {
-                h2_beam_log(task->output.beam, m->c, APLOG_TRACE1, 
-                            "h2_task_destroy, destroy secondary");    
-                h2_secondary_destroy(secondary);
-            }
-        }
-    }
-    h2_stream_destroy(stream);
-    return 0;
-}
-
-static void m_purge_streams(h2_mplx *m, int lock)
-{
-    if (!h2_ihash_empty(m->spurge)) {
-        H2_MPLX_ENTER_MAYBE(m, lock);
-        while (!h2_ihash_iter(m->spurge, m_stream_destroy_iter, m)) {
-            /* repeat until empty */
-        }
-        H2_MPLX_LEAVE_MAYBE(m, lock);
-    }
+    return max_stream_id_started;
 }
 
 typedef struct {
@@ -339,7 +302,7 @@ static int m_stream_iter_wrap(void *ctx,
     return x->cb(stream, x->ctx);
 }
 
-apr_status_t h2_mplx_m_stream_do(h2_mplx *m, h2_mplx_stream_cb *cb, void *ctx)
+apr_status_t h2_mplx_c1_streams_do(h2_mplx *m, h2_mplx_stream_cb *cb, void *ctx)
 {
     stream_iter_ctx_t x;
     
@@ -356,24 +319,22 @@ apr_status_t h2_mplx_m_stream_do(h2_mplx
 static int m_report_stream_iter(void *ctx, void *val) {
     h2_mplx *m = ctx;
     h2_stream *stream = val;
-    h2_task *task = stream->task;
-    if (APLOGctrace1(m->c)) {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                      H2_STRM_MSG(stream, "started=%d, scheduled=%d, ready=%d, out_buffer=%ld"), 
-                      !!stream->task, stream->scheduled, h2_stream_is_ready(stream),
-                      (long)h2_beam_get_buffered(stream->output));
-    }
-    if (task) {
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, /* NO APLOGNO */
+    h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(stream->c2);
+    ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c1,
+                  H2_STRM_MSG(stream, "started=%d, scheduled=%d, ready=%d, out_buffer=%ld"),
+                  !!stream->c2, stream->scheduled, h2_stream_is_ready(stream),
+                  (long)(stream->output? h2_beam_get_buffered(stream->output) : -1));
+    if (conn_ctx) {
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c1, /* NO APLOGNO */
                       H2_STRM_MSG(stream, "->03198: %s %s %s"
                       "[started=%d/done=%d]"), 
-                      task->request->method, task->request->authority, 
-                      task->request->path, task->worker_started, 
-                      task->worker_done);
+                      conn_ctx->request->method, conn_ctx->request->authority,
+                      conn_ctx->request->path, conn_ctx->started_at != 0,
+                      conn_ctx->done);
     }
     else {
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, /* NO APLOGNO */
-                      H2_STRM_MSG(stream, "->03198: no task"));
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c1, /* NO APLOGNO */
+                      H2_STRM_MSG(stream, "->03198: not started"));
     }
     return 1;
 }
@@ -381,9 +342,9 @@ static int m_report_stream_iter(void *ct
 static int m_unexpected_stream_iter(void *ctx, void *val) {
     h2_mplx *m = ctx;
     h2_stream *stream = val;
-    ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
+    ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c1, /* NO APLOGNO */
                   H2_STRM_MSG(stream, "unexpected, started=%d, scheduled=%d, ready=%d"), 
-                  !!stream->task, stream->scheduled, h2_stream_is_ready(stream));
+                  !!stream->c2, stream->scheduled, h2_stream_is_ready(stream));
     return 1;
 }
 
@@ -391,9 +352,9 @@ static int m_stream_cancel_iter(void *ct
     h2_mplx *m = ctx;
     h2_stream *stream = val;
 
-    /* disabled input consumed reporting */
+    /* abort the stream's input beam */
     if (stream->input) {
-        h2_beam_on_consumed(stream->input, NULL, NULL, NULL);
+        h2_beam_abort(stream->input, m->c1);
     }
     /* take over event monitoring */
     h2_stream_set_monitor(stream, NULL);
@@ -405,32 +366,32 @@ static int m_stream_cancel_iter(void *ct
     return 0;
 }
 
-void h2_mplx_m_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
+void h2_mplx_c1_destroy(h2_mplx *m)
 {
     apr_status_t status;
     int i, wait_secs = 60, old_aborted;
 
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
                   "h2_mplx(%ld): start release", m->id);
     /* How to shut down a h2 connection:
-     * 0. abort and tell the workers that no more tasks will come from us */
+     * 0. abort and tell the workers that no more work will come from us */
     m->aborted = 1;
     h2_workers_unregister(m->workers, m);
     
     H2_MPLX_ENTER_ALWAYS(m);
 
-    /* While really terminating any secondary connections, treat the master
+    /* While really terminating any c2 connections, treat the master
      * connection as aborted. It's not as if we could send any more data
      * at this point. */
-    old_aborted = m->c->aborted;
-    m->c->aborted = 1;
+    old_aborted = m->c1->aborted;
+    m->c1->aborted = 1;
 
     /* How to shut down a h2 connection:
      * 1. cancel all streams still active */
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, 
-                  "h2_mplx(%ld): release, %d/%d/%d streams (total/hold/purge), %d active tasks", 
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1,
+                  "h2_mplx(%ld): release, %d/%d/%d streams (total/hold/purge), %d streams",
                   m->id, (int)h2_ihash_count(m->streams),
-                  (int)h2_ihash_count(m->shold), (int)h2_ihash_count(m->spurge), m->tasks_active);
+                  (int)h2_ihash_count(m->shold), m->spurge->nelts, m->processing_count);
     while (!h2_ihash_iter(m->streams, m_stream_cancel_iter, m)) {
         /* until empty */
     }
@@ -440,18 +401,17 @@ void h2_mplx_m_release_and_join(h2_mplx
     ap_assert(h2_iq_empty(m->q));
     
     /* 3. while workers are busy on this connection, meaning they
-     *    are processing tasks from this connection, wait on them finishing
+     *    are processing streams from this connection, wait on them finishing
      *    in order to wake us and let us check again. 
      *    Eventually, this has to succeed. */    
-    m->join_wait = wait;
-    for (i = 0; h2_ihash_count(m->shold) > 0; ++i) {        
-        status = apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(wait_secs));
+    for (i = 0; h2_ihash_count(m->shold) > 0; ++i) {
+        status = apr_thread_cond_timedwait(m->join_wait, m->lock, apr_time_from_sec(wait_secs));
         
         if (APR_STATUS_IS_TIMEUP(status)) {
             /* This can happen if we have very long running requests
              * that do not time out on IO. */
-            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03198)
-                          "h2_mplx(%ld): waited %d sec for %d tasks", 
+            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c1, APLOGNO(03198)
+                          "h2_mplx(%ld): waited %d sec for %d streams",
                           m->id, i*wait_secs, (int)h2_ihash_count(m->shold));
             h2_ihash_iter(m->shold, m_report_stream_iter, m);
         }
@@ -459,83 +419,108 @@ void h2_mplx_m_release_and_join(h2_mplx
     m->join_wait = NULL;
 
     /* 4. With all workers done, all streams should be in spurge */
-    ap_assert(m->tasks_active == 0);
+    ap_assert(m->processing_count == 0);
     if (!h2_ihash_empty(m->shold)) {
-        ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03516)
+        ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c1, APLOGNO(03516)
                       "h2_mplx(%ld): unexpected %d streams in hold", 
                       m->id, (int)h2_ihash_count(m->shold));
         h2_ihash_iter(m->shold, m_unexpected_stream_iter, m);
     }
     
-    m->c->aborted = old_aborted;
+    m->c1->aborted = old_aborted;
     H2_MPLX_LEAVE(m);
 
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, "h2_mplx(%ld): released", m->id);
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1, "h2_mplx(%ld): released", m->id);
 }
 
-apr_status_t h2_mplx_m_stream_cleanup(h2_mplx *m, h2_stream *stream)
+apr_status_t h2_mplx_c1_stream_cleanup(h2_mplx *m, h2_stream *stream,
+                                       int *pstream_count)
 {
     H2_MPLX_ENTER(m);
     
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
                   H2_STRM_MSG(stream, "cleanup"));
-    m_stream_cleanup(m, stream);        
-    
+    m_stream_cleanup(m, stream);
+    *pstream_count = (int)h2_ihash_count(m->streams);
     H2_MPLX_LEAVE(m);
     return APR_SUCCESS;
 }
 
-h2_stream *h2_mplx_t_stream_get(h2_mplx *m, h2_task *task)
+const h2_stream *h2_mplx_c2_stream_get(h2_mplx *m, int stream_id)
 {
     h2_stream *s = NULL;
     
     H2_MPLX_ENTER_ALWAYS(m);
-
-    s = h2_ihash_get(m->streams, task->stream_id);
-
+    s = h2_ihash_get(m->streams, stream_id);
     H2_MPLX_LEAVE(m);
+
     return s;
 }
 
-static void mst_output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
+static void c1_purge_streams(h2_mplx *m)
 {
-    h2_stream *stream = ctx;
-    h2_mplx *m = stream->session->mplx;
-    
-    mst_check_data_for(m, stream->id, 0);
+    h2_stream *stream;
+    int i;
+
+    for (i = 0; i < m->spurge->nelts; ++i) {
+        stream = APR_ARRAY_IDX(m->spurge, i, h2_stream*);
+        ap_assert(stream->state == H2_SS_CLEANUP);
+        if (stream->input) {
+            h2_beam_destroy(stream->input, m->c1);
+            stream->input = NULL;
+        }
+        if (stream->c2) {
+            conn_rec *c2 = stream->c2;
+            h2_conn_ctx_t *c2_ctx = h2_conn_ctx_get(c2);
+            apr_status_t rv;
+
+            stream->c2 = NULL;
+            ap_assert(c2_ctx);
+            rv = mplx_pollset_remove(m, c2_ctx);
+            if (APR_SUCCESS != rv) {
+                ap_log_cerror(APLOG_MARK, APLOG_INFO, rv, m->c1,
+                              "h2_mplx(%ld-%d): pollset_remove %d on purge",
+                              m->id, stream->id, c2_ctx->stream_id);
+            }
+            h2_conn_ctx_destroy(c2);
+            h2_c2_destroy(c2);
+        }
+        h2_stream_destroy(stream);
+    }
+    apr_array_clear(m->spurge);
 }
 
-static apr_status_t t_out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
+apr_status_t h2_mplx_c1_poll(h2_mplx *m, apr_interval_time_t timeout,
+                            stream_ev_callback *on_stream_input,
+                            stream_ev_callback *on_stream_output,
+                            void *on_ctx)
 {
-    h2_stream *stream = h2_ihash_get(m->streams, stream_id);
-    
-    if (!stream || !stream->task || m->aborted) {
-        return APR_ECONNABORTED;
-    }
-    
-    ap_assert(stream->output == NULL);
-    stream->output = beam;
-    
-    if (APLOGctrace2(m->c)) {
-        h2_beam_log(beam, stream->task->c, APLOG_TRACE2, "out_open");
-    }
-    else {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->task->c,
-                      "h2_mplx(%s): out open", stream->task->id);
+    apr_status_t rv;
+
+    H2_MPLX_ENTER(m);
+
+    if (m->aborted) {
+        rv = APR_ECONNABORTED;
+        goto cleanup;
     }
-    
-    h2_beam_on_produced(stream->output, mst_output_produced, stream);
-    if (stream->task->output.copy_files) {
-        h2_beam_on_file_beam(stream->output, h2_beam_no_files, NULL);
+    /* Purge (destroy) streams outside of pollset processing.
+     * Streams that are registered in the pollset will be removed
+     * when they are destroyed, but the pollset works on copies
+     * of these registrations. So, if we destroy streams while
+     * processing pollset events, we might access freed memory.
+     */
+    if (m->spurge->nelts) {
+        c1_purge_streams(m);
     }
-    
-    /* we might see some file buckets in the output, see
-     * if we have enough handles reserved. */
-    mst_check_data_for(m, stream->id, 1);
-    return APR_SUCCESS;
+    rv = mplx_pollset_poll(m, timeout, on_stream_input, on_stream_output, on_ctx);
+
+cleanup:
+    H2_MPLX_LEAVE(m);
+    return rv;
 }
 
-apr_status_t h2_mplx_t_out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
+apr_status_t h2_mplx_c1_reprioritize(h2_mplx *m, h2_stream_pri_cmp_fn *cmp,
+                                    h2_session *session)
 {
     apr_status_t status;
     
@@ -545,212 +530,326 @@ apr_status_t h2_mplx_t_out_open(h2_mplx
         status = APR_ECONNABORTED;
     }
     else {
-        status = t_out_open(m, stream_id, beam);
+        h2_iq_sort(m->q, cmp, session);
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1,
+                      "h2_mplx(%ld): reprioritize streams", m->id);
+        status = APR_SUCCESS;
     }
 
     H2_MPLX_LEAVE(m);
     return status;
 }
 
-static apr_status_t s_out_close(h2_mplx *m, h2_task *task)
+static void ms_register_if_needed(h2_mplx *m, int from_master)
 {
-    apr_status_t status = APR_SUCCESS;
-
-    if (!task) {
-        return APR_ECONNABORTED;
-    }
-    if (task->c) {
-        ++task->c->keepalives;
-    }
-    
-    if (!h2_ihash_get(m->streams, task->stream_id)) {
-        return APR_ECONNABORTED;
+    if (!m->aborted && !m->is_registered && !h2_iq_empty(m->q)) {
+        apr_status_t status = h2_workers_register(m->workers, m); 
+        if (status == APR_SUCCESS) {
+            m->is_registered = 1;
+        }
+        else if (from_master) {
+            ap_log_cerror(APLOG_MARK, APLOG_ERR, status, m->c1, APLOGNO(10021)
+                          "h2_mplx(%ld): register at workers", m->id);
+        }
     }
-
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, task->c,
-                  "h2_mplx(%s): close", task->id);
-    status = h2_beam_close(task->output.beam);
-    h2_beam_log(task->output.beam, task->c, APLOG_TRACE2, "out_close");
-    s_output_consumed_signal(m, task);
-    mst_check_data_for(m, task->stream_id, 1);
-    return status;
 }
 
-apr_status_t h2_mplx_m_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
-                                   apr_thread_cond_t *iowait)
+static apr_status_t c1_process_stream(h2_mplx *m,
+                                      h2_stream *stream,
+                                      h2_stream_pri_cmp_fn *cmp,
+                                      h2_session *session)
 {
-    apr_status_t status;
-    
-    H2_MPLX_ENTER(m);
+    apr_status_t rv;
 
     if (m->aborted) {
-        status = APR_ECONNABORTED;
+        rv = APR_ECONNABORTED;
+        goto cleanup;
     }
-    else if (h2_mplx_m_has_master_events(m)) {
-        status = APR_SUCCESS;
+    if (!stream->request) {
+        rv = APR_EINVAL;
+        goto cleanup;
+    }
+    if (APLOGctrace1(m->c1)) {
+        const h2_request *r = stream->request;
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1,
+                      H2_STRM_MSG(stream, "process %s %s://%s%s chunked=%d"),
+                      r->method, r->scheme, r->authority, r->path, r->chunked);
+    }
+
+    rv = h2_stream_setup_input(stream);
+    if (APR_SUCCESS != rv) goto cleanup;
+
+    stream->scheduled = 1;
+    h2_ihash_add(m->streams, stream);
+    if (h2_stream_is_ready(stream)) {
+        /* already have a response */
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1,
+                      H2_STRM_MSG(stream, "process, ready already"));
     }
     else {
-        m_purge_streams(m, 0);
-        h2_ihash_iter(m->streams, m_report_consumption_iter, m);
-        m->added_output = iowait;
-        status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
-        if (APLOGctrace2(m->c)) {
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
-                          "h2_mplx(%ld): trywait on data for %f ms)",
-                          m->id, timeout/1000.0);
-        }
-        m->added_output = NULL;
+        h2_iq_add(m->q, stream->id, cmp, session);
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1,
+                      H2_STRM_MSG(stream, "process, added to q"));
     }
 
-    H2_MPLX_LEAVE(m);
-    return status;
+cleanup:
+    return rv;
 }
 
-static void mst_check_data_for(h2_mplx *m, int stream_id, int mplx_is_locked)
+apr_status_t h2_mplx_c1_process(h2_mplx *m,
+                                h2_iqueue *ready_to_process,
+                                h2_stream_get_fn *get_stream,
+                                h2_stream_pri_cmp_fn *stream_pri_cmp,
+                                h2_session *session,
+                                int *pstream_count)
 {
-    /* If m->lock is already held, we must release during h2_ififo_push()
-     * which can wait on its not_full condition, causing a deadlock because
-     * no one would then be able to acquire m->lock to empty the fifo.
-     */
-    H2_MPLX_LEAVE_MAYBE(m, mplx_is_locked);
-    if (h2_ififo_push(m->readyq, stream_id) == APR_SUCCESS) {
-        H2_MPLX_ENTER_ALWAYS(m);
-        apr_atomic_set32(&m->event_pending, 1);
-        if (m->added_output) {
-            apr_thread_cond_signal(m->added_output);
+    apr_status_t rv = APR_SUCCESS; /* stays APR_SUCCESS if no stream gets processed */
+    int sid;
+
+    H2_MPLX_ENTER(m);
+
+    while ((sid = h2_iq_shift(ready_to_process)) > 0) {
+        h2_stream *stream = get_stream(session, sid);
+        if (stream) {
+            ap_assert(!stream->scheduled);
+            rv = c1_process_stream(session->mplx, stream, stream_pri_cmp, session);
+            if (APR_SUCCESS != rv) {
+                h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR);
+            }
+        }
+        else {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1,
+                          "h2_stream(%ld-%d): not found to process", m->id, sid);
         }
-        H2_MPLX_LEAVE_MAYBE(m, !mplx_is_locked);
-    }
-    else {
-        H2_MPLX_ENTER_MAYBE(m, mplx_is_locked);
     }
+    ms_register_if_needed(m, 1);
+    *pstream_count = (int)h2_ihash_count(m->streams);
+#if APR_POOL_DEBUG
+    do {
+        apr_size_t mem_g, mem_m, mem_s, mem_w, mem_c1;
+
+        mem_g = pchild? apr_pool_num_bytes(pchild, 1) : 0;
+        mem_m = apr_pool_num_bytes(m->pool, 1);
+        mem_s = apr_pool_num_bytes(session->pool, 1);
+        mem_w = apr_pool_num_bytes(m->workers->pool, 1);
+        mem_c1 = apr_pool_num_bytes(m->c1->pool, 1);
+        ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, m->c1,
+                      "h2_mplx(%ld): child mem=%ld, mplx mem=%ld, session mem=%ld, workers=%ld, c1=%ld",
+                      m->id, (long)mem_g, (long)mem_m, (long)mem_s, (long)mem_w, (long)mem_c1);
+
+    } while (0);
+#endif
+
+    H2_MPLX_LEAVE(m);
+    return rv;
 }
 
-apr_status_t h2_mplx_m_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx)
+apr_status_t h2_mplx_c1_fwd_input(h2_mplx *m, struct h2_iqueue *input_pending,
+                                  h2_stream_get_fn *get_stream,
+                                  struct h2_session *session)
 {
-    apr_status_t status;
-    
+    int sid;
+
     H2_MPLX_ENTER(m);
 
-    if (m->aborted) {
-        status = APR_ECONNABORTED;
-    }
-    else {
-        h2_iq_sort(m->q, cmp, ctx);
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                      "h2_mplx(%ld): reprioritize tasks", m->id);
-        status = APR_SUCCESS;
+    while ((sid = h2_iq_shift(input_pending)) > 0) {
+        h2_stream *stream = get_stream(session, sid);
+        if (stream) {
+            H2_MPLX_LEAVE(m);
+            h2_stream_flush_input(stream);
+            H2_MPLX_ENTER(m);
+        }
     }
 
     H2_MPLX_LEAVE(m);
-    return status;
+    return APR_SUCCESS;
 }
 
-static void ms_register_if_needed(h2_mplx *m, int from_master) 
+static void c2_beam_input_write_notify(void *ctx, h2_bucket_beam *beam)
 {
-    if (!m->aborted && !m->is_registered && !h2_iq_empty(m->q)) {
-        apr_status_t status = h2_workers_register(m->workers, m); 
-        if (status == APR_SUCCESS) {
-            m->is_registered = 1;
+    conn_rec *c = ctx;
+    h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c);
+
+    (void)beam;
+    if (conn_ctx && conn_ctx->stream_id && conn_ctx->pipe_in_prod[H2_PIPE_IN]) {
+        apr_file_putc(1, conn_ctx->pipe_in_prod[H2_PIPE_IN]);
+    }
+}
+
+static void c2_beam_input_read_notify(void *ctx, h2_bucket_beam *beam)
+{
+    conn_rec *c = ctx;
+    h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c);
+
+    if (conn_ctx && conn_ctx->stream_id) {
+        if (conn_ctx->pipe_in_drain[H2_PIPE_IN]) {
+            apr_file_putc(1, conn_ctx->pipe_in_drain[H2_PIPE_IN]);
         }
-        else if (from_master) {
-            ap_log_cerror(APLOG_MARK, APLOG_ERR, status, m->c, APLOGNO(10021)
-                          "h2_mplx(%ld): register at workers", m->id);
+#if !H2_POLL_STREAMS
+        else {
+            apr_thread_mutex_lock(conn_ctx->mplx->poll_lock);
+            h2_iq_append(conn_ctx->mplx->streams_input_read, conn_ctx->stream_id);
+            apr_pollset_wakeup(conn_ctx->mplx->pollset);
+            apr_thread_mutex_unlock(conn_ctx->mplx->poll_lock);
         }
+#endif
     }
 }
 
-apr_status_t h2_mplx_m_process(h2_mplx *m, struct h2_stream *stream, 
-                               h2_stream_pri_cmp *cmp, void *ctx)
+static void c2_beam_output_write_notify(void *ctx, h2_bucket_beam *beam)
 {
-    apr_status_t status;
-    
-    H2_MPLX_ENTER(m);
+    conn_rec *c = ctx;
+    h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c);
 
-    if (m->aborted) {
-        status = APR_ECONNABORTED;
-    }
-    else {
-        status = APR_SUCCESS;
-        h2_ihash_add(m->streams, stream);
-        if (h2_stream_is_ready(stream)) {
-            /* already have a response */
-            mst_check_data_for(m, stream->id, 1);
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                          H2_STRM_MSG(stream, "process, add to readyq")); 
+    if (conn_ctx && conn_ctx->stream_id) {
+        if (conn_ctx->pipe_out_prod[H2_PIPE_IN]) {
+            apr_file_putc(1, conn_ctx->pipe_out_prod[H2_PIPE_IN]);
         }
+#if !H2_POLL_STREAMS
         else {
-            h2_iq_add(m->q, stream->id, cmp, ctx);
-            ms_register_if_needed(m, 1);                
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                          H2_STRM_MSG(stream, "process, added to q")); 
+            apr_thread_mutex_lock(conn_ctx->mplx->poll_lock);
+            h2_iq_append(conn_ctx->mplx->streams_output_written, conn_ctx->stream_id);
+            apr_pollset_wakeup(conn_ctx->mplx->pollset);
+            apr_thread_mutex_unlock(conn_ctx->mplx->poll_lock);
         }
+#endif
     }
+}
 
-    H2_MPLX_LEAVE(m);
-    return status;
+static apr_status_t c2_setup_io(h2_mplx *m, conn_rec *c2, h2_stream *stream)
+{
+    h2_conn_ctx_t *conn_ctx;
+    apr_status_t rv = APR_SUCCESS;
+    const char *action = "init";
+
+    rv = h2_conn_ctx_init_for_c2(&conn_ctx, c2, m, stream);
+    if (APR_SUCCESS != rv) goto cleanup;
+
+    if (!conn_ctx->beam_out) {
+        action = "create output beam";
+        rv = h2_beam_create(&conn_ctx->beam_out, c2, conn_ctx->req_pool,
+                            stream->id, "output", 0, c2->base_server->timeout);
+        if (APR_SUCCESS != rv) goto cleanup;
+
+        h2_beam_buffer_size_set(conn_ctx->beam_out, m->stream_max_mem);
+        h2_beam_on_was_empty(conn_ctx->beam_out, c2_beam_output_write_notify, c2);
+    }
+
+    if (stream->input) {
+        conn_ctx->beam_in = stream->input;
+        h2_beam_on_was_empty(stream->input, c2_beam_input_write_notify, c2);
+        h2_beam_on_received(stream->input, c2_beam_input_read_notify, c2);
+        h2_beam_on_consumed(stream->input, c1_input_consumed, stream);
+    }
+    else {
+        memset(&conn_ctx->pfd_in_drain, 0, sizeof(conn_ctx->pfd_in_drain));
+    }
+
+#if H2_POLL_STREAMS
+    if (!conn_ctx->mplx_pool) {
+        apr_pool_create(&conn_ctx->mplx_pool, m->pool);
+        apr_pool_tag(conn_ctx->mplx_pool, "H2_MPLX_C2");
+    }
+
+    if (!conn_ctx->pipe_out_prod[H2_PIPE_OUT]) {
+        action = "create output pipe";
+        rv = apr_file_pipe_create_pools(&conn_ctx->pipe_out_prod[H2_PIPE_OUT],
+                                        &conn_ctx->pipe_out_prod[H2_PIPE_IN],
+                                        APR_FULL_NONBLOCK,
+                                        conn_ctx->mplx_pool, c2->pool);
+        if (APR_SUCCESS != rv) goto cleanup;
+    }
+    conn_ctx->pfd_out_prod.desc_type = APR_POLL_FILE;
+    conn_ctx->pfd_out_prod.desc.f = conn_ctx->pipe_out_prod[H2_PIPE_OUT];
+    conn_ctx->pfd_out_prod.reqevents = APR_POLLIN | APR_POLLERR | APR_POLLHUP;
+    conn_ctx->pfd_out_prod.client_data = conn_ctx;
+
+    if (stream->input) {
+        if (!conn_ctx->pipe_in_prod[H2_PIPE_OUT]) {
+            action = "create input write pipe";
+            rv = apr_file_pipe_create_pools(&conn_ctx->pipe_in_prod[H2_PIPE_OUT],
+                                            &conn_ctx->pipe_in_prod[H2_PIPE_IN],
+                                            APR_READ_BLOCK,
+                                            c2->pool, conn_ctx->mplx_pool);
+            if (APR_SUCCESS != rv) goto cleanup;
+        }
+        if (!conn_ctx->pipe_in_drain[H2_PIPE_OUT]) {
+            action = "create input read pipe";
+            rv = apr_file_pipe_create_pools(&conn_ctx->pipe_in_drain[H2_PIPE_OUT],
+                                            &conn_ctx->pipe_in_drain[H2_PIPE_IN],
+                                            APR_FULL_NONBLOCK,
+                                            c2->pool, conn_ctx->mplx_pool);
+            if (APR_SUCCESS != rv) goto cleanup;
+        }
+        conn_ctx->pfd_in_drain.desc_type = APR_POLL_FILE;
+        conn_ctx->pfd_in_drain.desc.f = conn_ctx->pipe_in_drain[H2_PIPE_OUT];
+        conn_ctx->pfd_in_drain.reqevents = APR_POLLIN | APR_POLLERR | APR_POLLHUP;
+        conn_ctx->pfd_in_drain.client_data = conn_ctx;
+    }
+#else
+    memset(&conn_ctx->pfd_out_prod, 0, sizeof(conn_ctx->pfd_out_prod));
+    memset(&conn_ctx->pipe_in_prod, 0, sizeof(conn_ctx->pipe_in_prod));
+    memset(&conn_ctx->pipe_in_drain, 0, sizeof(conn_ctx->pipe_in_drain));
+#endif
+
+cleanup:
+    stream->output = (APR_SUCCESS == rv)? conn_ctx->beam_out : NULL;
+    if (APR_SUCCESS != rv) {
+        ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, c2,
+                      H2_STRM_LOG(APLOGNO(10309), stream,
+                      "error %s"), action);
+    }
+    return rv;
 }
 
-static h2_task *s_next_stream_task(h2_mplx *m)
+static conn_rec *s_next_c2(h2_mplx *m)
 {
-    h2_stream *stream;
+    h2_stream *stream = NULL;
+    apr_status_t rv;
     int sid;
-    while (!m->aborted && (m->tasks_active < m->limit_active)
+    conn_rec *c2;
+
+    while (!m->aborted && !stream && (m->processing_count < m->processing_limit)
            && (sid = h2_iq_shift(m->q)) > 0) {
-        
         stream = h2_ihash_get(m->streams, sid);
-        if (stream) {
-            conn_rec *secondary, **psecondary;
+    }
 
-            psecondary = (conn_rec **)apr_array_pop(m->spare_secondary);
-            if (psecondary) {
-                secondary = *psecondary;
-                secondary->aborted = 0;
-            }
-            else {
-                secondary = h2_secondary_create(m->c, stream->id, m->pool);
-            }
-            
-            if (!stream->task) {
-                if (sid > m->max_stream_started) {
-                    m->max_stream_started = sid;
-                }
-                if (stream->input) {
-                    h2_beam_on_consumed(stream->input, mst_stream_input_ev, 
-                                        m_stream_input_consumed, stream);
-                }
-                
-                stream->task = h2_task_create(secondary, stream->id, 
-                                              stream->request, m, stream->input, 
-                                              stream->session->s->timeout,
-                                              m->stream_max_mem);
-                if (!stream->task) {
-                    ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOMEM, secondary,
-                                  H2_STRM_LOG(APLOGNO(02941), stream, 
-                                  "create task"));
-                    return NULL;
-                }
-            }
-            
-            stream->task->started_at = apr_time_now();
-            ++m->tasks_active;
-            return stream->task;
+    if (!stream) {
+        if (m->processing_count >= m->processing_limit && !h2_iq_empty(m->q)) {
+            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c1,
+                          "h2_session(%ld): delaying request processing. "
+                          "Current limit is %d and %d workers are in use.",
+                          m->id, m->processing_limit, m->processing_count);
         }
+        return NULL;
     }
-    if (m->tasks_active >= m->limit_active && !h2_iq_empty(m->q)) {
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
-                      "h2_session(%ld): delaying request processing. "
-                      "Current limit is %d and %d workers are in use.",
-                      m->id, m->limit_active, m->tasks_active);
+
+    if (sid > m->max_stream_id_started) {
+        m->max_stream_id_started = sid;
     }
-    return NULL;
+
+    c2 = h2_c2_create(m->c1, m->pool);
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c1,
+                  H2_STRM_MSG(stream, "created new c2"));
+
+    rv = c2_setup_io(m, c2, stream);
+    if (APR_SUCCESS != rv) {
+        return NULL;
+    }
+
+    stream->c2 = c2;
+    ++m->processing_count;
+    APR_ARRAY_PUSH(m->streams_to_poll, h2_stream *) = stream;
+    apr_pollset_wakeup(m->pollset);
+
+    return c2;
 }
 
-apr_status_t h2_mplx_s_pop_task(h2_mplx *m, h2_task **ptask)
+apr_status_t h2_mplx_worker_pop_c2(h2_mplx *m, conn_rec **out_c)
 {
     apr_status_t rv = APR_EOF;
     
-    *ptask = NULL;
+    *out_c = NULL;
     ap_assert(m);
     ap_assert(m->lock);
     
@@ -762,8 +861,8 @@ apr_status_t h2_mplx_s_pop_task(h2_mplx
         rv = APR_EOF;
     }
     else {
-        *ptask = s_next_stream_task(m);
-        rv = (*ptask != NULL && !h2_iq_empty(m->q))? APR_EAGAIN : APR_SUCCESS;
+        *out_c = s_next_c2(m);
+        rv = (*out_c != NULL && !h2_iq_empty(m->q))? APR_EAGAIN : APR_SUCCESS;
     }
     if (APR_EAGAIN != rv) {
         m->is_registered = 0; /* h2_workers will discard this mplx */
@@ -772,85 +871,86 @@ apr_status_t h2_mplx_s_pop_task(h2_mplx
     return rv;
 }
 
-static void s_task_done(h2_mplx *m, h2_task *task)
+static void s_c2_done(h2_mplx *m, conn_rec *c2, h2_conn_ctx_t *conn_ctx)
 {
     h2_stream *stream;
-    
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
-                  "h2_mplx(%ld): task(%s) done", m->id, task->id);
-    s_out_close(m, task);
-    
-    task->worker_done = 1;
-    task->done_at = apr_time_now();
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, task->c,
-                  "h2_mplx(%s): request done, %f ms elapsed", task->id, 
-                  (task->done_at - task->started_at) / 1000.0);
-    
-    if (task->c && !task->c->aborted && task->started_at > m->last_mood_change) {
-        s_mplx_be_happy(m, task);
+
+    ap_assert(conn_ctx);
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c2,
+                  "h2_mplx(%s-%d): c2 done", conn_ctx->id, conn_ctx->stream_id);
+
+    ap_assert(conn_ctx->done == 0);
+    conn_ctx->done = 1;
+    conn_ctx->done_at = apr_time_now();
+    ++c2->keepalives;
+
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c2,
+                  "h2_mplx(%s-%d): request done, %f ms elapsed",
+                  conn_ctx->id, conn_ctx->stream_id,
+                  (conn_ctx->done_at - conn_ctx->started_at) / 1000.0);
+    
+    if (!conn_ctx->has_final_response) {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, conn_ctx->last_err, c2,
+                      "h2_c2(%s-%d): processing finished without final response",
+                      conn_ctx->id, conn_ctx->stream_id);
+        c2->aborted = 1;
+    }
+    else if (!c2->aborted && conn_ctx->started_at > m->last_mood_change) {
+        s_mplx_be_happy(m, c2, conn_ctx);
     }
     
-    ap_assert(task->done_done == 0);
-
-    stream = h2_ihash_get(m->streams, task->stream_id);
+    stream = h2_ihash_get(m->streams, conn_ctx->stream_id);
     if (stream) {
-        /* stream not done yet. */
-        if (!m->aborted && task->redo) {
-            /* reset and schedule again */
-            h2_task_redo(task);
-            h2_iq_add(m->q, stream->id, NULL, NULL);
-            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->c,
-                          H2_STRM_MSG(stream, "redo, added to q")); 
-        }
-        else {
-            /* stream not cleaned up, stay around */
-            task->done_done = 1;
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, task->c,
-                          H2_STRM_MSG(stream, "task_done, stream open")); 
-            if (stream->input) {
-                h2_beam_leave(stream->input);
-            }
-
-            /* more data will not arrive, resume the stream */
-            mst_check_data_for(m, stream->id, 1);
-        }
+        /* stream not done yet. trigger a potential polling on the output
+         * since nothing more will be happening here. */
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c2,
+                      H2_STRM_MSG(stream, "c2_done, stream open"));
+        c2_beam_output_write_notify(c2, NULL);
     }
-    else if ((stream = h2_ihash_get(m->shold, task->stream_id)) != NULL) {
+    else if ((stream = h2_ihash_get(m->shold, conn_ctx->stream_id)) != NULL) {
         /* stream is done, was just waiting for this. */
-        task->done_done = 1;
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, task->c,
-                      H2_STRM_MSG(stream, "task_done, in hold"));
-        if (stream->input) {
-            h2_beam_leave(stream->input);
-        }
-        ms_stream_joined(m, stream);
-    }
-    else if ((stream = h2_ihash_get(m->spurge, task->stream_id)) != NULL) {
-        ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, task->c,   
-                      H2_STRM_LOG(APLOGNO(03517), stream, "already in spurge"));
-        ap_assert("stream should not be in spurge" == NULL);
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c2,
+                      H2_STRM_MSG(stream, "c2_done, in hold"));
+        c1c2_stream_joined(m, stream);
     }
     else {
-        ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, task->c, APLOGNO(03518)
-                      "h2_mplx(%s): task_done, stream not found", 
-                      task->id);
+        int i; /* stream is NULL here (both lookups failed): search spurge by id */
+        for (i = 0; i < m->spurge->nelts; ++i) {
+            stream = APR_ARRAY_IDX(m->spurge, i, h2_stream*);
+            if (stream && stream->id == conn_ctx->stream_id) {
+                ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, c2,
+                              H2_STRM_LOG(APLOGNO(03517), stream, "already in spurge"));
+                ap_assert("stream should not be in spurge" == NULL);
+                return;
+            }
+        }
+
+        ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, c2, APLOGNO(03518)
+                      "h2_mplx(%s-%d): c2_done, stream not found",
+                      conn_ctx->id, conn_ctx->stream_id);
         ap_assert("stream should still be available" == NULL);
     }
 }
 
-void h2_mplx_s_task_done(h2_mplx *m, h2_task *task, h2_task **ptask)
+void h2_mplx_worker_c2_done(conn_rec *c2, conn_rec **out_c2)
 {
+    h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c2);
+    h2_mplx *m;
+
+    if (!conn_ctx || !conn_ctx->mplx) return;
+    m = conn_ctx->mplx;
+
     H2_MPLX_ENTER_ALWAYS(m);
 
-    --m->tasks_active;
-    s_task_done(m, task);
+    --m->processing_count;
+    s_c2_done(m, c2, conn_ctx);
     
     if (m->join_wait) {
         apr_thread_cond_signal(m->join_wait);
     }
-    if (ptask) {
-        /* caller wants another task */
-        *ptask = s_next_stream_task(m);
+    if (out_c2) {
+        /* caller wants another connection to process */
+        *out_c2 = s_next_c2(m);
     }
     ms_register_if_needed(m, 0);
 
@@ -861,117 +961,21 @@ void h2_mplx_s_task_done(h2_mplx *m, h2_
  * h2_mplx DoS protection
  ******************************************************************************/
 
-static int m_timed_out_busy_iter(void *data, void *val)
-{
-    stream_iter_ctx *ctx = data;
-    h2_stream *stream = val;
-    if (h2_task_has_started(stream->task) && !stream->task->worker_done
-        && (ctx->now - stream->task->started_at) > stream->task->timeout) {
-        /* timed out stream occupying a worker, found */
-        ctx->stream = stream;
-        return 0;
-    }
-    return 1;
-}
-
-static h2_stream *m_get_timed_out_busy_stream(h2_mplx *m) 
-{
-    stream_iter_ctx ctx;
-    ctx.m = m;
-    ctx.stream = NULL;
-    ctx.now = apr_time_now();
-    h2_ihash_iter(m->streams, m_timed_out_busy_iter, &ctx);
-    return ctx.stream;
-}
-
-static int m_latest_repeatable_unsubmitted_iter(void *data, void *val)
-{
-    stream_iter_ctx *ctx = data;
-    h2_stream *stream = val;
-    
-    if (!stream->task) goto leave;
-    if (!h2_task_has_started(stream->task) || stream->task->worker_done) goto leave;
-    if (h2_stream_is_ready(stream)) goto leave;
-    if (stream->task->redo) {
-        ++ctx->count;
-        goto leave;
-    }
-    if (h2_task_can_redo(stream->task)) {
-        /* this task occupies a worker, the response has not been submitted 
-         * yet, not been cancelled and it is a repeatable request
-         * -> we could redo it later */
-        if (!ctx->stream 
-            || (ctx->stream->task->started_at < stream->task->started_at)) {
-            /* we did not have one or this one was started later */
-            ctx->stream = stream;
-        }
-    }
-leave:
-    return 1;
-}
-
-static apr_status_t m_assess_task_to_throttle(h2_task **ptask, h2_mplx *m) 
-{
-    stream_iter_ctx ctx;
-    
-    /* count the running tasks already marked for redo and get one that could
-     * be throttled */
-    *ptask = NULL;
-    ctx.m = m;
-    ctx.stream = NULL;
-    ctx.count = 0;
-    h2_ihash_iter(m->streams, m_latest_repeatable_unsubmitted_iter, &ctx);
-    if (m->tasks_active - ctx.count > m->limit_active) {
-        /* we are above the limit of running tasks, accounting for the ones
-         * already throttled. */
-        if (ctx.stream && ctx.stream->task) {
-            *ptask = ctx.stream->task;
-            return APR_EAGAIN;
-        }
-        /* above limit, be seeing no candidate for easy throttling */
-        if (m_get_timed_out_busy_stream(m)) {
-            /* Too many busy workers, unable to cancel enough streams
-             * and with a busy, timed out stream, we tell the client
-             * to go away... */
-            return APR_TIMEUP;
-        }
-    }
-    return APR_SUCCESS;
-}
-
-static apr_status_t m_unschedule_slow_tasks(h2_mplx *m) 
-{
-    h2_task *task;
-    apr_status_t rv;
-    
-    /* Try to get rid of streams that occupy workers. Look for safe requests
-     * that are repeatable. If none found, fail the connection.
-     */
-    while (APR_EAGAIN == (rv = m_assess_task_to_throttle(&task, m))) {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
-                      "h2_mplx(%s): unschedule, resetting task for redo later",
-                      task->id);
-        task->redo = 1;
-        h2_task_rst(task, H2_ERR_CANCEL);
-    }
-    return rv;
-}
-
-static apr_status_t s_mplx_be_happy(h2_mplx *m, h2_task *task)
+static apr_status_t s_mplx_be_happy(h2_mplx *m, conn_rec *c, h2_conn_ctx_t *conn_ctx)
 {
     apr_time_t now;            
 
     --m->irritations_since;
     now = apr_time_now();
-    if (m->limit_active < m->max_active 
+    if (m->processing_limit < m->processing_max
         && (now - m->last_mood_change >= m->mood_update_interval
-            || m->irritations_since < -m->limit_active)) {
-        m->limit_active = H2MIN(m->limit_active * 2, m->max_active);
+            || m->irritations_since < -m->processing_limit)) {
+        m->processing_limit = H2MIN(m->processing_limit * 2, m->processing_max);
         m->last_mood_change = now;
         m->irritations_since = 0;
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
                       "h2_mplx(%ld): mood update, increasing worker limit to %d",
-                      m->id, m->limit_active);
+                      m->id, m->processing_limit);
     }
     return APR_SUCCESS;
 }
@@ -983,159 +987,35 @@ static apr_status_t m_be_annoyed(h2_mplx
 
     ++m->irritations_since;
     now = apr_time_now();
-    if (m->limit_active > 2 && 
+    if (m->processing_limit > 2 &&
         ((now - m->last_mood_change >= m->mood_update_interval)
-         || (m->irritations_since >= m->limit_active))) {
+         || (m->irritations_since >= m->processing_limit))) {
             
-        if (m->limit_active > 16) {
-            m->limit_active = 16;
+        if (m->processing_limit > 16) {
+            m->processing_limit = 16;
         }
-        else if (m->limit_active > 8) {
-            m->limit_active = 8;
+        else if (m->processing_limit > 8) {
+            m->processing_limit = 8;
         }
-        else if (m->limit_active > 4) {
-            m->limit_active = 4;
+        else if (m->processing_limit > 4) {
+            m->processing_limit = 4;
         }
-        else if (m->limit_active > 2) {
-            m->limit_active = 2;
+        else if (m->processing_limit > 2) {
+            m->processing_limit = 2;
         }
         m->last_mood_change = now;
         m->irritations_since = 0;
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1,
                       "h2_mplx(%ld): mood update, decreasing worker limit to %d",
-                      m->id, m->limit_active);
-    }
-    
-    if (m->tasks_active > m->limit_active) {
-        status = m_unschedule_slow_tasks(m);
+                      m->id, m->processing_limit);
     }
     return status;
 }
 
-apr_status_t h2_mplx_m_idle(h2_mplx *m)
-{
-    apr_status_t status = APR_SUCCESS;
-    apr_size_t scount;
-    
-    H2_MPLX_ENTER(m);
-
-    scount = h2_ihash_count(m->streams);
-    if (scount > 0) {
-        if (m->tasks_active) {
-            /* If we have streams in connection state 'IDLE', meaning
-             * all streams are ready to sent data out, but lack
-             * WINDOW_UPDATEs. 
-             * 
-             * This is ok, unless we have streams that still occupy
-             * h2 workers. As worker threads are a scarce resource, 
-             * we need to take measures that we do not get DoSed.
-             * 
-             * This is what we call an 'idle block'. Limit the amount 
-             * of busy workers we allow for this connection until it
-             * well behaves.
-             */
-            status = m_be_annoyed(m);
-        }
-        else if (!h2_iq_empty(m->q)) {
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                          "h2_mplx(%ld): idle, but %d streams to process",
-                          m->id, (int)h2_iq_count(m->q));
-            status = APR_EAGAIN;
-        }
-        else {
-            /* idle, have streams, but no tasks active. what are we waiting for?
-             * WINDOW_UPDATEs from client? */
-            h2_stream *stream = NULL;
-            
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                          "h2_mplx(%ld): idle, no tasks ongoing, %d streams",
-                          m->id, (int)h2_ihash_count(m->streams));
-            h2_ihash_shift(m->streams, (void**)&stream, 1);
-            if (stream) {
-                h2_ihash_add(m->streams, stream);
-                if (stream->output && !stream->out_checked) {
-                    /* FIXME: this looks like a race between the session thinking
-                     * it is idle and the EOF on a stream not being sent.
-                     * Signal to caller to leave IDLE state.
-                     */
-                    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
-                                  H2_STRM_MSG(stream, "output closed=%d, mplx idle"
-                                              ", out has %ld bytes buffered"),
-                                  h2_beam_is_closed(stream->output),
-                                  (long)h2_beam_get_buffered(stream->output));
-                    h2_ihash_add(m->streams, stream);
-                    mst_check_data_for(m, stream->id, 1);
-                    stream->out_checked = 1;
-                    status = APR_EAGAIN;
-                }
-            }
-        }
-    }
-    ms_register_if_needed(m, 1);
-
-    H2_MPLX_LEAVE(m);
-    return status;
-}
-
 /*******************************************************************************
  * mplx master events dispatching
  ******************************************************************************/
 
-int h2_mplx_m_has_master_events(h2_mplx *m)
-{
-    return apr_atomic_read32(&m->event_pending) > 0;
-}
-
-apr_status_t h2_mplx_m_dispatch_master_events(h2_mplx *m, stream_ev_callback *on_resume, 
-                                              void *on_ctx)
-{
-    h2_stream *stream;
-    int n, id;
-    
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
-                  "h2_mplx(%ld): dispatch events", m->id);        
-    apr_atomic_set32(&m->event_pending, 0);
-
-    /* update input windows for streams */
-    h2_ihash_iter(m->streams, m_report_consumption_iter, m);    
-    m_purge_streams(m, 1);
-    
-    n = h2_ififo_count(m->readyq);
-    while (n > 0 
-           && (h2_ififo_try_pull(m->readyq, &id) == APR_SUCCESS)) {
-        --n;
-        stream = h2_ihash_get(m->streams, id);
-        if (stream) {
-            on_resume(on_ctx, stream);
-        }
-    }
-    
-    return APR_SUCCESS;
-}
-
-apr_status_t h2_mplx_m_keep_active(h2_mplx *m, h2_stream *stream)
-{
-    mst_check_data_for(m, stream->id, 0);
-    return APR_SUCCESS;
-}
-
-int h2_mplx_m_awaits_data(h2_mplx *m)
-{
-    int waiting = 1;
-     
-    H2_MPLX_ENTER_ALWAYS(m);
-
-    if (h2_ihash_empty(m->streams)) {
-        waiting = 0;
-    }
-    else if (!m->tasks_active && !h2_ififo_count(m->readyq) && h2_iq_empty(m->q)) {
-        waiting = 0;
-    }
-
-    H2_MPLX_LEAVE(m);
-    return waiting;
-}
-
 static int reset_is_acceptable(h2_stream *stream)
 {
     /* client may terminate a stream via H2 RST_STREAM message at any time.
@@ -1151,14 +1031,14 @@ static int reset_is_acceptable(h2_stream
      * The responses to such requests continue forever otherwise.
      *
      */
-    if (!stream->task) return 1; /* have not started or already ended for us. acceptable. */
+    if (!stream_is_running(stream)) return 1;
     if (!(stream->id & 0x01)) return 1; /* stream initiated by us. acceptable. */
-    if (!stream->has_response) return 0; /* no response headers produced yet. bad. */
+    if (!stream->response) return 0; /* no response headers produced yet. bad. */
     if (!stream->out_data_frames) return 0; /* no response body data sent yet. bad. */
     return 1; /* otherwise, be forgiving */
 }
 
-apr_status_t h2_mplx_m_client_rst(h2_mplx *m, int stream_id)
+apr_status_t h2_mplx_c1_client_rst(h2_mplx *m, int stream_id)
 {
     h2_stream *stream;
     apr_status_t status = APR_SUCCESS;
@@ -1171,3 +1051,240 @@ apr_status_t h2_mplx_m_client_rst(h2_mpl
     H2_MPLX_LEAVE(m);
     return status;
 }
+
+static apr_status_t mplx_pollset_create(h2_mplx *m)
+{
+    int max_pdfs;
+
+    /* stream0 output, pfd_out_prod + pfd_in_drain per active stream */
+    max_pdfs = 1 + 2 * H2MIN(m->processing_max, m->max_streams);
+    return apr_pollset_create(&m->pollset, max_pdfs, m->pool,
+                              APR_POLLSET_WAKEABLE);
+}
+
+static apr_status_t mplx_pollset_add(h2_mplx *m, h2_conn_ctx_t *conn_ctx)
+{
+    apr_status_t rv = APR_SUCCESS;
+    const char *name = "";
+
+    if (conn_ctx->pfd_out_prod.reqevents) {
+        name = "adding out";
+        rv = apr_pollset_add(m->pollset, &conn_ctx->pfd_out_prod);
+        if (APR_SUCCESS != rv) goto cleanup;
+    }
+
+    if (conn_ctx->pfd_in_drain.reqevents) {
+        name = "adding in_read";
+        rv = apr_pollset_add(m->pollset, &conn_ctx->pfd_in_drain);
+    }
+
+cleanup:
+    if (APR_SUCCESS != rv) {
+        ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, m->c1,
+                      "h2_mplx(%ld-%d): error while adding to pollset %s",
+                      m->id, conn_ctx->stream_id, name);
+    }
+    return rv;
+}
+
+static apr_status_t mplx_pollset_remove(h2_mplx *m, h2_conn_ctx_t *conn_ctx)
+{
+    apr_status_t rv = APR_SUCCESS;
+    const char *name = "";
+
+    if (conn_ctx->pfd_out_prod.reqevents) {
+        rv = apr_pollset_remove(m->pollset, &conn_ctx->pfd_out_prod);
+        conn_ctx->pfd_out_prod.reqevents = 0;
+        if (APR_SUCCESS != rv) goto cleanup;
+    }
+
+    if (conn_ctx->pfd_in_drain.reqevents) {
+        name = "in_read";
+        rv = apr_pollset_remove(m->pollset, &conn_ctx->pfd_in_drain);
+        conn_ctx->pfd_in_drain.reqevents = 0;
+        if (APR_SUCCESS != rv) goto cleanup;
+    }
+
+cleanup:
+    if (APR_SUCCESS != rv) {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, rv, m->c1,
+                      "h2_mplx(%ld-%d): error removing from pollset %s",
+                      m->id, conn_ctx->stream_id, name);
+    }
+    return rv;
+}
+
+static apr_status_t mplx_pollset_poll(h2_mplx *m, apr_interval_time_t timeout,
+                            stream_ev_callback *on_stream_input,
+                            stream_ev_callback *on_stream_output,
+                            void *on_ctx)
+{
+    apr_status_t rv;
+    const apr_pollfd_t *results, *pfd;
+    apr_int32_t nresults, i;
+    h2_conn_ctx_t *conn_ctx;
+    h2_stream *stream;
+
+    /* Make sure we are not called recursively. */
+    ap_assert(!m->polling);
+    m->polling = 1;
+    do {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
+                      "h2_mplx(%ld): enter polling timeout=%d",
+                      m->id, (int)apr_time_sec(timeout));
+
+        apr_array_clear(m->streams_ev_in);
+        apr_array_clear(m->streams_ev_out);
+
+        do {
+            /* add streams we started processing in the meantime */
+            if (m->streams_to_poll->nelts) {
+                for (i = 0; i < m->streams_to_poll->nelts; ++i) {
+                    stream = APR_ARRAY_IDX(m->streams_to_poll, i, h2_stream*);
+                    if (stream && stream->c2 && (conn_ctx = h2_conn_ctx_get(stream->c2))) {
+                        mplx_pollset_add(m, conn_ctx);
+                    }
+                }
+                apr_array_clear(m->streams_to_poll);
+            }
+
+#if !H2_POLL_STREAMS
+            apr_thread_mutex_lock(m->poll_lock);
+            if (!h2_iq_empty(m->streams_input_read)
+                || !h2_iq_empty(m->streams_output_written)) {
+                while ((i = h2_iq_shift(m->streams_input_read))) {
+                    stream = h2_ihash_get(m->streams, i);
+                    if (stream) {
+                        APR_ARRAY_PUSH(m->streams_ev_in, h2_stream*) = stream;
+                    }
+                }
+                while ((i = h2_iq_shift(m->streams_output_written))) {
+                    stream = h2_ihash_get(m->streams, i);
+                    if (stream) {
+                        APR_ARRAY_PUSH(m->streams_ev_out, h2_stream*) = stream;
+                    }
+                }
+                nresults = 0;
+                rv = APR_SUCCESS;
+                apr_thread_mutex_unlock(m->poll_lock);
+                break;
+            }
+            apr_thread_mutex_unlock(m->poll_lock);
+#endif
+            H2_MPLX_LEAVE(m);
+            rv = apr_pollset_poll(m->pollset, timeout >= 0? timeout : -1, &nresults, &results);
+            H2_MPLX_ENTER_ALWAYS(m);
+
+        } while (APR_STATUS_IS_EINTR(rv));
+
+        if (APR_SUCCESS != rv) {
+            if (APR_STATUS_IS_TIMEUP(rv)) {
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
+                              "h2_mplx(%ld): polling timed out ",
+                              m->id);
+            }
+            else {
+                ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, m->c1, APLOGNO(10310)
+                              "h2_mplx(%ld): polling failed", m->id);
+            }
+            goto cleanup;
+        }
+
+        for (i = 0; i < nresults; i++) {
+            pfd = &results[i];
+            conn_ctx = pfd->client_data;
+
+            ap_assert(conn_ctx);
+            if (conn_ctx->stream_id == 0) {
+                if (on_stream_input) {
+                    APR_ARRAY_PUSH(m->streams_ev_in, h2_stream*) = m->stream0;
+                }
+                continue;
+            }
+
+            h2_util_drain_pipe(pfd->desc.f);
+            stream = h2_ihash_get(m->streams, conn_ctx->stream_id);
+            if (!stream) {
+                stream = h2_ihash_get(m->shold, conn_ctx->stream_id);
+                if (stream) {
+                    /* This is normal and means that stream processing on c1 has
+                     * already finished to CLEANUP and c2 is not done yet */
+                    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, m->c1,
+                                  "h2_mplx(%ld-%d): stream already in hold for poll event %hx",
+                                   m->id, conn_ctx->stream_id, pfd->rtnevents);
+                }
+                else {
+                    h2_stream *sp = NULL;
+                    int j;
+
+                    for (j = 0; j < m->spurge->nelts; ++j) {
+                        sp = APR_ARRAY_IDX(m->spurge, j, h2_stream*);
+                        if (sp->id == conn_ctx->stream_id) {
+                            stream = sp;
+                            break;
+                        }
+                    }
+
+                    if (stream) {
+                        /* This is normal and means that stream processing on c1 has
+                         * already finished to CLEANUP and c2 is not done yet */
+                        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, rv, m->c1, APLOGNO(10311)
+                                      "h2_mplx(%ld-%d): stream already in purge for poll event %hx",
+                                       m->id, conn_ctx->stream_id, pfd->rtnevents);
+                    }
+                    else {
+                        /* This should not happen. When a stream has been purged,
+                         * it MUST no longer appear in the pollset. Purging is done
+                         * outside the poll result processing. */
+                        ap_log_cerror(APLOG_MARK, APLOG_WARNING, rv, m->c1, APLOGNO(10312)
+                                      "h2_mplx(%ld-%d): stream no longer known for poll event %hx"
+                                      ", m->streams=%d, conn_ctx=%lx, fd=%lx",
+                                       m->id, conn_ctx->stream_id, pfd->rtnevents,
+                                       (int)h2_ihash_count(m->streams),
+                                       (long)conn_ctx, (long)pfd->desc.f);
+                        h2_ihash_iter(m->streams, m_report_stream_iter, m);
+                    }
+                }
+                continue;
+            }
+
+            if (conn_ctx->pfd_out_prod.desc.f == pfd->desc.f) {
+                /* output is available */
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
+                              "[%s-%d] poll output event %hx",
+                              conn_ctx->id, conn_ctx->stream_id,
+                              pfd->rtnevents);
+                APR_ARRAY_PUSH(m->streams_ev_out, h2_stream*) = stream;
+            }
+            else if (conn_ctx->pfd_in_drain.desc.f == pfd->desc.f) {
+                /* input has been consumed */
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
+                              "[%s-%d] poll input event %hx",
+                              conn_ctx->id, conn_ctx->stream_id,
+                              pfd->rtnevents);
+                APR_ARRAY_PUSH(m->streams_ev_in, h2_stream*) = stream;
+            }
+        }
+
+        if (on_stream_input && m->streams_ev_in->nelts) {
+            H2_MPLX_LEAVE(m);
+            for (i = 0; i < m->streams_ev_in->nelts; ++i) {
+                on_stream_input(on_ctx, APR_ARRAY_IDX(m->streams_ev_in, i, h2_stream*));
+            }
+            H2_MPLX_ENTER_ALWAYS(m);
+        }
+        if (on_stream_output && m->streams_ev_out->nelts) {
+            H2_MPLX_LEAVE(m);
+            for (i = 0; i < m->streams_ev_out->nelts; ++i) {
+                on_stream_output(on_ctx, APR_ARRAY_IDX(m->streams_ev_out, i, h2_stream*));
+            }
+            H2_MPLX_ENTER_ALWAYS(m);
+        }
+        break;
+    } while(1);
+
+cleanup:
+    m->polling = 0;
+    return rv;
+}
+

Modified: httpd/httpd/trunk/modules/http2/h2_mplx.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_mplx.h?rev=1894163&r1=1894162&r2=1894163&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_mplx.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_mplx.h Tue Oct 12 13:34:01 2021
@@ -18,23 +18,16 @@
 #define __mod_h2__h2_mplx__
 
 /**
- * The stream multiplexer. It pushes buckets from the connection
- * thread to the stream threads and vice versa. It's thread-safe
- * to use.
+ * The stream multiplexer. It performs communication between the
+ * primary HTTP/2 connection (c1) and the secondary connections (c2)
+ * that process the requests, aka. HTTP/2 streams.
  *
- * There is one h2_mplx instance for each h2_session, which sits on top
- * of a particular httpd conn_rec. Input goes from the connection to
- * the stream tasks. Output goes from the stream tasks to the connection,
- * e.g. the client.
+ * There is one h2_mplx instance for each h2_session.
  *
- * For each stream, there can be at most "H2StreamMaxMemSize" output bytes
- * queued in the multiplexer. If a task thread tries to write more
- * data, it is blocked until space becomes available.
- *
- * Naming Convention: 
- * "h2_mplx_m_" are methods only to be called by the main connection
- * "h2_mplx_s_" are method only to be called by a secondary connection
- * "h2_mplx_t_" are method only to be called by a task handler (can be master or secondary)
+ * Naming Convention:
+ * "h2_mplx_c1_" are methods only to be called by the primary connection
+ * "h2_mplx_c2_" are methods only to be called by a secondary connection
+ * "h2_mplx_worker_" are methods only to be called by a h2 worker thread
  */
 
 struct apr_pool_t;
@@ -43,7 +36,6 @@ struct apr_thread_cond_t;
 struct h2_bucket_beam;
 struct h2_config;
 struct h2_ihash_t;
-struct h2_task;
 struct h2_stream;
 struct h2_request;
 struct apr_thread_cond_t;
@@ -56,74 +48,71 @@ typedef struct h2_mplx h2_mplx;
 
 struct h2_mplx {
     long id;
-    conn_rec *c;
+    conn_rec *c1;                   /* the main connection */
     apr_pool_t *pool;
+    struct h2_stream *stream0;      /* HTTP/2's stream 0 */
     server_rec *s;                  /* server for master conn */
 
-    unsigned int event_pending;
-    unsigned int aborted;
-    unsigned int is_registered;     /* is registered at h2_workers */
-
-    struct h2_ihash_t *streams;     /* all streams currently processing */
-    struct h2_ihash_t *shold;       /* all streams done with task ongoing */
-    struct h2_ihash_t *spurge;      /* all streams done, ready for destroy */
+    int aborted;
+    int polling;                    /* is waiting/processing pollset events */
+    int is_registered;              /* is registered at h2_workers */
+
+    struct h2_ihash_t *streams;     /* all streams active */
+    struct h2_ihash_t *shold;       /* all streams done with c2 processing ongoing */
+    apr_array_header_t *spurge;     /* all streams done, ready for destroy */
     
     struct h2_iqueue *q;            /* all stream ids that need to be started */
-    struct h2_ififo *readyq;        /* all stream ids ready for output */
-        
-    struct h2_ihash_t *redo_tasks;  /* all tasks that need to be redone */
-    
-    int max_streams;        /* max # of concurrent streams */
-    int max_stream_started; /* highest stream id that started processing */
-    int tasks_active;       /* # of tasks being processed from this mplx */
-    int limit_active;       /* current limit on active tasks, dynamic */
-    int max_active;         /* max, hard limit # of active tasks in a process */
+
+    apr_size_t stream_max_mem;      /* max memory to buffer for a stream */
+    int max_streams;                /* max # of concurrent streams */
+    int max_stream_id_started;      /* highest stream id that started processing */
+
+    int processing_count;           /* # of c2 working for this mplx */
+    int processing_limit;           /* current limit on processing c2s, dynamic */
+    int processing_max;             /* max, hard limit of processing c2s */
     
-    apr_time_t last_mood_change; /* last time, we worker limit changed */
+    apr_time_t last_mood_change;    /* last time, processing limit changed */
     apr_interval_time_t mood_update_interval; /* how frequent we update at most */
     int irritations_since; /* irritations (>0) or happy events (<0) since last mood change */
 
     apr_thread_mutex_t *lock;
-    struct apr_thread_cond_t *added_output;
     struct apr_thread_cond_t *join_wait;
     
-    apr_size_t stream_max_mem;
-    
-    apr_pool_t *spare_io_pool;
-    apr_array_header_t *spare_secondary; /* spare secondary connections */
-    
-    struct h2_workers *workers;
+    apr_pollset_t *pollset;         /* pollset for c1/c2 IO events */
+    apr_array_header_t *streams_to_poll; /* streams to add to the pollset */
+    apr_array_header_t *streams_ev_in;
+    apr_array_header_t *streams_ev_out;
+
+#if !H2_POLL_STREAMS
+    apr_thread_mutex_t *poll_lock; /* not the painter */
+    struct h2_iqueue *streams_input_read;  /* streams whose input has been read from */
+    struct h2_iqueue *streams_output_written; /* streams whose output has been written to */
+#endif
+    struct h2_workers *workers;     /* h2 workers process wide instance */
 };
 
-/*******************************************************************************
- * From the main connection processing: h2_mplx_m_*
- ******************************************************************************/
-
-apr_status_t h2_mplx_m_child_init(apr_pool_t *pool, server_rec *s);
+apr_status_t h2_mplx_c1_child_init(apr_pool_t *pool, server_rec *s);
 
 /**
  * Create the multiplexer for the given HTTP2 session. 
  * Implicitly has reference count 1.
  */
-h2_mplx *h2_mplx_m_create(conn_rec *c, server_rec *s, apr_pool_t *master, 
-                          struct h2_workers *workers);
+h2_mplx *h2_mplx_c1_create(struct h2_stream *stream0, server_rec *s, apr_pool_t *master,
+                           struct h2_workers *workers);
 
 /**
- * Decreases the reference counter of this mplx and waits for it
- * to reached 0, destroy the mplx afterwards.
- * This is to be called from the thread that created the mplx in
- * the first place.
- * @param m the mplx to be released and destroyed
+ * Destroy the mplx, shutting down all ongoing processing.
+ * @param m the mplx to destroy
  * @param wait condition var to wait on for ref counter == 0
  */ 
-void h2_mplx_m_release_and_join(h2_mplx *m, struct apr_thread_cond_t *wait);
+void h2_mplx_c1_destroy(h2_mplx *m);
 
 /**
  * Shut down the multiplexer gracefully. Will no longer schedule new streams
  * but let the ongoing ones finish normally.
  * @return the highest stream id being/been processed
  */
-int h2_mplx_m_shutdown(h2_mplx *m);
+int h2_mplx_c1_shutdown(h2_mplx *m);
 
 /**
  * Notifies mplx that a stream has been completely handled on the main
@@ -131,29 +120,33 @@ int h2_mplx_m_shutdown(h2_mplx *m);
  * 
  * @param m the mplx itself
  * @param stream the stream ready for cleanup
+ * @param pstream_count return the number of streams active
  */
-apr_status_t h2_mplx_m_stream_cleanup(h2_mplx *m, struct h2_stream *stream);
+apr_status_t h2_mplx_c1_stream_cleanup(h2_mplx *m, struct h2_stream *stream,
+                                       int *pstream_count);
 
-/**
- * Waits on output data from any stream in this session to become available. 
- * Returns APR_TIMEUP if no data arrived in the given time.
- */
-apr_status_t h2_mplx_m_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
-                                   struct apr_thread_cond_t *iowait);
-
-apr_status_t h2_mplx_m_keep_active(h2_mplx *m, struct h2_stream *stream);
+int h2_mplx_c1_stream_is_running(h2_mplx *m, struct h2_stream *stream);
 
 /**
  * Process a stream request.
  * 
  * @param m the multiplexer
- * @param stream the identifier of the stream
- * @param r the request to be processed
+ * @param read_to_process
+ * @param input_pending
  * @param cmp the stream priority compare function
- * @param ctx context data for the compare function
+ * @param pstream_count on return the number of streams active in mplx
  */
-apr_status_t h2_mplx_m_process(h2_mplx *m, struct h2_stream *stream, 
-                               h2_stream_pri_cmp *cmp, void *ctx);
+apr_status_t h2_mplx_c1_process(h2_mplx *m,
+                                struct h2_iqueue *read_to_process,
+                                h2_stream_get_fn *get_stream,
+                                h2_stream_pri_cmp_fn *cmp,
+                                struct h2_session *session,
+                                int *pstream_count);
+
+apr_status_t h2_mplx_c1_fwd_input(h2_mplx *m, struct h2_iqueue *input_pending,
+                                  h2_stream_get_fn *get_stream,
+                                  struct h2_session *session);
+
 
 /**
  * Stream priorities have changed, reschedule pending requests.
@@ -162,62 +155,60 @@ apr_status_t h2_mplx_m_process(h2_mplx *
  * @param cmp the stream priority compare function
  * @param ctx context data for the compare function
  */
-apr_status_t h2_mplx_m_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx);
+apr_status_t h2_mplx_c1_reprioritize(h2_mplx *m, h2_stream_pri_cmp_fn *cmp,
+                                    struct h2_session *session);
 
 typedef apr_status_t stream_ev_callback(void *ctx, struct h2_stream *stream);
 
 /**
- * Check if the multiplexer has events for the master connection pending.
- * @return != 0 iff there are events pending
+ * Poll the primary connection for input and the active streams for output.
+ * Invoke the callback for any stream where an event happened.
  */
-int h2_mplx_m_has_master_events(h2_mplx *m);
+apr_status_t h2_mplx_c1_poll(h2_mplx *m, apr_interval_time_t timeout,
+                            stream_ev_callback *on_stream_input,
+                            stream_ev_callback *on_stream_output,
+                            void *on_ctx);
 
-/**
- * Dispatch events for the master connection, such as
- ± @param m the multiplexer
- * @param on_resume new output data has arrived for a suspended stream 
- * @param ctx user supplied argument to invocation.
- */
-apr_status_t h2_mplx_m_dispatch_master_events(h2_mplx *m, stream_ev_callback *on_resume, 
-                                              void *ctx);
-
-int h2_mplx_m_awaits_data(h2_mplx *m);
-
-typedef int h2_mplx_stream_cb(struct h2_stream *s, void *ctx);
-
-apr_status_t h2_mplx_m_stream_do(h2_mplx *m, h2_mplx_stream_cb *cb, void *ctx);
+void h2_mplx_c2_input_read(h2_mplx *m, conn_rec *c2);
+void h2_mplx_c2_output_written(h2_mplx *m, conn_rec *c2);
 
-apr_status_t h2_mplx_m_client_rst(h2_mplx *m, int stream_id);
+typedef int h2_mplx_stream_cb(struct h2_stream *s, void *userdata);
 
 /**
- * Master connection has entered idle mode.
- * @param m the mplx instance of the master connection
- * @return != SUCCESS iff connection should be terminated
+ * Iterate over all streams known to mplx from the primary connection.
+ * @param m the mplx
+ * @param cb the callback to invoke on each stream
+ * @param ctx userdata passed to the callback
  */
-apr_status_t h2_mplx_m_idle(h2_mplx *m);
+apr_status_t h2_mplx_c1_streams_do(h2_mplx *m, h2_mplx_stream_cb *cb, void *ctx);
 
-/*******************************************************************************
- * From a secondary connection processing: h2_mplx_s_*
- ******************************************************************************/
-apr_status_t h2_mplx_s_pop_task(h2_mplx *m, struct h2_task **ptask);
-void h2_mplx_s_task_done(h2_mplx *m, struct h2_task *task, struct h2_task **ptask);
-
-/*******************************************************************************
- * From a h2_task owner: h2_mplx_s_*
- * (a task is transfered from master to secondary connection and back in
- * its normal lifetime).
- ******************************************************************************/
+/**
+ * A stream has been RST_STREAM by the client. Abort
+ * any processing going on and remove from processing
+ * queue.
+ */
+apr_status_t h2_mplx_c1_client_rst(h2_mplx *m, int stream_id);
 
 /**
- * Opens the output for the given stream with the specified response.
+ * Get readonly access to a stream for a secondary connection.
  */
-apr_status_t h2_mplx_t_out_open(h2_mplx *mplx, int stream_id,
-                                struct h2_bucket_beam *beam);
+const struct h2_stream *h2_mplx_c2_stream_get(h2_mplx *m, int stream_id);
 
 /**
- * Get the stream that belongs to the given task.
+ * A h2 worker asks for a secondary connection to process.
+ * @param out_c2 non-NULL, a pointer where to receive the next
+ *               secondary connection to process.
  */
-struct h2_stream *h2_mplx_t_stream_get(h2_mplx *m, struct h2_task *task);
+apr_status_t h2_mplx_worker_pop_c2(h2_mplx *m, conn_rec **out_c2);
 
+/**
+ * A h2 worker reports a secondary connection processing done.
+ * If it is willing to do more work for this mplx (this c1 connection),
+ * it provides `out_c2`. Otherwise it passes NULL.
+ * @param c2 the secondary connection that finished processing
+ * @param out_c2 NULL or a pointer where to receive the next
+ *               secondary connection to process.
+ */
+void h2_mplx_worker_c2_done(conn_rec *c2, conn_rec **out_c2);
 
 #endif /* defined(__mod_h2__h2_mplx__) */