You are viewing a plain text version of this content. The link to the canonical version was not preserved in this copy.
Posted to cvs@httpd.apache.org by ic...@apache.org on 2016/04/20 17:17:38 UTC

svn commit: r1740155 - in /httpd/httpd/trunk: ./ modules/http2/

Author: icing
Date: Wed Apr 20 15:17:38 2016
New Revision: 1740155

URL: http://svn.apache.org/viewvc?rev=1740155&view=rev
Log:
mod_http2: elimination of h2_io intermediate stream instances

Removed:
    httpd/httpd/trunk/modules/http2/h2_io.c
    httpd/httpd/trunk/modules/http2/h2_io.h
Modified:
    httpd/httpd/trunk/CHANGES
    httpd/httpd/trunk/CMakeLists.txt
    httpd/httpd/trunk/modules/http2/NWGNUmod_http2
    httpd/httpd/trunk/modules/http2/config2.m4
    httpd/httpd/trunk/modules/http2/h2.h
    httpd/httpd/trunk/modules/http2/h2_bucket_eoc.c
    httpd/httpd/trunk/modules/http2/h2_bucket_eos.c
    httpd/httpd/trunk/modules/http2/h2_conn.c
    httpd/httpd/trunk/modules/http2/h2_filter.c
    httpd/httpd/trunk/modules/http2/h2_mplx.c
    httpd/httpd/trunk/modules/http2/h2_mplx.h
    httpd/httpd/trunk/modules/http2/h2_ngn_shed.c
    httpd/httpd/trunk/modules/http2/h2_proxy_session.c
    httpd/httpd/trunk/modules/http2/h2_session.c
    httpd/httpd/trunk/modules/http2/h2_stream.c
    httpd/httpd/trunk/modules/http2/h2_stream.h
    httpd/httpd/trunk/modules/http2/h2_task.c
    httpd/httpd/trunk/modules/http2/h2_task.h
    httpd/httpd/trunk/modules/http2/h2_util.c
    httpd/httpd/trunk/modules/http2/h2_util.h
    httpd/httpd/trunk/modules/http2/mod_http2.dsp
    httpd/httpd/trunk/modules/http2/mod_proxy_http2.c

Modified: httpd/httpd/trunk/CHANGES
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/CHANGES?rev=1740155&r1=1740154&r2=1740155&view=diff
==============================================================================
--- httpd/httpd/trunk/CHANGES [utf-8] (original)
+++ httpd/httpd/trunk/CHANGES [utf-8] Wed Apr 20 15:17:38 2016
@@ -1,8 +1,11 @@
                                                          -*- coding: utf-8 -*-
 Changes with Apache 2.5.0
 
+  *) mod_http2: eliminating h2_io instances for streams, reducing memory
+     pools and footprint. [Stefan Eissing]
+     
   *) core: explicitly exclude 'h2' from protocols announced via an Upgrade: 
-     header as commanded by http-wg.
+     header as commanded by http-wg. [Stefan Eissing]
      
   *) mod_http2: new "bucket beam" technology to transport buckets across
      threads without buffer copy. Delaying response start until flush or

Modified: httpd/httpd/trunk/CMakeLists.txt
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/CMakeLists.txt?rev=1740155&r1=1740154&r2=1740155&view=diff
==============================================================================
--- httpd/httpd/trunk/CMakeLists.txt (original)
+++ httpd/httpd/trunk/CMakeLists.txt Wed Apr 20 15:17:38 2016
@@ -403,7 +403,7 @@ SET(mod_http2_extra_sources
   modules/http2/h2_conn.c            modules/http2/h2_conn_io.c
   modules/http2/h2_ctx.c             modules/http2/h2_filter.c
   modules/http2/h2_from_h1.c         modules/http2/h2_h2.c
-  modules/http2/h2_io.c              modules/http2/h2_bucket_beam.c
+  modules/http2/h2_bucket_beam.c
   modules/http2/h2_mplx.c            modules/http2/h2_push.c
   modules/http2/h2_request.c         modules/http2/h2_response.c
   modules/http2/h2_session.c         modules/http2/h2_stream.c 

Modified: httpd/httpd/trunk/modules/http2/NWGNUmod_http2
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/NWGNUmod_http2?rev=1740155&r1=1740154&r2=1740155&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/NWGNUmod_http2 (original)
+++ httpd/httpd/trunk/modules/http2/NWGNUmod_http2 Wed Apr 20 15:17:38 2016
@@ -195,7 +195,6 @@ FILES_nlm_objs = \
 	$(OBJDIR)/h2_filter.o \
 	$(OBJDIR)/h2_from_h1.o \
 	$(OBJDIR)/h2_h2.o \
-	$(OBJDIR)/h2_io.o \
 	$(OBJDIR)/h2_mplx.o \
 	$(OBJDIR)/h2_ngn_shed.o \
 	$(OBJDIR)/h2_push.o \

Modified: httpd/httpd/trunk/modules/http2/config2.m4
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/config2.m4?rev=1740155&r1=1740154&r2=1740155&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/config2.m4 (original)
+++ httpd/httpd/trunk/modules/http2/config2.m4 Wed Apr 20 15:17:38 2016
@@ -30,7 +30,6 @@ h2_ctx.lo dnl
 h2_filter.lo dnl
 h2_from_h1.lo dnl
 h2_h2.lo dnl
-h2_io.lo dnl
 h2_mplx.lo dnl
 h2_ngn_shed.lo dnl
 h2_push.lo dnl

Modified: httpd/httpd/trunk/modules/http2/h2.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2.h?rev=1740155&r1=1740154&r2=1740155&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2.h (original)
+++ httpd/httpd/trunk/modules/http2/h2.h Wed Apr 20 15:17:38 2016
@@ -149,6 +149,9 @@ struct h2_response {
     const char  *sos_filter;
 };
 
+typedef apr_status_t h2_io_data_cb(void *ctx, const char *data, apr_off_t len);
+
+typedef int h2_stream_pri_cmp(int stream_id1, int stream_id2, void *ctx);
 
 /* Note key to attach connection task id to conn_rec/request_rec instances */
 

Modified: httpd/httpd/trunk/modules/http2/h2_bucket_eoc.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_bucket_eoc.c?rev=1740155&r1=1740154&r2=1740155&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_bucket_eoc.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_bucket_eoc.c Wed Apr 20 15:17:38 2016
@@ -23,6 +23,7 @@
 #include <http_log.h>
 
 #include "h2_private.h"
+#include "h2.h"
 #include "h2_mplx.h"
 #include "h2_session.h"
 #include "h2_bucket_eoc.h"

Modified: httpd/httpd/trunk/modules/http2/h2_bucket_eos.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_bucket_eos.c?rev=1740155&r1=1740154&r2=1740155&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_bucket_eos.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_bucket_eos.c Wed Apr 20 15:17:38 2016
@@ -23,6 +23,7 @@
 #include <http_log.h>
 
 #include "h2_private.h"
+#include "h2.h"
 #include "h2_mplx.h"
 #include "h2_stream.h"
 #include "h2_bucket_eos.h"

Modified: httpd/httpd/trunk/modules/http2/h2_conn.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_conn.c?rev=1740155&r1=1740154&r2=1740155&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_conn.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_conn.c Wed Apr 20 15:17:38 2016
@@ -26,6 +26,7 @@
 #include <http_request.h>
 
 #include "h2_private.h"
+#include "h2.h"
 #include "h2_config.h"
 #include "h2_ctx.h"
 #include "h2_filter.h"

Modified: httpd/httpd/trunk/modules/http2/h2_filter.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_filter.c?rev=1740155&r1=1740154&r2=1740155&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_filter.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_filter.c Wed Apr 20 15:17:38 2016
@@ -22,6 +22,7 @@
 #include <scoreboard.h>
 
 #include "h2_private.h"
+#include "h2.h"
 #include "h2_conn_io.h"
 #include "h2_ctx.h"
 #include "h2_mplx.h"

Modified: httpd/httpd/trunk/modules/http2/h2_mplx.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_mplx.c?rev=1740155&r1=1740154&r2=1740155&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_mplx.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_mplx.c Wed Apr 20 15:17:38 2016
@@ -34,7 +34,6 @@
 #include "h2_conn.h"
 #include "h2_ctx.h"
 #include "h2_h2.h"
-#include "h2_io.h"
 #include "h2_response.h"
 #include "h2_mplx.h"
 #include "h2_ngn_shed.h"
@@ -64,6 +63,13 @@ static void h2_beam_log(h2_bucket_beam *
     }
 }
 
+/* utility for iterating over ihash task sets */
+typedef struct {
+    h2_mplx *m;
+    h2_task *task;
+    apr_time_t now;
+} task_iter_ctx;
+
 /* NULL or the mutex hold by this thread, used for recursive calls
  */
 static apr_threadkey_t *thread_lock;
@@ -123,9 +129,9 @@ static void io_mutex_leave(void *ctx, ap
 static void stream_output_consumed(void *ctx, 
                                    h2_bucket_beam *beam, apr_off_t length)
 {
-    h2_io *io = ctx;
-    if (length > 0 && io->task && io->task->assigned) {
-        h2_req_engine_out_consumed(io->task->assigned, io->task->c, length); 
+    h2_task *task = ctx;
+    if (length > 0 && task && task->assigned) {
+        h2_req_engine_out_consumed(task->assigned, task->c, length); 
     }
 }
 
@@ -160,7 +166,7 @@ static void check_tx_reservation(h2_mplx
 {
     if (m->tx_handles_reserved <= 0) {
         m->tx_handles_reserved += h2_workers_tx_reserve(m->workers, 
-            H2MIN(m->tx_chunk_size, h2_ilist_count(m->stream_ios)));
+            H2MIN(m->tx_chunk_size, h2_ihash_count(m->tasks)));
     }
 }
 
@@ -171,8 +177,7 @@ static void check_tx_free(h2_mplx *m)
         m->tx_handles_reserved = m->tx_chunk_size;
         h2_workers_tx_free(m->workers, count);
     }
-    else if (m->tx_handles_reserved 
-             && (!m->stream_ios || h2_ilist_empty(m->stream_ios))) {
+    else if (m->tx_handles_reserved && h2_ihash_empty(m->tasks)) {
         h2_workers_tx_free(m->workers, m->tx_handles_reserved);
         m->tx_handles_reserved = 0;
     }
@@ -182,8 +187,8 @@ static void h2_mplx_destroy(h2_mplx *m)
 {
     AP_DEBUG_ASSERT(m);
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                  "h2_mplx(%ld): destroy, ios=%d", 
-                  m->id, (int)h2_ilist_count(m->stream_ios));
+                  "h2_mplx(%ld): destroy, tasks=%d", 
+                  m->id, (int)h2_ihash_count(m->tasks));
     check_tx_free(m);
     if (m->pool) {
         apr_pool_destroy(m->pool);
@@ -244,9 +249,12 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr
         m->bucket_alloc = apr_bucket_alloc_create(m->pool);
         m->max_streams = h2_config_geti(conf, H2_CONF_MAX_STREAMS);
         m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
+
+        m->streams = h2_ihash_create(m->pool, offsetof(h2_stream,id));
         m->q = h2_iq_create(m->pool, m->max_streams);
-        m->stream_ios = h2_ilist_create(m->pool);
-        m->ready_ios = h2_ilist_create(m->pool);
+        m->tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id));
+        m->ready_tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id));
+
         m->stream_timeout = stream_timeout;
         m->workers = workers;
         m->workers_max = workers->max_workers;
@@ -280,65 +288,56 @@ apr_uint32_t h2_mplx_shutdown(h2_mplx *m
     return max_stream_started;
 }
 
-static void io_in_consumed_signal(h2_mplx *m, h2_io *io)
+static void input_consumed_signal(h2_mplx *m, h2_task *task)
 {
-    if (io->beam_in && io->worker_started) {
-        h2_beam_send(io->beam_in, NULL, 0); /* trigger updates */
+    if (task->input.beam && task->worker_started) {
+        h2_beam_send(task->input.beam, NULL, 0); /* trigger updates */
     }
 }
 
-static int io_out_consumed_signal(h2_mplx *m, h2_io *io)
+static int output_consumed_signal(h2_mplx *m, h2_task *task)
 {
-    if (io->beam_out && io->worker_started && io->task && io->task->assigned) {
-        h2_beam_send(io->beam_out, NULL, 0); /* trigger updates */
+    if (task->output.beam && task->worker_started && task->assigned) {
+        h2_beam_send(task->output.beam, NULL, 0); /* trigger updates */
     }
     return 0;
 }
 
 
-static void io_destroy(h2_mplx *m, h2_io *io, int events)
+static void task_destroy(h2_mplx *m, h2_task *task, int events)
 {
     conn_rec *slave = NULL;
-    int reuse_slave;
+    int reuse_slave = 0;
     
     /* cleanup any buffered input */
-    h2_io_shutdown(io);
+    h2_task_shutdown(task);
     if (events) {
         /* Process outstanding events before destruction */
-        io_in_consumed_signal(m, io);
+        input_consumed_signal(m, task);
     }
     
     /* The pool is cleared/destroyed which also closes all
      * allocated file handles. Give this count back to our
      * file handle pool. */
-    if (io->beam_in) {
-        m->tx_handles_reserved += h2_beam_get_files_beamed(io->beam_in);
-    }
-    if (io->beam_out) {
-        m->tx_handles_reserved += h2_beam_get_files_beamed(io->beam_out);
+    if (task->input.beam) {
+        m->tx_handles_reserved += 
+        h2_beam_get_files_beamed(task->input.beam);
     }
-
-    h2_ilist_remove(m->stream_ios, io->id);
-    h2_ilist_remove(m->ready_ios, io->id);
-    if (m->redo_ios) {
-        h2_ilist_remove(m->redo_ios, io->id);
+    if (task->output.beam) {
+        m->tx_handles_reserved += 
+        h2_beam_get_files_beamed(task->output.beam);
     }
-
+    
+    slave = task->c;
     reuse_slave = ((m->spare_slaves->nelts < m->spare_slaves->nalloc)
-                    && !io->rst_error);
-    if (io->task) {
-        slave = io->task->c;
-        h2_task_destroy(io->task);
-        io->task = NULL;
-    }
-
-    if (io->pool) {
-        if (m->spare_io_pool) {
-            apr_pool_destroy(m->spare_io_pool);
-        }
-        apr_pool_clear(io->pool);
-        m->spare_io_pool = io->pool;
+                   && !task->rst_error);
+    
+    h2_ihash_remove(m->tasks, task->stream_id);
+    h2_ihash_remove(m->ready_tasks, task->stream_id);
+    if (m->redo_tasks) {
+        h2_ihash_remove(m->redo_tasks, task->stream_id);
     }
+    h2_task_destroy(task);
 
     if (slave) {
         if (reuse_slave && slave->keepalive == AP_CONN_KEEPALIVE) {
@@ -353,21 +352,26 @@ static void io_destroy(h2_mplx *m, h2_io
     check_tx_free(m);
 }
 
-static int io_stream_done(h2_mplx *m, h2_io *io, int rst_error) 
+static int task_stream_done(h2_mplx *m, h2_task *task, int rst_error) 
 {
     /* Remove io from ready set, we will never submit it */
-    h2_ilist_remove(m->ready_ios, io->id);
-    if (!io->worker_started || io->worker_done) {
+    h2_ihash_remove(m->ready_tasks, task->stream_id);
+    if (task->worker_done) {
         /* already finished or not even started yet */
-        h2_iq_remove(m->q, io->id);
-        io_destroy(m, io, 1);
+        h2_iq_remove(m->q, task->stream_id);
+        task_destroy(m, task, 1);
         return 0;
     }
     else {
         /* cleanup once task is done */
-        io->orphaned = 1;
+        task->orphaned = 1;
+        if (task->input.beam) {
+            /* TODO: this is currently allocated by the stream and will disappear */
+            h2_beam_shutdown(task->input.beam);
+            task->input.beam = NULL;
+        }
         if (rst_error) {
-            h2_io_rst(io, rst_error);
+            h2_task_rst(task, rst_error);
         }
         return 1;
     }
@@ -375,31 +379,27 @@ static int io_stream_done(h2_mplx *m, h2
 
 static int stream_done_iter(void *ctx, void *val)
 {
-    return io_stream_done((h2_mplx*)ctx, val, 0);
+    return task_stream_done((h2_mplx*)ctx, val, 0);
 }
 
-static int stream_print(void *ctx, void *val)
+static int task_print(void *ctx, void *val)
 {
     h2_mplx *m = ctx;
-    h2_io *io = val;
-    if (io && io->request) {
+    h2_task *task = val;
+    if (task->request) {
         ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
-                      "->03198: h2_stream(%ld-%d): %s %s %s -> %s %d"
+                      "->03198: h2_stream(%s): %s %s %s -> %s %d"
                       "[orph=%d/started=%d/done=%d]", 
-                      m->id, io->id, 
-                      io->request->method, io->request->authority, io->request->path,
-                      io->response? "http" : (io->rst_error? "reset" : "?"),
-                      io->response? io->response->http_status : io->rst_error,
-                      io->orphaned, io->worker_started, io->worker_done);
+                      task->id, task->request->method, 
+                      task->request->authority, task->request->path,
+                      task->response? "http" : (task->rst_error? "reset" : "?"),
+                      task->response? task->response->http_status : task->rst_error,
+                      task->orphaned, task->worker_started, 
+                      task->worker_done);
     }
-    else if (io) {
+    else if (task) {
         ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
-                      "->03198: h2_stream(%ld-%d): NULL -> %s %d"
-                      "[orph=%d/started=%d/done=%d]", 
-                      m->id, io->id, 
-                      io->response? "http" : (io->rst_error? "reset" : "?"),
-                      io->response? io->response->http_status : io->rst_error,
-                      io->orphaned, io->worker_started, io->worker_done);
+                      "->03198: h2_stream(%ld-%d): NULL", m->id, task->stream_id);
     }
     else {
         ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
@@ -423,7 +423,7 @@ apr_status_t h2_mplx_release_and_join(h2
         
         h2_iq_clear(m->q);
         apr_thread_cond_broadcast(m->task_thawed);
-        while (!h2_ilist_iter(m->stream_ios, stream_done_iter, m)) {
+        while (!h2_ihash_iter(m->tasks, stream_done_iter, m)) {
             /* iterate until all ios have been orphaned or destroyed */
         }
     
@@ -437,12 +437,12 @@ apr_status_t h2_mplx_release_and_join(h2
         for (i = 0; m->workers_busy > 0; ++i) {
             m->join_wait = wait;
             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                          "h2_mplx(%ld): release_join, waiting on %d worker to report back", 
-                          m->id, (int)h2_ilist_count(m->stream_ios));
+                          "h2_mplx(%ld): release_join, waiting on %d tasks to report back", 
+                          m->id, (int)h2_ihash_count(m->tasks));
                           
             status = apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(wait_secs));
             
-            while (!h2_ilist_iter(m->stream_ios, stream_done_iter, m)) {
+            while (!h2_ihash_iter(m->tasks, stream_done_iter, m)) {
                 /* iterate until all ios have been orphaned or destroyed */
             }
             if (APR_STATUS_IS_TIMEUP(status)) {
@@ -454,11 +454,11 @@ apr_status_t h2_mplx_release_and_join(h2
                      */
                     ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03198)
                                   "h2_mplx(%ld): release, waiting for %d seconds now for "
-                                  "%d h2_workers to return, have still %d requests outstanding", 
+                                  "%d h2_workers to return, have still %d tasks outstanding", 
                                   m->id, i*wait_secs, m->workers_busy,
-                                  (int)h2_ilist_count(m->stream_ios));
+                                  (int)h2_ihash_count(m->tasks));
                     if (i == 1) {
-                        h2_ilist_iter(m->stream_ios, stream_print, m);
+                        h2_ihash_iter(m->tasks, task_print, m);
                     }
                 }
                 h2_mplx_abort(m);
@@ -466,10 +466,10 @@ apr_status_t h2_mplx_release_and_join(h2
             }
         }
         
-        if (!h2_ilist_empty(m->stream_ios)) {
+        if (!h2_ihash_empty(m->tasks)) {
             ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, m->c, 
-                          "h2_mplx(%ld): release_join, %d streams still open", 
-                          m->id, (int)h2_ilist_count(m->stream_ios));
+                          "h2_mplx(%ld): release_join, %d tasks still open", 
+                          m->id, (int)h2_ihash_count(m->tasks));
         }
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
                       "h2_mplx(%ld): release_join -> destroy", m->id);
@@ -503,16 +503,17 @@ apr_status_t h2_mplx_stream_done(h2_mplx
      */
     AP_DEBUG_ASSERT(m);
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        h2_io *io = h2_ilist_get(m->stream_ios, stream_id);
+        h2_task *task = h2_ihash_get(m->tasks, stream_id);
 
+        h2_ihash_remove(m->streams, stream_id);
         /* there should be an h2_io, once the stream has been scheduled
          * for processing, e.g. when we received all HEADERs. But when
          * a stream is cancelled very early, it will not exist. */
-        if (io) {
+        if (task) {
             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
-                          "h2_mplx(%ld-%d): marking stream as done.", 
+                          "h2_mplx(%ld-%d): marking stream task as done.", 
                           m->id, stream_id);
-            io_stream_done(m, io, rst_error);
+            task_stream_done(m, task, rst_error);
         }
         leave_mutex(m, acquired);
     }
@@ -528,7 +529,7 @@ void h2_mplx_set_consumed_cb(h2_mplx *m,
 static int update_window(void *ctx, void *val)
 {
     h2_mplx *m = ctx;
-    io_in_consumed_signal(m, val);
+    input_consumed_signal(m, val);
     return 1;
 }
 
@@ -542,7 +543,7 @@ apr_status_t h2_mplx_in_update_windows(h
         return APR_ECONNABORTED;
     }
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        h2_ilist_iter(m->stream_ios, update_window, m);
+        h2_ihash_iter(m->tasks, update_window, m);
         
         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
                       "h2_session(%ld): windows updated", m->id);
@@ -552,6 +553,14 @@ apr_status_t h2_mplx_in_update_windows(h
     return status;
 }
 
+static int task_iter_first(void *ctx, void *val)
+{
+    task_iter_ctx *tctx = ctx;
+    h2_task *task = val;
+    tctx->task = task;
+    return 0;
+}
+
 h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams)
 {
     apr_status_t status;
@@ -560,38 +569,46 @@ h2_stream *h2_mplx_next_submit(h2_mplx *
 
     AP_DEBUG_ASSERT(m);
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        h2_io *io = h2_ilist_shift(m->ready_ios);
-        if (io && !m->aborted) {
-            stream = h2_ihash_get(streams, io->id);
-            if (stream) {
-                io->submitted = 1;
-                if (io->rst_error) {
-                    h2_stream_rst(stream, io->rst_error);
+        task_iter_ctx ctx;
+        ctx.m = m;
+        ctx.task = NULL;
+        h2_ihash_iter(m->ready_tasks, task_iter_first, &ctx);
+        
+        if (ctx.task && !m->aborted) {
+            h2_task *task = ctx.task;
+            
+            h2_ihash_remove(m->ready_tasks, task->stream_id);
+            stream = h2_ihash_get(streams, task->stream_id);
+            if (stream && task) {
+                task->submitted = 1;
+                if (task->rst_error) {
+                    h2_stream_rst(stream, task->rst_error);
                 }
                 else {
-                    AP_DEBUG_ASSERT(io->response);
-                    h2_stream_set_response(stream, io->response, io->beam_out);
+                    AP_DEBUG_ASSERT(task->response);
+                    h2_stream_set_response(stream, task->response, 
+                                           task->output.beam);
                 }
             }
-            else {
+            else if (task) {
                 /* We have the io ready, but the stream has gone away, maybe
                  * reset by the client. Should no longer happen since such
                  * streams should clear io's from the ready queue.
                  */
                 ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03347)
-                              "h2_mplx(%ld): stream for response %d closed, "
+                              "h2_mplx(%s): stream for response closed, "
                               "resetting io to close request processing",
-                              m->id, io->id);
-                io->orphaned = 1;
-                h2_io_rst(io, H2_ERR_STREAM_CLOSED);
-                if (!io->worker_started || io->worker_done) {
-                    io_destroy(m, io, 1);
+                              task->id);
+                task->orphaned = 1;
+                h2_task_rst(task, H2_ERR_STREAM_CLOSED);
+                if (!task->worker_started || task->worker_done) {
+                    task_destroy(m, task, 1);
                 }
                 else {
                     /* hang around until the h2_task is done, but
                      * shutdown input/output and send out any events asap. */
-                    h2_io_shutdown(io);
-                    io_in_consumed_signal(m, io);
+                    h2_task_shutdown(task);
+                    input_consumed_signal(m, task);
                 }
             }
         }
@@ -600,33 +617,32 @@ h2_stream *h2_mplx_next_submit(h2_mplx *
     return stream;
 }
 
-static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response,
-                             h2_bucket_beam *output)
+static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response)
 {
     apr_status_t status = APR_SUCCESS;
+    h2_task *task = h2_ihash_get(m->tasks, stream_id);
     
-    h2_io *io = h2_ilist_get(m->stream_ios, stream_id);
-    if (!io || io->orphaned) {
+    if (!task || task->orphaned) {
         return APR_ECONNABORTED;
     }
     
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                  "h2_mplx(%ld-%d): open response: %d, rst=%d",
-                  m->id, stream_id, response->http_status, 
-                  response->rst_error);
-    
-    if (output) {
-        h2_beam_buffer_size_set(output, m->stream_max_mem);
-        h2_beam_timeout_set(output, m->stream_timeout);
-        h2_beam_on_consumed(output, stream_output_consumed, io);
-        m->tx_handles_reserved -= h2_beam_get_files_beamed(output);
-        h2_beam_on_file_beam(output, can_beam_file, m);
-        h2_beam_mutex_set(output, io_mutex_enter, io_mutex_leave, 
-                          io->task->cond, m);
+                  "h2_mplx(%s): open response: %d, rst=%d",
+                  task->id, response->http_status, response->rst_error);
+    
+    h2_task_set_response(task, response);
+    
+    if (task->output.beam) {
+        h2_beam_buffer_size_set(task->output.beam, m->stream_max_mem);
+        h2_beam_timeout_set(task->output.beam, m->stream_timeout);
+        h2_beam_on_consumed(task->output.beam, stream_output_consumed, task);
+        m->tx_handles_reserved -= h2_beam_get_files_beamed(task->output.beam);
+        h2_beam_on_file_beam(task->output.beam, can_beam_file, m);
+        h2_beam_mutex_set(task->output.beam, io_mutex_enter, io_mutex_leave, 
+                          task->cond, m);
     }
-    h2_io_set_response(io, response, output);
     
-    h2_ilist_add(m->ready_ios, io);
+    h2_ihash_add(m->ready_tasks, task);
     if (response && response->http_status < 300) {
         /* we might see some file buckets in the output, see
          * if we have enough handles reserved. */
@@ -636,8 +652,7 @@ static apr_status_t out_open(h2_mplx *m,
     return status;
 }
 
-apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response,
-                              h2_bucket_beam *output)
+apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response)
 {
     apr_status_t status;
     int acquired;
@@ -648,7 +663,7 @@ apr_status_t h2_mplx_out_open(h2_mplx *m
             status = APR_ECONNABORTED;
         }
         else {
-            status = out_open(m, stream_id, response, output);
+            status = out_open(m, stream_id, response);
         }
         leave_mutex(m, acquired);
     }
@@ -662,28 +677,28 @@ apr_status_t h2_mplx_out_close(h2_mplx *
     
     AP_DEBUG_ASSERT(m);
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        h2_io *io = h2_ilist_get(m->stream_ios, stream_id);
-        if (io && !io->orphaned) {
-            if (!io->response && !io->rst_error) {
+        h2_task *task = h2_ihash_get(m->tasks, stream_id);
+        if (task && !task->orphaned) {
+            if (!task->response && !task->rst_error) {
                 /* In case a close comes before a response was created,
                  * insert an error one so that our streams can properly
                  * reset.
                  */
                 h2_response *r = h2_response_die(stream_id, APR_EGENERAL, 
-                                                 io->request, m->pool);
-                status = out_open(m, stream_id, r, NULL);
+                                                 task->request, m->pool);
+                status = out_open(m, stream_id, r);
                 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c,
                               "h2_mplx(%ld-%d): close, no response, no rst", 
-                              m->id, io->id);
+                              m->id, stream_id);
             }
             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
-                          "h2_mplx(%ld-%d): close", m->id, io->id);
-            if (io->beam_out) {
-                status = h2_beam_close(io->beam_out);
-                h2_beam_log(io->beam_out, stream_id, "out_close", m->c, 
+                          "h2_mplx(%ld-%d): close", m->id, stream_id);
+            if (task->output.beam) {
+                status = h2_beam_close(task->output.beam);
+                h2_beam_log(task->output.beam, stream_id, "out_close", m->c, 
                             APLOG_TRACE2);
             }
-            io_out_consumed_signal(m, io);
+            output_consumed_signal(m, task);
             have_out_data_for(m, stream_id);
         }
         else {
@@ -762,8 +777,9 @@ apr_status_t h2_mplx_process(h2_mplx *m,
             status = APR_ECONNABORTED;
         }
         else {
-            apr_pool_t *io_pool;
-            h2_io *io;
+            h2_beam_create(&stream->input, stream->pool, stream->id, 
+                           "input", 0);
+            h2_ihash_add(m->streams, stream);
             
             if (!m->need_registration) {
                 m->need_registration = h2_iq_empty(m->q);
@@ -771,23 +787,11 @@ apr_status_t h2_mplx_process(h2_mplx *m,
             if (m->workers_busy < m->workers_max) {
                 do_registration = m->need_registration;
             }
+            h2_iq_add(m->q, stream->id, cmp, ctx);
 
-            io_pool = m->spare_io_pool;
-            if (io_pool) {
-                m->spare_io_pool = NULL;
-            }
-            else {
-                apr_pool_create(&io_pool, m->pool);
-                apr_pool_tag(io_pool, "h2_io");
-            }
-            io = h2_io_create(stream->id, io_pool, stream->request);
-            h2_ilist_add(m->stream_ios, io);            
-            h2_iq_add(m->q, io->id, cmp, ctx);
-            
-            stream->input = io->beam_in;
             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
                           "h2_mplx(%ld-%d): process, body=%d", 
-                          m->c->id, stream->id, io->request->body);
+                          m->c->id, stream->id, stream->request->body);
         }
         leave_mutex(m, acquired);
     }
@@ -801,24 +805,15 @@ apr_status_t h2_mplx_process(h2_mplx *m,
 static h2_task *pop_task(h2_mplx *m)
 {
     h2_task *task = NULL;
-    h2_io *io;
+    h2_stream *stream;
     int sid;
     while (!m->aborted && !task  && (m->workers_busy < m->workers_limit)
            && (sid = h2_iq_shift(m->q)) > 0) {
         
-        io = h2_ilist_get(m->stream_ios, sid);
-        if (io) {
+        stream = h2_ihash_get(m->streams, sid);
+        if (stream) {
             conn_rec *slave, **pslave;
-            
-            if (io->orphaned) {
-                /* TODO: add to purge list */
-                io_destroy(m, io, 0);
-                if (m->join_wait) {
-                    apr_thread_cond_signal(m->join_wait);
-                }
-                continue;
-            }
-            
+
             pslave = (conn_rec **)apr_array_pop(m->spare_slaves);
             if (pslave) {
                 slave = *pslave;
@@ -829,19 +824,20 @@ static h2_task *pop_task(h2_mplx *m)
             }
             
             slave->sbh = m->c->sbh;
-            io->task = task = h2_task_create(slave, io->request, 
-                                             io->beam_in, m);
+            task = h2_task_create(slave, stream->request, stream->input, m);
+            h2_ihash_add(m->tasks, task);
+            
             m->c->keepalives++;
             apr_table_setn(slave->notes, H2_TASK_ID_NOTE, task->id);
             
-            io->worker_started = 1;
-            io->started_at = apr_time_now();
+            task->worker_started = 1;
+            task->started_at = apr_time_now();
             
-            if (io->beam_in) {
-                h2_beam_timeout_set(io->beam_in, m->stream_timeout);
-                h2_beam_on_consumed(io->beam_in, stream_input_consumed, m);
-                h2_beam_on_file_beam(io->beam_in, can_beam_file, m);
-                h2_beam_mutex_set(io->beam_in, io_mutex_enter, 
+            if (task->input.beam) {
+                h2_beam_timeout_set(task->input.beam, m->stream_timeout);
+                h2_beam_on_consumed(task->input.beam, stream_input_consumed, m);
+                h2_beam_on_file_beam(task->input.beam, can_beam_file, m);
+                h2_beam_mutex_set(task->input.beam, io_mutex_enter, 
                                   io_mutex_leave, task->cond, m);
             }
             if (sid > m->max_stream_started) {
@@ -880,8 +876,6 @@ h2_task *h2_mplx_pop_task(h2_mplx *m, in
 static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
 {
     if (task) {
-        h2_io *io = h2_ilist_get(m->stream_ios, task->stream_id);
-        
         if (task->frozen) {
             /* this task was handed over to an engine for processing 
              * and the original worker has finished. That means the 
@@ -894,6 +888,8 @@ static void task_done(h2_mplx *m, h2_tas
             apr_thread_cond_broadcast(m->task_thawed);
         }
         else {
+            apr_time_t now = apr_time_now();
+            
             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                           "h2_mplx(%ld): task(%s) done", m->id, task->id);
             /* clean our references and report request as done. Signal
@@ -903,11 +899,11 @@ static void task_done(h2_mplx *m, h2_tas
              * other mplx's. Perhaps leave after n requests? */
             h2_mplx_out_close(m, task->stream_id);
             
-            if (ngn && io) {
+            if (ngn) {
                 apr_off_t bytes = 0;
-                if (io->beam_out) {
-                    h2_beam_send(io->beam_out, NULL, APR_NONBLOCK_READ);
-                    bytes += h2_beam_get_buffered(io->beam_out);
+                if (task->output.beam) {
+                    h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
+                    bytes += h2_beam_get_buffered(task->output.beam);
                 }
                 if (bytes > 0) {
                     /* we need to report consumed and current buffered output
@@ -928,55 +924,47 @@ static void task_done(h2_mplx *m, h2_tas
                 h2_ngn_shed_done_ngn(m->ngn_shed, task->engine);
             }
             
-            if (io) {
-                apr_time_t now = apr_time_now();
-                if (!io->orphaned && m->redo_ios
-                    && h2_ilist_get(m->redo_ios, io->id)) {
-                    /* reset and schedule again */
-                    h2_io_redo(io);
-                    h2_ilist_remove(m->redo_ios, io->id);
-                    h2_iq_add(m->q, io->id, NULL, NULL);
-                }
-                else {
-                    io->worker_done = 1;
-                    io->done_at = now;
-                    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                                  "h2_mplx(%ld): request(%d) done, %f ms"
-                                  " elapsed", m->id, io->id, 
-                                  (io->done_at - io->started_at) / 1000.0);
-                    if (io->started_at > m->last_idle_block) {
-                        /* this task finished without causing an 'idle block', e.g.
-                         * a block by flow control.
-                         */
-                        if (now - m->last_limit_change >= m->limit_change_interval
-                            && m->workers_limit < m->workers_max) {
-                            /* Well behaving stream, allow it more workers */
-                            m->workers_limit = H2MIN(m->workers_limit * 2, 
-                                                     m->workers_max);
-                            m->last_limit_change = now;
-                            m->need_registration = 1;
-                            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                                          "h2_mplx(%ld): increase worker limit to %d",
-                                          m->id, m->workers_limit);
-                        }
-                    }
-                }
-                
-                if (io->orphaned) {
-                    /* TODO: add to purge list */
-                    io_destroy(m, io, 0);
-                    if (m->join_wait) {
-                        apr_thread_cond_signal(m->join_wait);
+            if (!task->orphaned && m->redo_tasks
+                && h2_ihash_get(m->redo_tasks, task->stream_id)) {
+                /* reset and schedule again */
+                h2_task_redo(task);
+                h2_ihash_remove(m->redo_tasks, task->stream_id);
+                h2_iq_add(m->q, task->stream_id, NULL, NULL);
+            }
+            else {
+                task->worker_done = 1;
+                task->done_at = now;
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                              "h2_mplx(%s): request done, %f ms"
+                              " elapsed", task->id, 
+                              (task->done_at - task->started_at) / 1000.0);
+                if (task->started_at > m->last_idle_block) {
+                    /* this task finished without causing an 'idle block', e.g.
+                     * a block by flow control.
+                     */
+                    if (now - m->last_limit_change >= m->limit_change_interval
+                        && m->workers_limit < m->workers_max) {
+                        /* Well behaving stream, allow it more workers */
+                        m->workers_limit = H2MIN(m->workers_limit * 2, 
+                                                 m->workers_max);
+                        m->last_limit_change = now;
+                        m->need_registration = 1;
+                        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                                      "h2_mplx(%ld): increase worker limit to %d",
+                                      m->id, m->workers_limit);
                     }
                 }
-                else {
-                    /* hang around until the stream deregisters */
+            }
+            
+            if (task->orphaned) {
+                /* TODO: add to purge list */
+                task_destroy(m, task, 0);
+                if (m->join_wait) {
+                    apr_thread_cond_signal(m->join_wait);
                 }
             }
             else {
-                ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
-                              "h2_mplx(%ld): task %s without corresp. h2_io",
-                              m->id, task->id);
+                /* hang around until the stream deregisters */
             }
         }
     }
@@ -1001,81 +989,76 @@ void h2_mplx_task_done(h2_mplx *m, h2_ta
  * h2_mplx DoS protection
  ******************************************************************************/
 
-typedef struct {
-    h2_mplx *m;
-    h2_io *io;
-    apr_time_t now;
-} io_iter_ctx;
-
-static int latest_repeatable_busy_unsubmitted_iter(void *data, void *val)
+static int latest_repeatable_unsubmitted_iter(void *data, void *val)
 {
-    io_iter_ctx *ctx = data;
-    h2_io *io = val;
-    if (io->worker_started && !io->worker_done
-        && h2_io_can_redo(io) && !h2_ilist_get(ctx->m->redo_ios, io->id)) {
-        /* this io occupies a worker, the response has not been submitted yet,
+    task_iter_ctx *ctx = data;
+    h2_task *task = val;
+    if (!task->worker_done && h2_task_can_redo(task) 
+        && !h2_ihash_get(ctx->m->redo_tasks, task->stream_id)) {
+        /* this task occupies a worker, the response has not been submitted yet,
          * not been cancelled and it is a repeatable request
          * -> it can be re-scheduled later */
-        if (!ctx->io || ctx->io->started_at < io->started_at) {
+        if (!ctx->task || ctx->task->started_at < task->started_at) {
             /* we did not have one or this one was started later */
-            ctx->io = io;
+            ctx->task = task;
         }
     }
     return 1;
 }
 
-static h2_io *get_latest_repeatable_busy_unsubmitted_io(h2_mplx *m) 
+static h2_task *get_latest_repeatable_unsubmitted_task(h2_mplx *m) 
 {
-    io_iter_ctx ctx;
+    task_iter_ctx ctx;
     ctx.m = m;
-    ctx.io = NULL;
-    h2_ilist_iter(m->stream_ios, latest_repeatable_busy_unsubmitted_iter, &ctx);
-    return ctx.io;
+    ctx.task = NULL;
+    h2_ihash_iter(m->tasks, latest_repeatable_unsubmitted_iter, &ctx);
+    return ctx.task;
 }
 
 static int timed_out_busy_iter(void *data, void *val)
 {
-    io_iter_ctx *ctx = data;
-    h2_io *io = val;
-    if (io->worker_started && !io->worker_done
-        && (ctx->now - io->started_at) > ctx->m->stream_timeout) {
+    task_iter_ctx *ctx = data;
+    h2_task *task = val;
+    if (!task->worker_done
+        && (ctx->now - task->started_at) > ctx->m->stream_timeout) {
         /* timed out stream occupying a worker, found */
-        ctx->io = io;
+        ctx->task = task;
         return 0;
     }
     return 1;
 }
-static h2_io *get_timed_out_busy_stream(h2_mplx *m) 
+
+static h2_task *get_timed_out_busy_task(h2_mplx *m) 
 {
-    io_iter_ctx ctx;
+    task_iter_ctx ctx;
     ctx.m = m;
-    ctx.io = NULL;
+    ctx.task = NULL;
     ctx.now = apr_time_now();
-    h2_ilist_iter(m->stream_ios, timed_out_busy_iter, &ctx);
-    return ctx.io;
+    h2_ihash_iter(m->tasks, timed_out_busy_iter, &ctx);
+    return ctx.task;
 }
 
-static apr_status_t unschedule_slow_ios(h2_mplx *m) 
+static apr_status_t unschedule_slow_tasks(h2_mplx *m) 
 {
-    h2_io *io;
+    h2_task *task;
     int n;
     
-    if (!m->redo_ios) {
-        m->redo_ios = h2_ilist_create(m->pool);
+    if (!m->redo_tasks) {
+        m->redo_tasks = h2_ihash_create(m->pool, offsetof(h2_task, stream_id));
     }
     /* Try to get rid of streams that occupy workers. Look for safe requests
      * that are repeatable. If none found, fail the connection.
      */
-    n = (m->workers_busy - m->workers_limit - h2_ilist_count(m->redo_ios));
-    while (n > 0 && (io = get_latest_repeatable_busy_unsubmitted_io(m))) {
-        h2_ilist_add(m->redo_ios, io);
-        h2_io_rst(io, H2_ERR_CANCEL);
+    n = (m->workers_busy - m->workers_limit - h2_ihash_count(m->redo_tasks));
+    while (n > 0 && (task = get_latest_repeatable_unsubmitted_task(m))) {
+        h2_task_rst(task, H2_ERR_CANCEL);
+        h2_ihash_add(m->redo_tasks, task);
         --n;
     }
     
-    if ((m->workers_busy - h2_ilist_count(m->redo_ios)) > m->workers_limit) {
-        io = get_timed_out_busy_stream(m);
-        if (io) {
+    if ((m->workers_busy - h2_ihash_count(m->redo_tasks)) > m->workers_limit) {
+        task = get_timed_out_busy_task(m);
+        if (task) {
             /* Too many busy workers, unable to cancel enough streams
              * and with a busy, timed out stream, we tell the client
              * to go away... */
@@ -1092,7 +1075,7 @@ apr_status_t h2_mplx_idle(h2_mplx *m)
     int acquired;
     
     if (enter_mutex(m, &acquired) == APR_SUCCESS) {
-        apr_size_t scount = h2_ilist_count(m->stream_ios);
+        apr_size_t scount = h2_ihash_count(m->streams);
         if (scount > 0 && m->workers_busy) {
             /* If we have streams in connection state 'IDLE', meaning
              * all streams are ready to sent data out, but lack
@@ -1129,7 +1112,7 @@ apr_status_t h2_mplx_idle(h2_mplx *m)
             }
             
             if (m->workers_busy > m->workers_limit) {
-                status = unschedule_slow_ios(m);
+                status = unschedule_slow_tasks(m);
             }
         }
         leave_mutex(m, acquired);
@@ -1150,9 +1133,9 @@ typedef struct {
 static int ngn_update_window(void *ctx, void *val)
 {
     ngn_update_ctx *uctx = ctx;
-    h2_io *io = val;
-    if (io && io->task && io->task->assigned == uctx->ngn
-        && io_out_consumed_signal(uctx->m, io)) {
+    h2_task *task = val;
+    if (task && task->assigned == uctx->ngn
+        && output_consumed_signal(uctx->m, task)) {
         ++uctx->streams_updated;
     }
     return 1;
@@ -1165,7 +1148,7 @@ static apr_status_t ngn_out_update_windo
     ctx.m = m;
     ctx.ngn = ngn;
     ctx.streams_updated = 0;
-    h2_ilist_iter(m->stream_ios, ngn_update_window, &ctx);
+    h2_ihash_iter(m->tasks, ngn_update_window, &ctx);
     
     return ctx.streams_updated? APR_SUCCESS : APR_EAGAIN;
 }
@@ -1187,8 +1170,7 @@ apr_status_t h2_mplx_req_engine_push(con
     task->r = r;
     
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        h2_io *io = h2_ilist_get(m->stream_ios, task->stream_id);
-        if (!io || io->orphaned) {
+        if (task->orphaned) {
             status = APR_ECONNABORTED;
         }
         else {

Modified: httpd/httpd/trunk/modules/http2/h2_mplx.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_mplx.h?rev=1740155&r1=1740154&r2=1740155&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_mplx.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_mplx.h Wed Apr 20 15:17:38 2016
@@ -52,7 +52,6 @@ struct h2_ngn_shed;
 struct h2_req_engine;
 
 #include <apr_queue.h>
-#include "h2_io.h"
 
 typedef struct h2_mplx h2_mplx;
 
@@ -73,10 +72,12 @@ struct h2_mplx {
     unsigned int aborted : 1;
     unsigned int need_registration : 1;
 
-    struct h2_iqueue *q;
-    struct h2_ilist_t *stream_ios;
-    struct h2_ilist_t *ready_ios;
-    struct h2_ilist_t *redo_ios;
+    struct h2_ihash_t *streams;     /* all streams currently processing */
+    struct h2_iqueue *q;            /* all stream ids that need to be started */
+    
+    struct h2_ihash_t *tasks;       /* all tasks started and not destroyed */
+    struct h2_ihash_t *ready_tasks; /* all tasks ready for submit */
+    struct h2_ihash_t *redo_tasks;  /* all tasks that need to be redone */
     
     apr_uint32_t max_streams;        /* max # of concurrent streams */
     apr_uint32_t max_stream_started; /* highest stream id that started processing */
@@ -241,8 +242,7 @@ struct h2_stream *h2_mplx_next_submit(h2
  * Opens the output for the given stream with the specified response.
  */
 apr_status_t h2_mplx_out_open(h2_mplx *mplx, int stream_id,
-                              struct h2_response *response,
-                              struct h2_bucket_beam *output);
+                              struct h2_response *response);
 
 /**
  * Closes the output for stream stream_id. 

Modified: httpd/httpd/trunk/modules/http2/h2_ngn_shed.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_ngn_shed.c?rev=1740155&r1=1740154&r2=1740155&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_ngn_shed.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_ngn_shed.c Wed Apr 20 15:17:38 2016
@@ -29,6 +29,7 @@
 #include "mod_http2.h"
 
 #include "h2_private.h"
+#include "h2.h"
 #include "h2_config.h"
 #include "h2_conn.h"
 #include "h2_ctx.h"

Modified: httpd/httpd/trunk/modules/http2/h2_proxy_session.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_proxy_session.c?rev=1740155&r1=1740154&r2=1740155&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_proxy_session.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_proxy_session.c Wed Apr 20 15:17:38 2016
@@ -878,7 +878,7 @@ static void ev_init(h2_proxy_session *se
 {
     switch (session->state) {
         case H2_PROXYS_ST_INIT:
-            if (h2_ihash_is_empty(session->streams)) {
+            if (h2_ihash_empty(session->streams)) {
                 transit(session, "init", H2_PROXYS_ST_IDLE);
             }
             else {
@@ -985,7 +985,7 @@ static void ev_no_io(h2_proxy_session *s
              * CPU cycles. Ideally, we'd like to do a blocking read, but that
              * is not possible if we have scheduled tasks and wait
              * for them to produce something. */
-            if (h2_ihash_is_empty(session->streams)) {
+            if (h2_ihash_empty(session->streams)) {
                 if (!is_accepting_streams(session)) {
                     /* We are no longer accepting new streams and have
                      * finished processing existing ones. Time to leave. */
@@ -1289,7 +1289,7 @@ static int done_iter(void *udata, void *
 void h2_proxy_session_cleanup(h2_proxy_session *session, 
                               h2_proxy_request_done *done)
 {
-    if (session->streams && !h2_ihash_is_empty(session->streams)) {
+    if (session->streams && !h2_ihash_empty(session->streams)) {
         cleanup_iter_ctx ctx;
         ctx.session = session;
         ctx.done = done;
@@ -1328,7 +1328,7 @@ static int win_update_iter(void *udata,
 void h2_proxy_session_update_window(h2_proxy_session *session, 
                                     conn_rec *c, apr_off_t bytes)
 {
-    if (session->streams && !h2_ihash_is_empty(session->streams)) {
+    if (session->streams && !h2_ihash_empty(session->streams)) {
         win_update_ctx ctx;
         ctx.session = session;
         ctx.c = c;

Modified: httpd/httpd/trunk/modules/http2/h2_session.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_session.c?rev=1740155&r1=1740154&r2=1740155&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_session.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_session.c Wed Apr 20 15:17:38 2016
@@ -28,6 +28,7 @@
 #include <scoreboard.h>
 
 #include "h2_private.h"
+#include "h2.h"
 #include "h2_bucket_eoc.h"
 #include "h2_bucket_eos.h"
 #include "h2_config.h"
@@ -1136,7 +1137,7 @@ static int resume_on_data(void *ctx, voi
 static int h2_session_resume_streams_with_data(h2_session *session)
 {
     AP_DEBUG_ASSERT(session);
-    if (!h2_ihash_is_empty(session->streams)
+    if (!h2_ihash_empty(session->streams)
         && session->mplx && !session->mplx->aborted) {
         resume_ctx ctx;
         
@@ -1519,11 +1520,7 @@ apr_status_t h2_session_stream_done(h2_s
     }
     
     h2_stream_cleanup(stream);
-    /* this may be called while the session has already freed
-     * some internal structures or even when the mplx is locked. */
-    if (session->mplx) {
-        h2_mplx_stream_done(session->mplx, stream_id, rst_error);
-    }
+    h2_mplx_stream_done(session->mplx, stream_id, rst_error);
     h2_stream_destroy(stream);
     
     if (pool) {
@@ -1884,7 +1881,7 @@ static void h2_session_ev_no_io(h2_sessi
             if (h2_conn_io_flush(&session->io) != APR_SUCCESS) {
                 dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
             }
-            if (h2_ihash_is_empty(session->streams)) {
+            if (h2_ihash_empty(session->streams)) {
                 if (!is_accepting_streams(session)) {
                     /* We are no longer accepting new streams and have
                      * finished processing existing ones. Time to leave. */
@@ -2107,7 +2104,7 @@ apr_status_t h2_session_process(h2_sessi
                 break;
                 
             case H2_SESSION_ST_IDLE:
-                no_streams = h2_ihash_is_empty(session->streams);
+                no_streams = h2_ihash_empty(session->streams);
                 update_child_status(session, (no_streams? SERVER_BUSY_KEEPALIVE
                                               : SERVER_BUSY_READ), "idle");
                 /* make certain, the client receives everything before we idle */
@@ -2210,7 +2207,7 @@ apr_status_t h2_session_process(h2_sessi
                     }
                 }
                 
-                if (!h2_ihash_is_empty(session->streams)) {
+                if (!h2_ihash_empty(session->streams)) {
                     /* resume any streams for which data is available again */
                     h2_session_resume_streams_with_data(session);
                     /* Submit any responses/push_promises that are ready */

Modified: httpd/httpd/trunk/modules/http2/h2_stream.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_stream.c?rev=1740155&r1=1740154&r2=1740155&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_stream.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_stream.c Wed Apr 20 15:17:38 2016
@@ -24,6 +24,7 @@
 #include <nghttp2/nghttp2.h>
 
 #include "h2_private.h"
+#include "h2.h"
 #include "h2_bucket_beam.h"
 #include "h2_conn.h"
 #include "h2_config.h"
@@ -174,6 +175,10 @@ h2_stream *h2_stream_open(int id, apr_po
 void h2_stream_cleanup(h2_stream *stream)
 {
     AP_DEBUG_ASSERT(stream);
+    if (stream->input) {
+        h2_beam_destroy(stream->input);
+        stream->input = NULL;
+    }
     if (stream->buffer) {
         apr_brigade_cleanup(stream->buffer);
     }

Modified: httpd/httpd/trunk/modules/http2/h2_stream.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_stream.h?rev=1740155&r1=1740154&r2=1740155&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_stream.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_stream.h Wed Apr 20 15:17:38 2016
@@ -30,7 +30,6 @@
  * The h2_response gives the HEADER frames to sent to the client, followed
  * by DATA frames read from the h2_stream until EOS is reached.
  */
-#include "h2_io.h"
 
 struct h2_mplx;
 struct h2_priority;

Modified: httpd/httpd/trunk/modules/http2/h2_task.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_task.c?rev=1740155&r1=1740154&r2=1740155&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_task.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_task.c Wed Apr 20 15:17:38 2016
@@ -33,6 +33,7 @@
 #include <scoreboard.h>
 
 #include "h2_private.h"
+#include "h2.h"
 #include "h2_bucket_beam.h"
 #include "h2_conn.h"
 #include "h2_config.h"
@@ -272,8 +273,7 @@ static apr_status_t open_response(h2_tas
                   task->id, task->request->method, 
                   task->request->authority, 
                   task->request->path);
-    return h2_mplx_out_open(task->mplx, task->stream_id, 
-                            response, task->output.beam);
+    return h2_mplx_out_open(task->mplx, task->stream_id, response);
 }
 
 static apr_status_t send_out(h2_task *task, apr_bucket_brigade* bb)
@@ -441,6 +441,63 @@ static apr_status_t h2_filter_read_respo
 }
 
 /*******************************************************************************
+ * task things
+ ******************************************************************************/
+ 
+void h2_task_set_response(h2_task *task, h2_response *response) 
+{
+    AP_DEBUG_ASSERT(response);
+    AP_DEBUG_ASSERT(!task->response);
+    /* we used to clone the response into our own pool. But
+     * we have much tighter control over the EOR bucket nowadays,
+     * so just use the instance given */
+    task->response = response;
+    if (response->rst_error) {
+        h2_task_rst(task, response->rst_error);
+    }
+}
+
+
+int h2_task_can_redo(h2_task *task) {
+    if (task->submitted
+        || (task->input.beam && h2_beam_was_received(task->input.beam)) 
+        || !task->request) {
+        /* cannot repeat that. */
+        return 0;
+    }
+    return (!strcmp("GET", task->request->method)
+            || !strcmp("HEAD", task->request->method)
+            || !strcmp("OPTIONS", task->request->method));
+}
+
+void h2_task_redo(h2_task *task)
+{
+    task->response = NULL;
+    task->rst_error = 0;
+}
+
+void h2_task_rst(h2_task *task, int error)
+{
+    task->rst_error = error;
+    if (task->input.beam) {
+        h2_beam_abort(task->input.beam);
+    }
+    if (task->output.beam) {
+        h2_beam_abort(task->output.beam);
+    }
+}
+
+void h2_task_shutdown(h2_task *task)
+{
+    if (task->input.beam) {
+        h2_beam_shutdown(task->input.beam);
+    }
+    if (task->output.beam) {
+        h2_beam_shutdown(task->output.beam);
+    }
+}
+
+/*******************************************************************************
  * Register various hooks
  */
 static int h2_task_process_conn(conn_rec* c);
@@ -517,6 +574,10 @@ void h2_task_destroy(h2_task *task)
 {
     ap_remove_input_filter_byhandle(task->c->input_filters, "H2_TO_H1");
     ap_remove_output_filter_byhandle(task->c->output_filters, "H1_TO_H2");
+    if (task->output.beam) {
+        h2_beam_destroy(task->output.beam);
+        task->output.beam = NULL;
+    }
     if (task->eor) {
         apr_bucket_destroy(task->eor);
     }

Modified: httpd/httpd/trunk/modules/http2/h2_task.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_task.h?rev=1740155&r1=1740154&r2=1740155&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_task.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_task.h Wed Apr 20 15:17:38 2016
@@ -44,7 +44,7 @@ struct h2_mplx;
 struct h2_task;
 struct h2_req_engine;
 struct h2_request;
-struct h2_resp_head;
+struct h2_response;
 struct h2_worker;
 
 typedef struct h2_task h2_task;
@@ -53,17 +53,10 @@ struct h2_task {
     const char *id;
     int stream_id;
     conn_rec *c;
-    struct h2_mplx *mplx;
     apr_pool_t *pool;
-    const struct h2_request *request;
-    apr_bucket *eor;
-    struct apr_thread_cond_t *cond;
     
-    unsigned int filters_set : 1;
-    unsigned int ser_headers : 1;
-    unsigned int frozen      : 1;
-    unsigned int blocking    : 1;
-    unsigned int detached    : 1;
+    const struct h2_request *request;
+    struct h2_response *response;
     
     struct {
         struct h2_bucket_beam *beam;
@@ -81,6 +74,24 @@ struct h2_task {
         apr_bucket_brigade *bb;
     } output;
     
+    struct h2_mplx *mplx;
+    struct apr_thread_cond_t *cond;
+    
+    int rst_error;                   /* h2 related stream abort error */
+    unsigned int filters_set    : 1;
+    unsigned int ser_headers    : 1;
+    unsigned int frozen         : 1;
+    unsigned int blocking       : 1;
+    unsigned int detached       : 1;
+    unsigned int orphaned       : 1; /* h2_stream is gone for this task */    
+    unsigned int submitted      : 1; /* response has been submitted to client */
+    unsigned int worker_started : 1; /* h2_worker started processing for this task */
+    unsigned int worker_done    : 1; /* h2_worker finished for this task */
+    
+    apr_time_t started_at;           /* when processing started */
+    apr_time_t done_at;              /* when processing was done */
+    apr_bucket *eor;
+    
     struct h2_req_engine *engine;   /* engine hosted by this task */
     struct h2_req_engine *assigned; /* engine that task has been assigned to */
     request_rec *r;                 /* request being processed in this task */
@@ -93,6 +104,21 @@ void h2_task_destroy(h2_task *task);
 
 apr_status_t h2_task_do(h2_task *task);
 
+void h2_task_set_response(h2_task *task, struct h2_response *response);
+
+void h2_task_redo(h2_task *task);
+int h2_task_can_redo(h2_task *task);
+
+/**
+ * Reset the task with the given error code, resets all input/output.
+ */
+void h2_task_rst(h2_task *task, int error);
+
+/**
+ * Shuts all input/output down. Clears any buckets buffered and closes.
+ */
+void h2_task_shutdown(h2_task *task);
+
 void h2_task_register_hooks(void);
 /*
  * One time, post config intialization.

Modified: httpd/httpd/trunk/modules/http2/h2_util.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_util.c?rev=1740155&r1=1740154&r2=1740155&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_util.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_util.c Wed Apr 20 15:17:38 2016
@@ -286,7 +286,7 @@ size_t h2_ihash_count(h2_ihash_t *ih)
     return apr_hash_count(ih->hash);
 }
 
-int h2_ihash_is_empty(h2_ihash_t *ih)
+int h2_ihash_empty(h2_ihash_t *ih)
 {
     return apr_hash_count(ih->hash) == 0;
 }

Modified: httpd/httpd/trunk/modules/http2/h2_util.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_util.h?rev=1740155&r1=1740154&r2=1740155&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_util.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_util.h Wed Apr 20 15:17:38 2016
@@ -49,7 +49,7 @@ typedef int h2_ihash_iter_t(void *ctx, v
 h2_ihash_t *h2_ihash_create(apr_pool_t *pool, size_t offset_of_int);
 
 size_t h2_ihash_count(h2_ihash_t *ih);
-int h2_ihash_is_empty(h2_ihash_t *ih);
+int h2_ihash_empty(h2_ihash_t *ih);
 void *h2_ihash_get(h2_ihash_t *ih, int id);
 
 /**

Modified: httpd/httpd/trunk/modules/http2/mod_http2.dsp
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/mod_http2.dsp?rev=1740155&r1=1740154&r2=1740155&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/mod_http2.dsp (original)
+++ httpd/httpd/trunk/modules/http2/mod_http2.dsp Wed Apr 20 15:17:38 2016
@@ -145,10 +145,6 @@ SOURCE=./h2_h2.c
 # End Source File
 # Begin Source File
 
-SOURCE=./h2_io.c
-# End Source File
-# Begin Source File
-
 SOURCE=./h2_mplx.c
 # End Source File
 # Begin Source File

Modified: httpd/httpd/trunk/modules/http2/mod_proxy_http2.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/mod_proxy_http2.c?rev=1740155&r1=1740154&r2=1740155&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/mod_proxy_http2.c (original)
+++ httpd/httpd/trunk/modules/http2/mod_proxy_http2.c Wed Apr 20 15:17:38 2016
@@ -377,7 +377,7 @@ static apr_status_t proxy_engine_run(h2_
                 status = s2;
                 break;
             }
-            if (!ctx->next && h2_ihash_is_empty(ctx->session->streams)) {
+            if (!ctx->next && h2_ihash_empty(ctx->session->streams)) {
                 break;
             }
         }