You are viewing a plain text version of this content. The canonical link for it is here.
Posted to cvs@httpd.apache.org by ic...@apache.org on 2016/04/28 14:43:02 UTC

svn commit: r1741419 [2/4] - in /httpd/httpd/branches/2.4.x: ./ modules/http2/

Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.c?rev=1741419&r1=1741418&r2=1741419&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.c (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.c Thu Apr 28 12:43:02 2016
@@ -29,38 +29,46 @@
 #include "mod_http2.h"
 
 #include "h2_private.h"
+#include "h2_bucket_beam.h"
 #include "h2_config.h"
 #include "h2_conn.h"
 #include "h2_ctx.h"
 #include "h2_h2.h"
-#include "h2_int_queue.h"
-#include "h2_io.h"
-#include "h2_io_set.h"
 #include "h2_response.h"
 #include "h2_mplx.h"
 #include "h2_ngn_shed.h"
 #include "h2_request.h"
 #include "h2_stream.h"
 #include "h2_task.h"
-#include "h2_task_input.h"
-#include "h2_task_output.h"
 #include "h2_worker.h"
 #include "h2_workers.h"
 #include "h2_util.h"
 
 
-#define H2_MPLX_IO_OUT(lvl,m,io,msg) \
-    do { \
-        if (APLOG_C_IS_LEVEL((m)->c,lvl)) \
-        h2_util_bb_log((m)->c,(io)->id,lvl,msg,(io)->bbout); \
-    } while(0)
-    
-#define H2_MPLX_IO_IN(lvl,m,io,msg) \
-    do { \
-        if (APLOG_C_IS_LEVEL((m)->c,lvl)) \
-        h2_util_bb_log((m)->c,(io)->id,lvl,msg,(io)->bbin); \
-    } while(0)
+static void h2_beam_log(h2_bucket_beam *beam, int id, const char *msg, 
+                        conn_rec *c, int level)
+{
+    if (beam && APLOG_C_IS_LEVEL(c,level)) {
+        char buffer[2048];
+        apr_size_t off = 0;
+        
+        off += apr_snprintf(buffer+off, H2_ALEN(buffer)-off, "cl=%d, ", beam->closed);
+        off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "red", ", ", &beam->red);
+        off += h2_util_bb_print(buffer+off, H2_ALEN(buffer)-off, "green", ", ", beam->green);
+        off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "hold", ", ", &beam->hold);
+        off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "purge", "", &beam->purge);
+
+        ap_log_cerror(APLOG_MARK, level, 0, c, "beam(%ld-%d): %s %s", 
+                      c->id, id, msg, buffer);
+    }
+}
 
+/* utility for iterating over ihash task sets */
+typedef struct {
+    h2_mplx *m;
+    h2_task *task;
+    apr_time_t now;
+} task_iter_ctx;
 
 /* NULL or the mutex hold by this thread, used for recursive calls
  */
@@ -104,13 +112,57 @@ static void leave_mutex(h2_mplx *m, int
     }
 }
 
-static int is_aborted(h2_mplx *m, apr_status_t *pstatus)
+static void beam_leave(void *ctx, apr_thread_mutex_t *lock)
 {
-    AP_DEBUG_ASSERT(m);
-    if (m->aborted) {
-        *pstatus = APR_ECONNABORTED;
+    leave_mutex(ctx, 1);
+}
+
+static apr_status_t beam_enter(void *ctx, h2_beam_lock *pbl)
+{
+    h2_mplx *m = ctx;
+    int acquired;
+    apr_status_t status;
+    
+    status = enter_mutex(m, &acquired);
+    if (status == APR_SUCCESS) {
+        pbl->mutex = m->lock;
+        pbl->leave = acquired? beam_leave : NULL;
+        pbl->leave_ctx = m;
+    }
+    return status;
+}
+
+static void stream_output_consumed(void *ctx, 
+                                   h2_bucket_beam *beam, apr_off_t length)
+{
+    h2_task *task = ctx;
+    if (length > 0 && task && task->assigned) {
+        h2_req_engine_out_consumed(task->assigned, task->c, length); 
+    }
+}
+
+static void stream_input_consumed(void *ctx, 
+                                  h2_bucket_beam *beam, apr_off_t length)
+{
+    h2_mplx *m = ctx;
+    if (m->input_consumed && length) {
+        m->input_consumed(m->input_consumed_ctx, beam->id, length);
+    }
+}
+
+static int can_beam_file(void *ctx, h2_bucket_beam *beam,  apr_file_t *file)
+{
+    h2_mplx *m = ctx;
+    if (m->tx_handles_reserved > 0) {
+        --m->tx_handles_reserved;
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
+                      "h2_mplx(%ld-%d): beaming file %s, tx_avail %d", 
+                      m->id, beam->id, beam->tag, m->tx_handles_reserved);
         return 1;
     }
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
+                  "h2_mplx(%ld-%d): can_beam_file denied on %s", 
+                  m->id, beam->id, beam->tag);
     return 0;
 }
 
@@ -118,9 +170,9 @@ static void have_out_data_for(h2_mplx *m
 
 static void check_tx_reservation(h2_mplx *m) 
 {
-    if (m->tx_handles_reserved == 0) {
+    if (m->tx_handles_reserved <= 0) {
         m->tx_handles_reserved += h2_workers_tx_reserve(m->workers, 
-            H2MIN(m->tx_chunk_size, h2_io_set_size(m->stream_ios)));
+            H2MIN(m->tx_chunk_size, h2_ihash_count(m->tasks)));
     }
 }
 
@@ -131,8 +183,7 @@ static void check_tx_free(h2_mplx *m)
         m->tx_handles_reserved = m->tx_chunk_size;
         h2_workers_tx_free(m->workers, count);
     }
-    else if (m->tx_handles_reserved 
-             && (!m->stream_ios || h2_io_set_is_empty(m->stream_ios))) {
+    else if (m->tx_handles_reserved && h2_ihash_empty(m->tasks)) {
         h2_workers_tx_free(m->workers, m->tx_handles_reserved);
         m->tx_handles_reserved = 0;
     }
@@ -142,8 +193,8 @@ static void h2_mplx_destroy(h2_mplx *m)
 {
     AP_DEBUG_ASSERT(m);
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                  "h2_mplx(%ld): destroy, ios=%d", 
-                  m->id, (int)h2_io_set_size(m->stream_ios));
+                  "h2_mplx(%ld): destroy, tasks=%d", 
+                  m->id, (int)h2_ihash_count(m->tasks));
     check_tx_free(m);
     if (m->pool) {
         apr_pool_destroy(m->pool);
@@ -204,9 +255,12 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr
         m->bucket_alloc = apr_bucket_alloc_create(m->pool);
         m->max_streams = h2_config_geti(conf, H2_CONF_MAX_STREAMS);
         m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
+
+        m->streams = h2_ihash_create(m->pool, offsetof(h2_stream,id));
         m->q = h2_iq_create(m->pool, m->max_streams);
-        m->stream_ios = h2_io_set_create(m->pool);
-        m->ready_ios = h2_io_set_create(m->pool);
+        m->tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id));
+        m->ready_tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id));
+
         m->stream_timeout = stream_timeout;
         m->workers = workers;
         m->workers_max = workers->max_workers;
@@ -240,75 +294,65 @@ apr_uint32_t h2_mplx_shutdown(h2_mplx *m
     return max_stream_started;
 }
 
-static void workers_register(h2_mplx *m)
-{
-    /* h2_workers is only a hub for all the h2_worker instances.
-     * At the end-of-life of this h2_mplx, we always unregister at
-     * the workers. The thing to manage are all the h2_worker instances
-     * out there. Those may hold a reference to this h2_mplx and we cannot
-     * call them to unregister.
-     * 
-     * Therefore: ref counting for h2_workers in not needed, ref counting
-     * for h2_worker using this is critical.
-     */
-    m->need_registration = 0;
-    h2_workers_register(m->workers, m);
-}
-
-static int io_in_consumed_signal(h2_mplx *m, h2_io *io)
+static void input_consumed_signal(h2_mplx *m, h2_task *task)
 {
-    if (io->input_consumed && m->input_consumed) {
-        m->input_consumed(m->input_consumed_ctx, 
-                          io->id, io->input_consumed);
-        io->input_consumed = 0;
-        return 1;
+    if (task->input.beam && task->worker_started) {
+        h2_beam_send(task->input.beam, NULL, 0); /* trigger updates */
     }
-    return 0;
 }
 
-static int io_out_consumed_signal(h2_mplx *m, h2_io *io)
+static int output_consumed_signal(h2_mplx *m, h2_task *task)
 {
-    if (io->output_consumed && io->task && io->task->assigned) {
-        h2_req_engine_out_consumed(io->task->assigned, io->task->c, 
-                                   io->output_consumed);
-        io->output_consumed = 0;
-        return 1;
+    if (task->output.beam && task->worker_started && task->assigned) {
+        h2_beam_send(task->output.beam, NULL, 0); /* trigger updates */
     }
     return 0;
 }
 
-static void io_destroy(h2_mplx *m, h2_io *io, int events)
+
+static void task_destroy(h2_mplx *m, h2_task *task, int events)
 {
-    int reuse_slave;
+    conn_rec *slave = NULL;
+    int reuse_slave = 0;
+    apr_status_t status;
     
     /* cleanup any buffered input */
-    h2_io_in_shutdown(io);
+    status = h2_task_shutdown(task, 0);
+    if (status != APR_SUCCESS){
+        ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, m->c, APLOGNO() 
+                      "h2_task(%s): shutdown", task->id);
+    }
+    
     if (events) {
         /* Process outstanding events before destruction */
-        io_in_consumed_signal(m, io);
+        input_consumed_signal(m, task);
     }
     
     /* The pool is cleared/destroyed which also closes all
      * allocated file handles. Give this count back to our
      * file handle pool. */
-    m->tx_handles_reserved += io->files_handles_owned;
-
-    h2_io_set_remove(m->stream_ios, io);
-    h2_io_set_remove(m->ready_ios, io);
-    if (m->redo_ios) {
-        h2_io_set_remove(m->redo_ios, io);
+    if (task->input.beam) {
+        m->tx_handles_reserved += 
+        h2_beam_get_files_beamed(task->input.beam);
     }
-
+    if (task->output.beam) {
+        m->tx_handles_reserved += 
+        h2_beam_get_files_beamed(task->output.beam);
+    }
+    
+    slave = task->c;
     reuse_slave = ((m->spare_slaves->nelts < m->spare_slaves->nalloc)
-                    && !io->rst_error && io->eor);
-    if (io->task) {
-        conn_rec *slave = io->task->c;
-        h2_task_destroy(io->task);
-        io->task = NULL;
-        
+                   && !task->rst_error);
+    
+    h2_ihash_remove(m->tasks, task->stream_id);
+    h2_ihash_remove(m->ready_tasks, task->stream_id);
+    if (m->redo_tasks) {
+        h2_ihash_remove(m->redo_tasks, task->stream_id);
+    }
+    h2_task_destroy(task);
+
+    if (slave) {
         if (reuse_slave && slave->keepalive == AP_CONN_KEEPALIVE) {
-            apr_bucket_delete(io->eor);
-            io->eor = NULL;
             APR_ARRAY_PUSH(m->spare_slaves, conn_rec*) = slave;
         }
         else {
@@ -316,59 +360,67 @@ static void io_destroy(h2_mplx *m, h2_io
             h2_slave_destroy(slave, NULL);
         }
     }
-
-    if (io->pool) {
-        apr_pool_destroy(io->pool);
-    }
-
+    
     check_tx_free(m);
 }
 
-static int io_stream_done(h2_mplx *m, h2_io *io, int rst_error) 
+static int task_stream_done(h2_mplx *m, h2_task *task, int rst_error) 
 {
     /* Remove io from ready set, we will never submit it */
-    h2_io_set_remove(m->ready_ios, io);
-    if (!io->worker_started || io->worker_done) {
+    h2_ihash_remove(m->ready_tasks, task->stream_id);
+    if (task->worker_done) {
         /* already finished or not even started yet */
-        h2_iq_remove(m->q, io->id);
-        io_destroy(m, io, 1);
+        h2_iq_remove(m->q, task->stream_id);
+        task_destroy(m, task, 0);
         return 0;
     }
     else {
         /* cleanup once task is done */
-        h2_io_make_orphaned(io, rst_error);
+        task->orphaned = 1;
+        if (task->input.beam) {
+            apr_status_t status;
+            status = h2_beam_shutdown(task->input.beam, APR_NONBLOCK_READ);
+            if (status == APR_EAGAIN) {
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
+                              "h2_stream(%ld-%d): wait on input shutdown", 
+                              m->id, task->stream_id);
+                status = h2_beam_shutdown(task->input.beam, APR_BLOCK_READ);
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c, 
+                              "h2_stream(%ld-%d): input shutdown returned", 
+                              m->id, task->stream_id);
+            }
+            task->input.beam = NULL;
+        }
+        if (rst_error) {
+            h2_task_rst(task, rst_error);
+        }
         return 1;
     }
 }
 
-static int stream_done_iter(void *ctx, h2_io *io)
+static int stream_done_iter(void *ctx, void *val)
 {
-    return io_stream_done((h2_mplx*)ctx, io, 0);
+    return task_stream_done((h2_mplx*)ctx, val, 0);
 }
 
-static int stream_print(void *ctx, h2_io *io)
+static int task_print(void *ctx, void *val)
 {
     h2_mplx *m = ctx;
-    if (io && io->request) {
+    h2_task *task = val;
+    if (task->request) {
         ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
-                      "->03198: h2_stream(%ld-%d): %s %s %s -> %s %d"
-                      "[orph=%d/started=%d/done=%d/eos_in=%d/eos_out=%d]", 
-                      m->id, io->id, 
-                      io->request->method, io->request->authority, io->request->path,
-                      io->response? "http" : (io->rst_error? "reset" : "?"),
-                      io->response? io->response->http_status : io->rst_error,
-                      io->orphaned, io->worker_started, io->worker_done,
-                      io->eos_in, io->eos_out);
+                      "->03198: h2_stream(%s): %s %s %s -> %s %d"
+                      "[orph=%d/started=%d/done=%d]", 
+                      task->id, task->request->method, 
+                      task->request->authority, task->request->path,
+                      task->response? "http" : (task->rst_error? "reset" : "?"),
+                      task->response? task->response->http_status : task->rst_error,
+                      task->orphaned, task->worker_started, 
+                      task->worker_done);
     }
-    else if (io) {
+    else if (task) {
         ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
-                      "->03198: h2_stream(%ld-%d): NULL -> %s %d"
-                      "[orph=%d/started=%d/done=%d/eos_in=%d/eos_out=%d]", 
-                      m->id, io->id, 
-                      io->response? "http" : (io->rst_error? "reset" : "?"),
-                      io->response? io->response->http_status : io->rst_error,
-                      io->orphaned, io->worker_started, io->worker_done,
-                      io->eos_in, io->eos_out);
+                      "->03198: h2_stream(%ld-%d): NULL", m->id, task->stream_id);
     }
     else {
         ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
@@ -392,7 +444,7 @@ apr_status_t h2_mplx_release_and_join(h2
         
         h2_iq_clear(m->q);
         apr_thread_cond_broadcast(m->task_thawed);
-        while (!h2_io_set_iter(m->stream_ios, stream_done_iter, m)) {
+        while (!h2_ihash_iter(m->tasks, stream_done_iter, m)) {
             /* iterate until all ios have been orphaned or destroyed */
         }
     
@@ -406,10 +458,14 @@ apr_status_t h2_mplx_release_and_join(h2
         for (i = 0; m->workers_busy > 0; ++i) {
             m->join_wait = wait;
             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                          "h2_mplx(%ld): release_join, waiting on %d worker to report back", 
-                          m->id, (int)h2_io_set_size(m->stream_ios));
+                          "h2_mplx(%ld): release_join, waiting on %d tasks to report back", 
+                          m->id, (int)h2_ihash_count(m->tasks));
                           
             status = apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(wait_secs));
+            
+            while (!h2_ihash_iter(m->tasks, stream_done_iter, m)) {
+                /* iterate until all tasks have been orphaned or destroyed */
+            }
             if (APR_STATUS_IS_TIMEUP(status)) {
                 if (i > 0) {
                     /* Oh, oh. Still we wait for assigned  workers to report that 
@@ -419,11 +475,11 @@ apr_status_t h2_mplx_release_and_join(h2
                      */
                     ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03198)
                                   "h2_mplx(%ld): release, waiting for %d seconds now for "
-                                  "%d h2_workers to return, have still %d requests outstanding", 
+                                  "%d h2_workers to return, have still %d tasks outstanding", 
                                   m->id, i*wait_secs, m->workers_busy,
-                                  (int)h2_io_set_size(m->stream_ios));
+                                  (int)h2_ihash_count(m->tasks));
                     if (i == 1) {
-                        h2_io_set_iter(m->stream_ios, stream_print, m);
+                        h2_ihash_iter(m->tasks, task_print, m);
                     }
                 }
                 h2_mplx_abort(m);
@@ -431,13 +487,9 @@ apr_status_t h2_mplx_release_and_join(h2
             }
         }
         
-        if (!h2_io_set_is_empty(m->stream_ios)) {
-            ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, m->c, 
-                          "h2_mplx(%ld): release_join, %d streams still open", 
-                          m->id, (int)h2_io_set_size(m->stream_ios));
-        }
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
-                      "h2_mplx(%ld): release_join -> destroy", m->id);
+                      "h2_mplx(%ld): release_join (%d tasks left) -> destroy", 
+                      m->id, (int)h2_ihash_count(m->tasks));
         leave_mutex(m, acquired);
         h2_mplx_destroy(m);
         /* all gone */
@@ -468,100 +520,17 @@ apr_status_t h2_mplx_stream_done(h2_mplx
      */
     AP_DEBUG_ASSERT(m);
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+        h2_task *task = h2_ihash_get(m->tasks, stream_id);
 
+        h2_ihash_remove(m->streams, stream_id);
         /* there should be an h2_io, once the stream has been scheduled
          * for processing, e.g. when we received all HEADERs. But when
          * a stream is cancelled very early, it will not exist. */
-        if (io) {
+        if (task) {
             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
-                          "h2_mplx(%ld-%d): marking stream as done.", 
+                          "h2_mplx(%ld-%d): marking stream task as done.", 
                           m->id, stream_id);
-            io_stream_done(m, io, rst_error);
-        }
-        leave_mutex(m, acquired);
-    }
-    return status;
-}
-
-apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block,
-                             int stream_id, apr_bucket_brigade *bb, 
-                             apr_table_t *trailers,
-                             struct apr_thread_cond_t *iowait)
-{
-    apr_status_t status; 
-    int acquired;
-    
-    AP_DEBUG_ASSERT(m);
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
-        if (io && !io->orphaned) {
-            H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_read_pre");
-            
-            h2_io_signal_init(io, H2_IO_READ, m->stream_timeout, iowait);
-            status = h2_io_in_read(io, bb, -1, trailers);
-            while (APR_STATUS_IS_EAGAIN(status) 
-                   && !is_aborted(m, &status)
-                   && block == APR_BLOCK_READ) {
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
-                              "h2_mplx(%ld-%d): wait on in data (BLOCK_READ)", 
-                              m->id, stream_id);
-                status = h2_io_signal_wait(m, io);
-                if (status == APR_SUCCESS) {
-                    status = h2_io_in_read(io, bb, -1, trailers);
-                }
-            }
-            H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_read_post");
-            h2_io_signal_exit(io);
-        }
-        else {
-            status = APR_EOF;
-        }
-        leave_mutex(m, acquired);
-    }
-    return status;
-}
-
-apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id, 
-                              const char *data, apr_size_t len, int eos)
-{
-    apr_status_t status;
-    int acquired;
-    
-    AP_DEBUG_ASSERT(m);
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
-        if (io && !io->orphaned) {
-            H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_write_pre");
-            status = h2_io_in_write(io, data, len, eos);
-            H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_write_post");
-            h2_io_signal(io, H2_IO_READ);
-            io_in_consumed_signal(m, io);
-        }
-        else {
-            status = APR_ECONNABORTED;
-        }
-        leave_mutex(m, acquired);
-    }
-    return status;
-}
-
-apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id)
-{
-    apr_status_t status;
-    int acquired;
-    
-    AP_DEBUG_ASSERT(m);
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
-        if (io && !io->orphaned) {
-            status = h2_io_in_close(io);
-            H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_close");
-            h2_io_signal(io, H2_IO_READ);
-            io_in_consumed_signal(m, io);
-        }
-        else {
-            status = APR_ECONNABORTED;
+            task_stream_done(m, task, rst_error);
         }
         leave_mutex(m, acquired);
     }
@@ -574,17 +543,10 @@ void h2_mplx_set_consumed_cb(h2_mplx *m,
     m->input_consumed_ctx = ctx;
 }
 
-typedef struct {
-    h2_mplx * m;
-    int streams_updated;
-} update_ctx;
-
-static int update_window(void *ctx, h2_io *io)
+static int update_window(void *ctx, void *val)
 {
-    update_ctx *uctx = (update_ctx*)ctx;
-    if (io_in_consumed_signal(uctx->m, io)) {
-        ++uctx->streams_updated;
-    }
+    h2_mplx *m = ctx;
+    input_consumed_signal(m, val);
     return 1;
 }
 
@@ -598,49 +560,22 @@ apr_status_t h2_mplx_in_update_windows(h
         return APR_ECONNABORTED;
     }
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        update_ctx ctx;
-        
-        ctx.m               = m;
-        ctx.streams_updated = 0;
-
-        status = APR_EAGAIN;
-        h2_io_set_iter(m->stream_ios, update_window, &ctx);
+        h2_ihash_iter(m->tasks, update_window, m);
         
-        if (ctx.streams_updated) {
-            status = APR_SUCCESS;
-        }
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
+                      "h2_session(%ld): windows updated", m->id);
+        status = APR_SUCCESS;
         leave_mutex(m, acquired);
     }
     return status;
 }
 
-apr_status_t h2_mplx_out_get_brigade(h2_mplx *m, int stream_id, 
-                                     apr_bucket_brigade *bb, 
-                                     apr_off_t len, apr_table_t **ptrailers)
+static int task_iter_first(void *ctx, void *val)
 {
-    apr_status_t status;
-    int acquired;
-
-    AP_DEBUG_ASSERT(m);
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
-        if (io && !io->orphaned) {
-            H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_get_brigade_pre");
-            
-            status = h2_io_out_get_brigade(io, bb, len);
-            
-            H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_get_brigade_post");
-            if (status == APR_SUCCESS) {
-                h2_io_signal(io, H2_IO_WRITE);
-            }
-        }
-        else {
-            status = APR_ECONNABORTED;
-        }
-        *ptrailers = io->response? io->response->trailers : NULL;
-        leave_mutex(m, acquired);
-    }
-    return status;
+    task_iter_ctx *tctx = ctx;
+    h2_task *task = val;
+    tctx->task = task;
+    return 0;
 }
 
 h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams)
@@ -651,138 +586,89 @@ h2_stream *h2_mplx_next_submit(h2_mplx *
 
     AP_DEBUG_ASSERT(m);
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        h2_io *io = h2_io_set_shift(m->ready_ios);
-        if (io && !m->aborted) {
-            stream = h2_ihash_get(streams, io->id);
-            if (stream) {
-                io->submitted = 1;
-                if (io->rst_error) {
-                    h2_stream_rst(stream, io->rst_error);
+        task_iter_ctx ctx;
+        ctx.m = m;
+        ctx.task = NULL;
+        h2_ihash_iter(m->ready_tasks, task_iter_first, &ctx);
+        
+        if (ctx.task && !m->aborted) {
+            h2_task *task = ctx.task;
+            
+            h2_ihash_remove(m->ready_tasks, task->stream_id);
+            stream = h2_ihash_get(streams, task->stream_id);
+            if (stream && task) {
+                task->submitted = 1;
+                if (task->rst_error) {
+                    h2_stream_rst(stream, task->rst_error);
                 }
                 else {
-                    AP_DEBUG_ASSERT(io->response);
-                    H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_next_submit_pre");
-                    h2_stream_set_response(stream, io->response, io->bbout);
-                    H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_next_submit_post");
+                    AP_DEBUG_ASSERT(task->response);
+                    h2_stream_set_response(stream, task->response, 
+                                           task->output.beam);
                 }
             }
-            else {
+            else if (task) {
                 /* We have the io ready, but the stream has gone away, maybe
                  * reset by the client. Should no longer happen since such
                  * streams should clear io's from the ready queue.
                  */
                 ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03347)
-                              "h2_mplx(%ld): stream for response %d closed, "
+                              "h2_mplx(%s): stream for response closed, "
                               "resetting io to close request processing",
-                              m->id, io->id);
-                h2_io_make_orphaned(io, H2_ERR_STREAM_CLOSED);
-                if (!io->worker_started || io->worker_done) {
-                    io_destroy(m, io, 1);
+                              task->id);
+                task->orphaned = 1;
+                h2_task_rst(task, H2_ERR_STREAM_CLOSED);
+                if (!task->worker_started || task->worker_done) {
+                    task_destroy(m, task, 1);
                 }
                 else {
                     /* hang around until the h2_task is done, but
-                     * shutdown input and send out any events (e.g. window
-                     * updates) asap. */
-                    h2_io_in_shutdown(io);
-                    io_in_consumed_signal(m, io);
+                     * shutdown input/output and send out any events asap. */
+                    h2_task_shutdown(task, 0);
+                    input_consumed_signal(m, task);
                 }
             }
-            
-            h2_io_signal(io, H2_IO_WRITE);
         }
         leave_mutex(m, acquired);
     }
     return stream;
 }
 
-static apr_status_t out_write(h2_mplx *m, h2_io *io, 
-                              ap_filter_t* f, int blocking,
-                              apr_bucket_brigade *bb,
-                              struct apr_thread_cond_t *iowait)
+static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response)
 {
     apr_status_t status = APR_SUCCESS;
-    /* We check the memory footprint queued for this stream_id
-     * and block if it exceeds our configured limit.
-     * We will not split buckets to enforce the limit to the last
-     * byte. After all, the bucket is already in memory.
-     */
-    while (status == APR_SUCCESS 
-           && !APR_BRIGADE_EMPTY(bb) 
-           && !is_aborted(m, &status)) {
-        
-        status = h2_io_out_write(io, bb, blocking? m->stream_max_mem : INT_MAX, 
-                                 &m->tx_handles_reserved);
-        io_out_consumed_signal(m, io);
-        
-        /* Wait for data to drain until there is room again or
-         * stream timeout expires */
-        h2_io_signal_init(io, H2_IO_WRITE, m->stream_timeout, iowait);
-        while (status == APR_SUCCESS
-               && !APR_BRIGADE_EMPTY(bb) 
-               && iowait
-               && (m->stream_max_mem <= h2_io_out_length(io))
-               && !is_aborted(m, &status)) {
-            if (!blocking) {
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
-                              "h2_mplx(%ld-%d): incomplete write", 
-                              m->id, io->id);
-                return APR_INCOMPLETE;
-            }
-            if (f) {
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
-                              "h2_mplx(%ld-%d): waiting for out drain", 
-                              m->id, io->id);
-            }
-            status = h2_io_signal_wait(m, io);
-        }
-        h2_io_signal_exit(io);
+    h2_task *task = h2_ihash_get(m->tasks, stream_id);
+    
+    if (!task || task->orphaned) {
+        return APR_ECONNABORTED;
     }
-    apr_brigade_cleanup(bb);
     
-    return status;
-}
-
-static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response,
-                             ap_filter_t* f, apr_bucket_brigade *bb,
-                             struct apr_thread_cond_t *iowait)
-{
-    apr_status_t status = APR_SUCCESS;
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                  "h2_mplx(%s): open response: %d, rst=%d",
+                  task->id, response->http_status, response->rst_error);
     
-    h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
-    if (io && !io->orphaned) {
-        if (f) {
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
-                          "h2_mplx(%ld-%d): open response: %d, rst=%d",
-                          m->id, stream_id, response->http_status, 
-                          response->rst_error);
-        }
-        
-        h2_io_set_response(io, response);
-        h2_io_set_add(m->ready_ios, io);
-        if (response && response->http_status < 300) {
-            /* we might see some file buckets in the output, see
-             * if we have enough handles reserved. */
-            check_tx_reservation(m);
-        }
-        if (bb) {
-            status = out_write(m, io, f, 0, bb, iowait);
-            if (status == APR_INCOMPLETE) {
-                /* write will have transferred as much data as possible.
-                   caller has to deal with non-empty brigade */
-                status = APR_SUCCESS;
-            }
-        }
-        have_out_data_for(m, stream_id);
-    }
-    else {
-        status = APR_ECONNABORTED;
+    h2_task_set_response(task, response);
+    
+    if (task->output.beam) {
+        h2_beam_buffer_size_set(task->output.beam, m->stream_max_mem);
+        h2_beam_timeout_set(task->output.beam, m->stream_timeout);
+        h2_beam_on_consumed(task->output.beam, stream_output_consumed, task);
+        m->tx_handles_reserved -= h2_beam_get_files_beamed(task->output.beam);
+        h2_beam_on_file_beam(task->output.beam, can_beam_file, m);
+        h2_beam_mutex_set(task->output.beam, beam_enter, task->cond, m);
+    }
+    
+    h2_ihash_add(m->ready_tasks, task);
+    if (response && response->http_status < 300) {
+        /* we might see some file buckets in the output, see
+         * if we have enough handles reserved. */
+        check_tx_reservation(m);
     }
+    have_out_data_for(m, stream_id);
     return status;
 }
 
-apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response,
-                              ap_filter_t* f, apr_bucket_brigade *bb,
-                              struct apr_thread_cond_t *iowait)
+apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response)
 {
     apr_status_t status;
     int acquired;
@@ -793,127 +679,44 @@ apr_status_t h2_mplx_out_open(h2_mplx *m
             status = APR_ECONNABORTED;
         }
         else {
-            status = out_open(m, stream_id, response, f, bb, iowait);
-            if (APLOGctrace1(m->c)) {
-                h2_util_bb_log(m->c, stream_id, APLOG_TRACE1, "h2_mplx_out_open", bb);
-            }
+            status = out_open(m, stream_id, response);
         }
         leave_mutex(m, acquired);
     }
     return status;
 }
 
-apr_status_t h2_mplx_out_write(h2_mplx *m, int stream_id, 
-                               ap_filter_t* f, int blocking,
-                               apr_bucket_brigade *bb,
-                               struct apr_thread_cond_t *iowait)
+static apr_status_t out_close(h2_mplx *m, h2_task *task)
 {
-    apr_status_t status;
-    int acquired;
+    apr_status_t status = APR_SUCCESS;
     
-    AP_DEBUG_ASSERT(m);
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
-        if (io && !io->orphaned) {
-            status = out_write(m, io, f, blocking, bb, iowait);
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
-                          "h2_mplx(%ld-%d): write", m->id, io->id);
-            H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_write");
-            
-            have_out_data_for(m, stream_id);
-        }
-        else {
-            status = APR_ECONNABORTED;
-        }
-        leave_mutex(m, acquired);
+    if (!task || task->orphaned) {
+        return APR_ECONNABORTED;
     }
-    return status;
-}
-
-apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id)
-{
-    apr_status_t status;
-    int acquired;
     
-    AP_DEBUG_ASSERT(m);
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
-        if (io && !io->orphaned) {
-            if (!io->response && !io->rst_error) {
-                /* In case a close comes before a response was created,
-                 * insert an error one so that our streams can properly
-                 * reset.
-                 */
-                h2_response *r = h2_response_die(stream_id, APR_EGENERAL, 
-                                                 io->request, m->pool);
-                status = out_open(m, stream_id, r, NULL, NULL, NULL);
-                ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c,
-                              "h2_mplx(%ld-%d): close, no response, no rst", 
-                              m->id, io->id);
-            }
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
-                          "h2_mplx(%ld-%d): close with eor=%s", 
-                          m->id, io->id, io->eor? "yes" : "no");
-            status = h2_io_out_close(io);
-            H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_close");
-            io_out_consumed_signal(m, io);
-            
-            have_out_data_for(m, stream_id);
-        }
-        else {
-            status = APR_ECONNABORTED;
-        }
-        leave_mutex(m, acquired);
-    }
-    return status;
-}
-
-apr_status_t h2_mplx_out_rst(h2_mplx *m, int stream_id, int error)
-{
-    apr_status_t status;
-    int acquired;
-    
-    AP_DEBUG_ASSERT(m);
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
-        if (io && !io->rst_error && !io->orphaned) {
-            h2_io_rst(io, error);
-            if (!io->response) {
-                h2_io_set_add(m->ready_ios, io);
-            }
-            H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_rst");
-            
-            have_out_data_for(m, stream_id);
-            h2_io_signal(io, H2_IO_WRITE);
-        }
-        else {
-            status = APR_ECONNABORTED;
-        }
-        leave_mutex(m, acquired);
+    if (!task->response && !task->rst_error) {
+        /* In case a close comes before a response was created,
+         * insert an error one so that our streams can properly
+         * reset.
+         */
+        h2_response *r = h2_response_die(task->stream_id, APR_EGENERAL, 
+                                         task->request, m->pool);
+        status = out_open(m, task->stream_id, r);
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c,
+                      "h2_mplx(%s): close, no response, no rst", task->id);
+    }
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
+                  "h2_mplx(%s): close", task->id);
+    if (task->output.beam) {
+        status = h2_beam_close(task->output.beam);
+        h2_beam_log(task->output.beam, task->stream_id, "out_close", m->c, 
+                    APLOG_TRACE2);
     }
+    output_consumed_signal(m, task);
+    have_out_data_for(m, task->stream_id);
     return status;
 }
 
-int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id)
-{
-    apr_status_t status;
-    int has_data = 0;
-    int acquired;
-    
-    AP_DEBUG_ASSERT(m);
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
-        if (io && !io->orphaned) {
-            has_data = h2_io_out_has_data(io);
-        }
-        else {
-            has_data = 0;
-        }
-        leave_mutex(m, acquired);
-    }
-    return has_data;
-}
-
 apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
                                  apr_thread_cond_t *iowait)
 {
@@ -969,22 +772,7 @@ apr_status_t h2_mplx_reprioritize(h2_mpl
     return status;
 }
 
-static h2_io *open_io(h2_mplx *m, int stream_id, const h2_request *request)
-{
-    apr_pool_t *io_pool;
-    h2_io *io;
-    
-    apr_pool_create(&io_pool, m->pool);
-    apr_pool_tag(io_pool, "h2_io");
-    io = h2_io_create(stream_id, io_pool, m->bucket_alloc, request);
-    h2_io_set_add(m->stream_ios, io);
-    
-    return io;
-}
-
-
-apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, 
-                             const h2_request *req, 
+apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream, 
                              h2_stream_pri_cmp *cmp, void *ctx)
 {
     apr_status_t status;
@@ -997,24 +785,27 @@ apr_status_t h2_mplx_process(h2_mplx *m,
             status = APR_ECONNABORTED;
         }
         else {
-            h2_io *io = open_io(m, stream_id, req);
+            h2_beam_create(&stream->input, stream->pool, stream->id, 
+                           "input", 0);
+            h2_ihash_add(m->streams, stream);
             
-            if (!io->request->body) {
-                status = h2_io_in_close(io);
+            if (!m->need_registration) {
+                m->need_registration = h2_iq_empty(m->q);
             }
-            
-            m->need_registration = m->need_registration || h2_iq_empty(m->q);
-            do_registration = (m->need_registration && m->workers_busy < m->workers_max);
-            h2_iq_add(m->q, io->id, cmp, ctx);
-            
+            if (m->workers_busy < m->workers_max) {
+                do_registration = m->need_registration;
+            }
+            h2_iq_add(m->q, stream->id, cmp, ctx);
+
             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
-                          "h2_mplx(%ld-%d): process", m->c->id, stream_id);
-            H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_process");
+                          "h2_mplx(%ld-%d): process, body=%d", 
+                          m->c->id, stream->id, stream->request->body);
         }
         leave_mutex(m, acquired);
     }
-    if (status == APR_SUCCESS && do_registration) {
-        workers_register(m);
+    if (do_registration) {
+        m->need_registration = 0;
+        h2_workers_register(m->workers, m);
     }
     return status;
 }
@@ -1022,21 +813,16 @@ apr_status_t h2_mplx_process(h2_mplx *m,
 static h2_task *pop_task(h2_mplx *m)
 {
     h2_task *task = NULL;
+    h2_stream *stream;
     int sid;
-    while (!m->aborted && !task 
-        && (m->workers_busy < m->workers_limit)
-        && (sid = h2_iq_shift(m->q)) > 0) {
-        h2_io *io = h2_io_set_get(m->stream_ios, sid);
-        if (io && io->orphaned) {
-            io_destroy(m, io, 0);
-            if (m->join_wait) {
-                apr_thread_cond_signal(m->join_wait);
-            }
-        }
-        else if (io) {
+    while (!m->aborted && !task  && (m->workers_busy < m->workers_limit)
+           && (sid = h2_iq_shift(m->q)) > 0) {
+        
+        stream = h2_ihash_get(m->streams, sid);
+        if (stream) {
             conn_rec *slave, **pslave;
             int new_conn = 0;
-            
+
             pslave = (conn_rec **)apr_array_pop(m->spare_slaves);
             if (pslave) {
                 slave = *pslave;
@@ -1047,17 +833,27 @@ static h2_task *pop_task(h2_mplx *m)
             }
             
             slave->sbh = m->c->sbh;
-            io->task = task = h2_task_create(m->id, io->request, slave, m);
+            task = h2_task_create(slave, stream->request, stream->input, m);
+            h2_ihash_add(m->tasks, task);
+            
             m->c->keepalives++;
             apr_table_setn(slave->notes, H2_TASK_ID_NOTE, task->id);
             if (new_conn) {
                 h2_slave_run_pre_connection(slave, ap_get_conn_socket(slave));
             }
-            io->worker_started = 1;
-            io->started_at = apr_time_now();
+            task->worker_started = 1;
+            task->started_at = apr_time_now();
             if (sid > m->max_stream_started) {
                 m->max_stream_started = sid;
             }
+
+            if (stream->input) {
+                h2_beam_timeout_set(stream->input, m->stream_timeout);
+                h2_beam_on_consumed(stream->input, stream_input_consumed, m);
+                h2_beam_on_file_beam(stream->input, can_beam_file, m);
+                h2_beam_mutex_set(stream->input, beam_enter, task->cond, m);
+            }
+
             ++m->workers_busy;
         }
     }
@@ -1091,8 +887,6 @@ h2_task *h2_mplx_pop_task(h2_mplx *m, in
 static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
 {
     if (task) {
-        h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id);
-        
         if (task->frozen) {
             /* this task was handed over to an engine for processing 
              * and the original worker has finished. That means the 
@@ -1103,26 +897,25 @@ static void task_done(h2_mplx *m, h2_tas
             /* FIXME: this implementation is incomplete. */
             h2_task_set_io_blocking(task, 0);
             apr_thread_cond_broadcast(m->task_thawed);
+            return;
         }
         else {
             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                           "h2_mplx(%ld): task(%s) done", m->id, task->id);
-            /* clean our references and report request as done. Signal
-             * that we want another unless we have been aborted */
-            /* TODO: this will keep a worker attached to this h2_mplx as
-             * long as it has requests to handle. Might no be fair to
-             * other mplx's. Perhaps leave after n requests? */
-            h2_mplx_out_close(m, task->stream_id);
+            out_close(m, task);
             
-            if (ngn && io) {
-                apr_off_t bytes = io->output_consumed + h2_io_out_length(io);
+            if (ngn) {
+                apr_off_t bytes = 0;
+                if (task->output.beam) {
+                    h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
+                    bytes += h2_beam_get_buffered(task->output.beam);
+                }
                 if (bytes > 0) {
                     /* we need to report consumed and current buffered output
                      * to the engine. The request will be streamed out or cancelled,
                      * no more data is coming from it and the engine should update
                      * its calculations before we destroy this information. */
                     h2_req_engine_out_consumed(ngn, task->c, bytes);
-                    io->output_consumed = 0;
                 }
             }
             
@@ -1136,54 +929,50 @@ static void task_done(h2_mplx *m, h2_tas
                 h2_ngn_shed_done_ngn(m->ngn_shed, task->engine);
             }
             
-            if (io) {
-                apr_time_t now = apr_time_now();
-                if (!io->orphaned && m->redo_ios
-                    && h2_io_set_get(m->redo_ios, io->id)) {
-                    /* reset and schedule again */
-                    h2_io_redo(io);
-                    h2_io_set_remove(m->redo_ios, io);
-                    h2_iq_add(m->q, io->id, NULL, NULL);
-                }
-                else {
-                    io->worker_done = 1;
-                    io->done_at = now;
+            if (!task->orphaned && m->redo_tasks
+                && h2_ihash_get(m->redo_tasks, task->stream_id)) {
+                /* reset and schedule again */
+                h2_task_redo(task);
+                h2_ihash_remove(m->redo_tasks, task->stream_id);
+                h2_iq_add(m->q, task->stream_id, NULL, NULL);
+                return;
+            }
+            
+            task->worker_done = 1;
+            task->done_at = apr_time_now();
+            if (task->output.beam) {
+                h2_beam_on_consumed(task->output.beam, NULL, NULL);
+                h2_beam_mutex_set(task->output.beam, NULL, NULL, NULL);
+            }
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                          "h2_mplx(%s): request done, %f ms"
+                          " elapsed", task->id, 
+                          (task->done_at - task->started_at) / 1000.0);
+            if (task->started_at > m->last_idle_block) {
+                /* this task finished without causing an 'idle block', e.g.
+                 * a block by flow control.
+                 */
+                if (task->done_at- m->last_limit_change >= m->limit_change_interval
+                    && m->workers_limit < m->workers_max) {
+                    /* Well behaving stream, allow it more workers */
+                    m->workers_limit = H2MIN(m->workers_limit * 2, 
+                                             m->workers_max);
+                    m->last_limit_change = task->done_at;
+                    m->need_registration = 1;
                     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                                  "h2_mplx(%ld): request(%d) done, %f ms"
-                                  " elapsed", m->id, io->id, 
-                                  (io->done_at - io->started_at) / 1000.0);
-                    if (io->started_at > m->last_idle_block) {
-                        /* this task finished without causing an 'idle block', e.g.
-                         * a block by flow control.
-                         */
-                        if (now - m->last_limit_change >= m->limit_change_interval
-                            && m->workers_limit < m->workers_max) {
-                            /* Well behaving stream, allow it more workers */
-                            m->workers_limit = H2MIN(m->workers_limit * 2, 
-                                                     m->workers_max);
-                            m->last_limit_change = now;
-                            m->need_registration = 1;
-                            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                                          "h2_mplx(%ld): increase worker limit to %d",
-                                          m->id, m->workers_limit);
-                        }
-                    }
-                }
-                
-                if (io->orphaned) {
-                    io_destroy(m, io, 0);
-                    if (m->join_wait) {
-                        apr_thread_cond_signal(m->join_wait);
-                    }
+                                  "h2_mplx(%ld): increase worker limit to %d",
+                                  m->id, m->workers_limit);
                 }
-                else {
-                    /* hang around until the stream deregisters */
+            }
+            
+            if (task->orphaned) {
+                task_destroy(m, task, 0);
+                if (m->join_wait) {
+                    apr_thread_cond_signal(m->join_wait);
                 }
             }
             else {
-                ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
-                              "h2_mplx(%ld): task %s without corresp. h2_io",
-                              m->id, task->id);
+                /* hang around until the stream deregisters */
             }
         }
     }
@@ -1208,80 +997,76 @@ void h2_mplx_task_done(h2_mplx *m, h2_ta
  * h2_mplx DoS protection
  ******************************************************************************/
 
-typedef struct {
-    h2_mplx *m;
-    h2_io *io;
-    apr_time_t now;
-} io_iter_ctx;
-
-static int latest_repeatable_busy_unsubmitted_iter(void *data, h2_io *io)
+static int latest_repeatable_unsubmitted_iter(void *data, void *val)
 {
-    io_iter_ctx *ctx = data;
-    if (io->worker_started && !io->worker_done
-        && h2_io_is_repeatable(io)
-        && !h2_io_set_get(ctx->m->redo_ios, io->id)) {
-        /* this io occupies a worker, the response has not been submitted yet,
+    task_iter_ctx *ctx = data;
+    h2_task *task = val;
+    if (!task->worker_done && h2_task_can_redo(task) 
+        && !h2_ihash_get(ctx->m->redo_tasks, task->stream_id)) {
+        /* this task occupies a worker, the response has not been submitted yet,
          * not been cancelled and it is a repeatable request
          * -> it can be re-scheduled later */
-        if (!ctx->io || ctx->io->started_at < io->started_at) {
+        if (!ctx->task || ctx->task->started_at < task->started_at) {
             /* we did not have one or this one was started later */
-            ctx->io = io;
+            ctx->task = task;
         }
     }
     return 1;
 }
 
-static h2_io *get_latest_repeatable_busy_unsubmitted_io(h2_mplx *m) 
+static h2_task *get_latest_repeatable_unsubmitted_task(h2_mplx *m) 
 {
-    io_iter_ctx ctx;
+    task_iter_ctx ctx;
     ctx.m = m;
-    ctx.io = NULL;
-    h2_io_set_iter(m->stream_ios, latest_repeatable_busy_unsubmitted_iter, &ctx);
-    return ctx.io;
+    ctx.task = NULL;
+    h2_ihash_iter(m->tasks, latest_repeatable_unsubmitted_iter, &ctx);
+    return ctx.task;
 }
 
-static int timed_out_busy_iter(void *data, h2_io *io)
+static int timed_out_busy_iter(void *data, void *val)
 {
-    io_iter_ctx *ctx = data;
-    if (io->worker_started && !io->worker_done
-        && (ctx->now - io->started_at) > ctx->m->stream_timeout) {
+    task_iter_ctx *ctx = data;
+    h2_task *task = val;
+    if (!task->worker_done
+        && (ctx->now - task->started_at) > ctx->m->stream_timeout) {
         /* timed out stream occupying a worker, found */
-        ctx->io = io;
+        ctx->task = task;
         return 0;
     }
     return 1;
 }
-static h2_io *get_timed_out_busy_stream(h2_mplx *m) 
+
+static h2_task *get_timed_out_busy_task(h2_mplx *m) 
 {
-    io_iter_ctx ctx;
+    task_iter_ctx ctx;
     ctx.m = m;
-    ctx.io = NULL;
+    ctx.task = NULL;
     ctx.now = apr_time_now();
-    h2_io_set_iter(m->stream_ios, timed_out_busy_iter, &ctx);
-    return ctx.io;
+    h2_ihash_iter(m->tasks, timed_out_busy_iter, &ctx);
+    return ctx.task;
 }
 
-static apr_status_t unschedule_slow_ios(h2_mplx *m) 
+static apr_status_t unschedule_slow_tasks(h2_mplx *m) 
 {
-    h2_io *io;
+    h2_task *task;
     int n;
     
-    if (!m->redo_ios) {
-        m->redo_ios = h2_io_set_create(m->pool);
+    if (!m->redo_tasks) {
+        m->redo_tasks = h2_ihash_create(m->pool, offsetof(h2_task, stream_id));
     }
     /* Try to get rid of streams that occupy workers. Look for safe requests
      * that are repeatable. If none found, fail the connection.
      */
-    n = (m->workers_busy - m->workers_limit - h2_io_set_size(m->redo_ios));
-    while (n > 0 && (io = get_latest_repeatable_busy_unsubmitted_io(m))) {
-        h2_io_set_add(m->redo_ios, io);
-        h2_io_rst(io, H2_ERR_CANCEL);
+    n = (m->workers_busy - m->workers_limit - h2_ihash_count(m->redo_tasks));
+    while (n > 0 && (task = get_latest_repeatable_unsubmitted_task(m))) {
+        h2_task_rst(task, H2_ERR_CANCEL);
+        h2_ihash_add(m->redo_tasks, task);
         --n;
     }
     
-    if ((m->workers_busy - h2_io_set_size(m->redo_ios)) > m->workers_limit) {
-        io = get_timed_out_busy_stream(m);
-        if (io) {
+    if ((m->workers_busy - h2_ihash_count(m->redo_tasks)) > m->workers_limit) {
+        task = get_timed_out_busy_task(m);
+        if (task) {
             /* Too many busy workers, unable to cancel enough streams
              * and with a busy, timed out stream, we tell the client
              * to go away... */
@@ -1298,7 +1083,7 @@ apr_status_t h2_mplx_idle(h2_mplx *m)
     int acquired;
     
     if (enter_mutex(m, &acquired) == APR_SUCCESS) {
-        apr_size_t scount = h2_io_set_size(m->stream_ios);
+        apr_size_t scount = h2_ihash_count(m->streams);
         if (scount > 0 && m->workers_busy) {
             /* If we have streams in connection state 'IDLE', meaning
              * all streams are ready to sent data out, but lack
@@ -1335,7 +1120,7 @@ apr_status_t h2_mplx_idle(h2_mplx *m)
             }
             
             if (m->workers_busy > m->workers_limit) {
-                status = unschedule_slow_ios(m);
+                status = unschedule_slow_tasks(m);
             }
         }
         leave_mutex(m, acquired);
@@ -1353,11 +1138,12 @@ typedef struct {
     int streams_updated;
 } ngn_update_ctx;
 
-static int ngn_update_window(void *ctx, h2_io *io)
+static int ngn_update_window(void *ctx, void *val)
 {
     ngn_update_ctx *uctx = ctx;
-    if (io && io->task && io->task->assigned == uctx->ngn
-        && io_out_consumed_signal(uctx->m, io)) {
+    h2_task *task = val;
+    if (task && task->assigned == uctx->ngn
+        && output_consumed_signal(uctx->m, task)) {
         ++uctx->streams_updated;
     }
     return 1;
@@ -1370,7 +1156,7 @@ static apr_status_t ngn_out_update_windo
     ctx.m = m;
     ctx.ngn = ngn;
     ctx.streams_updated = 0;
-    h2_io_set_iter(m->stream_ios, ngn_update_window, &ctx);
+    h2_ihash_iter(m->tasks, ngn_update_window, &ctx);
     
     return ctx.streams_updated? APR_SUCCESS : APR_EAGAIN;
 }
@@ -1392,8 +1178,7 @@ apr_status_t h2_mplx_req_engine_push(con
     task->r = r;
     
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id);
-        if (!io || io->orphaned) {
+        if (task->orphaned) {
             status = APR_ECONNABORTED;
         }
         else {

Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.h
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.h?rev=1741419&r1=1741418&r2=1741419&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.h (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.h Thu Apr 28 12:43:02 2016
@@ -37,21 +37,21 @@
 struct apr_pool_t;
 struct apr_thread_mutex_t;
 struct apr_thread_cond_t;
+struct h2_bucket_beam;
 struct h2_config;
 struct h2_ihash_t;
+struct h2_ilist_t;
 struct h2_response;
 struct h2_task;
 struct h2_stream;
 struct h2_request;
-struct h2_io_set;
 struct apr_thread_cond_t;
 struct h2_workers;
-struct h2_int_queue;
+struct h2_iqueue;
 struct h2_ngn_shed;
 struct h2_req_engine;
 
 #include <apr_queue.h>
-#include "h2_io.h"
 
 typedef struct h2_mplx h2_mplx;
 
@@ -72,10 +72,12 @@ struct h2_mplx {
     unsigned int aborted : 1;
     unsigned int need_registration : 1;
 
-    struct h2_int_queue *q;
-    struct h2_io_set *stream_ios;
-    struct h2_io_set *ready_ios;
-    struct h2_io_set *redo_ios;
+    struct h2_ihash_t *streams;     /* all streams currently processing */
+    struct h2_iqueue *q;            /* all stream ids that need to be started */
+    
+    struct h2_ihash_t *tasks;       /* all tasks started and not destroyed */
+    struct h2_ihash_t *ready_tasks; /* all tasks ready for submit */
+    struct h2_ihash_t *redo_tasks;  /* all tasks that need to be redone */
     
     apr_uint32_t max_streams;        /* max # of concurrent streams */
     apr_uint32_t max_stream_started; /* highest stream id that started processing */
@@ -96,10 +98,11 @@ struct h2_mplx {
     apr_size_t stream_max_mem;
     apr_interval_time_t stream_timeout;
     
+    apr_pool_t *spare_io_pool;
     apr_array_header_t *spare_slaves; /* spare slave connections */
     
     struct h2_workers *workers;
-    apr_size_t tx_handles_reserved;
+    int tx_handles_reserved;
     apr_size_t tx_chunk_size;
     
     h2_mplx_consumed_cb *input_consumed;
@@ -166,10 +169,6 @@ apr_uint32_t h2_mplx_shutdown(h2_mplx *m
  */
 apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error);
 
-/* Return != 0 iff the multiplexer has output data for the given stream. 
- */
-int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id);
-
 /**
  * Waits on output data from any stream in this session to become available. 
  * Returns APR_TIMEUP if no data arrived in the given time.
@@ -190,8 +189,7 @@ apr_status_t h2_mplx_out_trywait(h2_mplx
  * @param cmp the stream priority compare function
  * @param ctx context data for the compare function
  */
-apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, 
-                             const struct h2_request *r, 
+apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream, 
                              h2_stream_pri_cmp *cmp, void *ctx);
 
 /**
@@ -219,37 +217,11 @@ void h2_mplx_set_consumed_cb(h2_mplx *m,
  ******************************************************************************/
 
 /**
- * Reads a buckets for the given stream_id. Will return ARP_EAGAIN when
- * called with APR_NONBLOCK_READ and no data present. Will return APR_EOF
- * when the end of the stream input has been reached.
- * The condition passed in will be used for blocking/signalling and will
- * be protected by the mplx's own mutex.
- */
-apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block,
-                             int stream_id, apr_bucket_brigade *bb,
-                             apr_table_t *trailers, 
-                             struct apr_thread_cond_t *iowait);
-
-/**
- * Appends data to the input of the given stream. Storage of input data is
- * not subject to flow control.
- */
-apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id, 
-                              const char *data, apr_size_t len, int eos);
-
-/**
- * Closes the input for the given stream_id.
- */
-apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id);
-
-/**
  * Invoke the consumed callback for all streams that had bytes read since the 
  * last call to this function. If no stream had input data consumed, the 
  * callback is not invoked.
  * The consumed callback may also be invoked at other times whenever
  * the need arises.
- * Returns APR_SUCCESS when an update happened, APR_EAGAIN if no update
- * happened.
  */
 apr_status_t h2_mplx_in_update_windows(h2_mplx *m);
 
@@ -267,43 +239,10 @@ struct h2_stream *h2_mplx_next_submit(h2
                                       struct h2_ihash_t *streams);
 
 /**
- * Reads output data into the given brigade. Will never block, but
- * return APR_EAGAIN until data arrives or the stream is closed.
- */
-apr_status_t h2_mplx_out_get_brigade(h2_mplx *mplx, int stream_id, 
-                                     apr_bucket_brigade *bb, 
-                                     apr_off_t len, apr_table_t **ptrailers);
-
-/**
  * Opens the output for the given stream with the specified response.
  */
 apr_status_t h2_mplx_out_open(h2_mplx *mplx, int stream_id,
-                              struct h2_response *response,
-                              ap_filter_t* filter, apr_bucket_brigade *bb,
-                              struct apr_thread_cond_t *iowait);
-
-/**
- * Append the brigade to the stream output. Might block if amount
- * of bytes buffered reaches configured max.
- * @param stream_id the stream identifier
- * @param filter the apache filter context of the data
- * @param blocking == 0 iff call should return with APR_INCOMPLETE if
- *                 the full brigade cannot be written at once
- * @param bb the bucket brigade to append
- * @param iowait a conditional used for block/signalling in h2_mplx
- */
-apr_status_t h2_mplx_out_write(h2_mplx *mplx, int stream_id, 
-                               ap_filter_t* filter, 
-                               int blocking,
-                               apr_bucket_brigade *bb,
-                               struct apr_thread_cond_t *iowait);
-
-/**
- * Closes the output for stream stream_id. 
- */
-apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id);
-
-apr_status_t h2_mplx_out_rst(h2_mplx *m, int stream_id, int error);
+                              struct h2_response *response);
 
 /*******************************************************************************
  * h2_mplx list Manipulation.

Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_ngn_shed.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_ngn_shed.c?rev=1741419&r1=1741418&r2=1741419&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_ngn_shed.c (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_ngn_shed.c Thu Apr 28 12:43:02 2016
@@ -29,16 +29,15 @@
 #include "mod_http2.h"
 
 #include "h2_private.h"
+#include "h2.h"
 #include "h2_config.h"
 #include "h2_conn.h"
 #include "h2_ctx.h"
 #include "h2_h2.h"
-#include "h2_int_queue.h"
 #include "h2_mplx.h"
 #include "h2_response.h"
 #include "h2_request.h"
 #include "h2_task.h"
-#include "h2_task_output.h"
 #include "h2_util.h"
 #include "h2_ngn_shed.h"
 
@@ -296,7 +295,8 @@ static apr_status_t ngn_done_task(h2_ngn
     ngn->no_finished++;
     if (waslive) ngn->no_live--;
     ngn->no_assigned--;
-
+    task->assigned = NULL;
+    
     return APR_SUCCESS;
 }
                                 

Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_request.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_request.c?rev=1741419&r1=1741418&r2=1741419&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_request.c (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_request.c Thu Apr 28 12:43:02 2016
@@ -235,7 +235,8 @@ apr_status_t h2_request_end_headers(h2_r
     const char *s;
     
     if (req->eoh) {
-        return APR_EINVAL;
+        /* already done */
+        return APR_SUCCESS;
     }
 
     /* rfc7540, ch. 8.1.2.3:
@@ -337,37 +338,18 @@ apr_status_t h2_request_add_trailer(h2_r
     return add_h1_trailer(req, pool, name, nlen, value, vlen);
 }
 
-#define OPT_COPY(p, s)  ((s)? apr_pstrdup(p, s) : NULL)
-
-void h2_request_copy(apr_pool_t *p, h2_request *dst, const h2_request *src)
+h2_request *h2_request_clone(apr_pool_t *p, const h2_request *src)
 {
-    /* keep the dst id */
-    dst->initiated_on   = src->initiated_on;
-    dst->method         = OPT_COPY(p, src->method);
-    dst->scheme         = OPT_COPY(p, src->scheme);
-    dst->authority      = OPT_COPY(p, src->authority);
-    dst->path           = OPT_COPY(p, src->path);
-    dst->headers        = apr_table_clone(p, src->headers);
+    h2_request *dst = apr_pmemdup(p, src, sizeof(*dst));
+    dst->method       = apr_pstrdup(p, src->method);
+    dst->scheme       = apr_pstrdup(p, src->scheme);
+    dst->authority    = apr_pstrdup(p, src->authority);
+    dst->path         = apr_pstrdup(p, src->path);
+    dst->headers      = apr_table_clone(p, src->headers);
     if (src->trailers) {
-        dst->trailers   = apr_table_clone(p, src->trailers);
-    }
-    else {
-        dst->trailers   = NULL;
+        dst->trailers = apr_table_clone(p, src->trailers);
     }
-    dst->content_length = src->content_length;
-    dst->chunked        = src->chunked;
-    dst->eoh            = src->eoh;
-    dst->body           = src->body;
-    dst->serialize      = src->serialize;
-    dst->push_policy    = src->push_policy;
-}
-
-h2_request *h2_request_clone(apr_pool_t *p, const h2_request *src)
-{
-    h2_request *nreq = apr_pcalloc(p, sizeof(*nreq));
-    memcpy(nreq, src, sizeof(*nreq));
-    h2_request_copy(p, nreq, src);
-    return nreq;
+    return dst;
 }
 
 request_rec *h2_request_create_rec(const h2_request *req, conn_rec *conn)

Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_request.h
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_request.h?rev=1741419&r1=1741418&r2=1741419&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_request.h (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_request.h Thu Apr 28 12:43:02 2016
@@ -43,8 +43,6 @@ apr_status_t h2_request_add_trailer(h2_r
 apr_status_t h2_request_end_headers(h2_request *req, apr_pool_t *pool, 
                                     int eos, int push);
 
-void h2_request_copy(apr_pool_t *p, h2_request *dst, const h2_request *src);
-
 h2_request *h2_request_clone(apr_pool_t *p, const h2_request *src);
 
 /**

Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_session.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_session.c?rev=1741419&r1=1741418&r2=1741419&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_session.c (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_session.c Thu Apr 28 12:43:02 2016
@@ -28,6 +28,7 @@
 #include <scoreboard.h>
 
 #include "h2_private.h"
+#include "h2.h"
 #include "h2_bucket_eoc.h"
 #include "h2_bucket_eos.h"
 #include "h2_config.h"
@@ -112,7 +113,7 @@ static void cleanup_streams(h2_session *
     while (1) {
         h2_ihash_iter(session->streams, find_cleanup_stream, &ctx);
         if (ctx.candidate) {
-            h2_session_stream_destroy(session, ctx.candidate);
+            h2_session_stream_done(session, ctx.candidate);
             ctx.candidate = NULL;
         }
         else {
@@ -121,7 +122,8 @@ static void cleanup_streams(h2_session *
     }
 }
 
-h2_stream *h2_session_open_stream(h2_session *session, int stream_id)
+h2_stream *h2_session_open_stream(h2_session *session, int stream_id,
+                                  int initiated_on, const h2_request *req)
 {
     h2_stream * stream;
     apr_pool_t *stream_pool;
@@ -135,7 +137,8 @@ h2_stream *h2_session_open_stream(h2_ses
         apr_pool_tag(stream_pool, "h2_stream");
     }
     
-    stream = h2_stream_open(stream_id, stream_pool, session);
+    stream = h2_stream_open(stream_id, stream_pool, session, 
+                            initiated_on, req);
     
     h2_ihash_add(session->streams, stream);
     if (H2_STREAM_CLIENT_INITIATED(stream_id)) {
@@ -309,8 +312,9 @@ static apr_status_t stream_release(h2_se
                                    h2_stream *stream,
                                    uint32_t error_code) 
 {
+    conn_rec *c = session->c;
     if (!error_code) {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
                       "h2_stream(%ld-%d): handled, closing", 
                       session->id, (int)stream->id);
         if (H2_STREAM_CLIENT_INITIATED(stream->id)) {
@@ -320,7 +324,7 @@ static apr_status_t stream_release(h2_se
         }
     }
     else {
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03065)
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, APLOGNO(03065)
                       "h2_stream(%ld-%d): closing with err=%d %s", 
                       session->id, (int)stream->id, (int)error_code,
                       h2_h2_err_description(error_code));
@@ -328,8 +332,7 @@ static apr_status_t stream_release(h2_se
     }
     
     return h2_conn_io_writeb(&session->io,
-                             h2_bucket_eos_create(session->c->bucket_alloc, 
-                                                  stream));
+                             h2_bucket_eos_create(c->bucket_alloc, stream), 0);
 }
 
 static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id,
@@ -360,7 +363,7 @@ static int on_begin_headers_cb(nghttp2_s
         /* nop */
     }
     else {
-        s = h2_session_open_stream((h2_session *)userp, frame->hd.stream_id);
+        s = h2_session_open_stream(userp, frame->hd.stream_id, 0, NULL);
     }
     return s? 0 : NGHTTP2_ERR_START_STREAM_NOT_ALLOWED;
 }
@@ -615,7 +618,7 @@ static int on_send_data_cb(nghttp2_sessi
         if (status == APR_SUCCESS && padlen) {
             b = apr_bucket_immortal_create(immortal_zeros, padlen, 
                                            session->c->bucket_alloc);
-            status = h2_conn_io_writeb(&session->io, b);
+            status = h2_conn_io_writeb(&session->io, b, 0);
         }
     }
     
@@ -1030,7 +1033,7 @@ static apr_status_t h2_session_start(h2_
         }
         
         /* Now we need to auto-open stream 1 for the request we got. */
-        stream = h2_session_open_stream(session, 1);
+        stream = h2_session_open_stream(session, 1, 0, NULL);
         if (!stream) {
             status = APR_EGENERAL;
             ap_log_rerror(APLOG_MARK, APLOG_ERR, status, session->r,
@@ -1134,7 +1137,7 @@ static int resume_on_data(void *ctx, voi
 static int h2_session_resume_streams_with_data(h2_session *session)
 {
     AP_DEBUG_ASSERT(session);
-    if (!h2_ihash_is_empty(session->streams)
+    if (!h2_ihash_empty(session->streams)
         && session->mplx && !session->mplx->aborted) {
         resume_ctx ctx;
         
@@ -1275,8 +1278,9 @@ static apr_status_t submit_response(h2_s
         const h2_priority *prio;
         
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03073)
-                      "h2_stream(%ld-%d): submit response %d",
-                      session->id, stream->id, response->http_status);
+                      "h2_stream(%ld-%d): submit response %d, REMOTE_WINDOW_SIZE=%u",
+                      session->id, stream->id, response->http_status,
+                      (unsigned int)nghttp2_session_get_stream_remote_window_size(session->ngh2, stream->id));
         
         if (response->content_length != 0) {
             memset(&provider, 0, sizeof(provider));
@@ -1372,9 +1376,8 @@ struct h2_stream *h2_session_push(h2_ses
                   session->id, is->id, nid,
                   push->req->method, push->req->path, is->id);
                   
-    stream = h2_session_open_stream(session, nid);
+    stream = h2_session_open_stream(session, nid, is->id, push->req);
     if (stream) {
-        h2_stream_set_h2_request(stream, is->id, push->req);
         status = stream_schedule(session, stream, 1);
         if (status != APR_SUCCESS) {
             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
@@ -1503,22 +1506,21 @@ apr_status_t h2_session_set_prio(h2_sess
     return status;
 }
 
-apr_status_t h2_session_stream_destroy(h2_session *session, h2_stream *stream)
+apr_status_t h2_session_stream_done(h2_session *session, h2_stream *stream)
 {
     apr_pool_t *pool = h2_stream_detach_pool(stream);
-
+    int stream_id = stream->id;
+    int rst_error = stream->rst_error;
+    
     ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
                   "h2_stream(%ld-%d): cleanup by EOS bucket destroy", 
-                  session->id, stream->id);
-    /* this may be called while the session has already freed
-     * some internal structures or even when the mplx is locked. */
-    if (session->mplx) {
-        h2_mplx_stream_done(session->mplx, stream->id, stream->rst_error);
-    }
-    
+                  session->id, stream_id);
     if (session->streams) {
-        h2_ihash_remove(session->streams, stream->id);
+        h2_ihash_remove(session->streams, stream_id);
     }
+    
+    h2_stream_cleanup(stream);
+    h2_mplx_stream_done(session->mplx, stream_id, rst_error);
     h2_stream_destroy(stream);
     
     if (pool) {
@@ -1528,6 +1530,7 @@ apr_status_t h2_session_stream_destroy(h
         }
         session->spare = pool;
     }
+
     return APR_SUCCESS;
 }
 
@@ -1757,12 +1760,51 @@ static int is_accepting_streams(h2_sessi
     }
 }
 
+static void update_child_status(h2_session *session, int status, const char *msg)
+{
+    /* Assume that we also change code/msg when something really happened and
+     * avoid updating the scoreboard in between */
+    if (session->last_status_code != status 
+        || session->last_status_msg != msg) {
+        apr_snprintf(session->status, sizeof(session->status),
+                     "%s, streams: %d/%d/%d/%d/%d (open/recv/resp/push/rst)", 
+                     msg? msg : "-",
+                     (int)h2_ihash_count(session->streams), 
+                     (int)session->remote.emitted_count,
+                     (int)session->responses_submitted,
+                     (int)session->pushes_submitted,
+                     (int)session->pushes_reset + session->streams_reset);
+        ap_update_child_status_descr(session->c->sbh, status, session->status);
+    }
+}
+
 static void transit(h2_session *session, const char *action, h2_session_state nstate)
 {
-    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03078)
-                  "h2_session(%ld): transit [%s] -- %s --> [%s]", session->id,
-                  state_name(session->state), action, state_name(nstate));
-    session->state = nstate;
+    if (session->state != nstate) {
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03078)
+                      "h2_session(%ld): transit [%s] -- %s --> [%s]", session->id,
+                      state_name(session->state), action, state_name(nstate));
+        session->state = nstate;
+        switch (session->state) {
+            case H2_SESSION_ST_IDLE:
+                update_child_status(session, (h2_ihash_empty(session->streams)? 
+                                              SERVER_BUSY_KEEPALIVE
+                                              : SERVER_BUSY_READ), "idle");
+                break;
+            case H2_SESSION_ST_REMOTE_SHUTDOWN:
+                update_child_status(session, SERVER_CLOSING, "remote goaway");
+                break;
+            case H2_SESSION_ST_LOCAL_SHUTDOWN:
+                update_child_status(session, SERVER_CLOSING, "local goaway");
+                break;
+            case H2_SESSION_ST_DONE:
+                update_child_status(session, SERVER_CLOSING, "done");
+                break;
+            default:
+                /* nop */
+                break;
+        }
+    }
 }
 
 static void h2_session_ev_init(h2_session *session, int arg, const char *msg)
@@ -1771,7 +1813,6 @@ static void h2_session_ev_init(h2_sessio
         case H2_SESSION_ST_INIT:
             transit(session, "init", H2_SESSION_ST_BUSY);
             break;
-
         default:
             /* nop */
             break;
@@ -1878,7 +1919,7 @@ static void h2_session_ev_no_io(h2_sessi
             if (h2_conn_io_flush(&session->io) != APR_SUCCESS) {
                 dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
             }
-            if (h2_ihash_is_empty(session->streams)) {
+            if (h2_ihash_empty(session->streams)) {
                 if (!is_accepting_streams(session)) {
                     /* We are no longer accepting new streams and have
                      * finished processing existing ones. Time to leave. */
@@ -2037,29 +2078,11 @@ static void dispatch_event(h2_session *s
 
 static const int MAX_WAIT_MICROS = 200 * 1000;
 
-static void update_child_status(h2_session *session, int status, const char *msg)
-{
-    /* Assume that we also change code/msg when something really happened and
-     * avoid updating the scoreboard in between */
-    if (session->last_status_code != status 
-        || session->last_status_msg != msg) {
-        apr_snprintf(session->status, sizeof(session->status),
-                     "%s, streams: %d/%d/%d/%d/%d (open/recv/resp/push/rst)", 
-                     msg? msg : "-",
-                     (int)h2_ihash_count(session->streams), 
-                     (int)session->remote.emitted_count,
-                     (int)session->responses_submitted,
-                     (int)session->pushes_submitted,
-                     (int)session->pushes_reset + session->streams_reset);
-        ap_update_child_status_descr(session->c->sbh, status, session->status);
-    }
-}
-
 apr_status_t h2_session_process(h2_session *session, int async)
 {
     apr_status_t status = APR_SUCCESS;
     conn_rec *c = session->c;
-    int rv, have_written, have_read, mpm_state, no_streams;
+    int rv, have_written, have_read, mpm_state;
 
     ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
                   "h2_session(%ld): process start, async=%d", session->id, async);
@@ -2102,12 +2125,10 @@ apr_status_t h2_session_process(h2_sessi
                 break;
                 
             case H2_SESSION_ST_IDLE:
-                no_streams = h2_ihash_is_empty(session->streams);
-                update_child_status(session, (no_streams? SERVER_BUSY_KEEPALIVE
-                                              : SERVER_BUSY_READ), "idle");
                 /* make certain, the client receives everything before we idle */
                 if (!session->keep_sync_until 
-                    && async && no_streams && !session->r && session->remote.emitted_count) {
+                    && async && h2_ihash_empty(session->streams)
+                    && !session->r && session->remote.emitted_count) {
                     ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
                                   "h2_session(%ld): async idle, nonblock read", session->id);
                     /* We do not return to the async mpm immediately, since under
@@ -2190,7 +2211,6 @@ apr_status_t h2_session_process(h2_sessi
                     status = h2_session_read(session, 0);
                     if (status == APR_SUCCESS) {
                         have_read = 1;
-                        update_child_status(session, SERVER_BUSY_READ, "busy");
                         dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL);
                     }
                     else if (status == APR_EAGAIN) {
@@ -2205,7 +2225,7 @@ apr_status_t h2_session_process(h2_sessi
                     }
                 }
                 
-                if (!h2_ihash_is_empty(session->streams)) {
+                if (!h2_ihash_empty(session->streams)) {
                     /* resume any streams for which data is available again */
                     h2_session_resume_streams_with_data(session);
                     /* Submit any responses/push_promises that are ready */
@@ -2220,9 +2240,10 @@ apr_status_t h2_session_process(h2_sessi
                     }
                     /* send out window updates for our inputs */
                     status = h2_mplx_in_update_windows(session->mplx);
-                    if (status != APR_SUCCESS && status != APR_EAGAIN) {
+                    if (status != APR_SUCCESS) {
                         dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 
-                                         H2_ERR_INTERNAL_ERROR, "window update error");
+                                       H2_ERR_INTERNAL_ERROR, 
+                                       "window update error");
                         break;
                     }
                 }
@@ -2257,7 +2278,6 @@ apr_status_t h2_session_process(h2_sessi
                     if (h2_conn_io_flush(&session->io) != APR_SUCCESS) {
                         dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
                     }
-                    update_child_status(session, SERVER_BUSY_READ, "wait");
                 }
                 else if ((apr_time_now() - session->start_wait) >= session->s->timeout) {
                     /* waited long enough */
@@ -2297,7 +2317,6 @@ apr_status_t h2_session_process(h2_sessi
                 break;
                 
             case H2_SESSION_ST_DONE:
-                update_child_status(session, SERVER_CLOSING, "done");
                 status = APR_EOF;
                 goto out;
                 

Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_session.h
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_session.h?rev=1741419&r1=1741418&r2=1741419&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_session.h (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_session.h Thu Apr 28 12:43:02 2016
@@ -198,9 +198,14 @@ struct h2_stream *h2_session_get_stream(
  * 
  * @param session the session to register in
  * @param stream_id the new stream identifier
+ * @param initiated_on the stream id this one is initiated on or 0
+ * @param req the request for this stream or NULL if not known yet
  * @return the new stream
  */
-struct h2_stream *h2_session_open_stream(h2_session *session, int stream_id);
+struct h2_stream *h2_session_open_stream(h2_session *session, int stream_id,
+                                         int initiated_on, 
+                                         const h2_request *req);
+
 
 /**
  * Returns if client settings have push enabled.
@@ -213,8 +218,8 @@ int h2_session_push_enabled(h2_session *
  * @param session the session to which the stream belongs
  * @param stream the stream to destroy
  */
-apr_status_t h2_session_stream_destroy(h2_session *session, 
-                                       struct h2_stream *stream);
+apr_status_t h2_session_stream_done(h2_session *session, 
+                                    struct h2_stream *stream);
 
 /**
  * Submit a push promise on the stream and schedule the new steam for