You are viewing a plain text version of this content. The canonical link for it is here.
Posted to cvs@httpd.apache.org by ic...@apache.org on 2016/04/28 14:43:02 UTC
svn commit: r1741419 [2/4] - in /httpd/httpd/branches/2.4.x: ./
modules/http2/
Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.c?rev=1741419&r1=1741418&r2=1741419&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.c (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.c Thu Apr 28 12:43:02 2016
@@ -29,38 +29,46 @@
#include "mod_http2.h"
#include "h2_private.h"
+#include "h2_bucket_beam.h"
#include "h2_config.h"
#include "h2_conn.h"
#include "h2_ctx.h"
#include "h2_h2.h"
-#include "h2_int_queue.h"
-#include "h2_io.h"
-#include "h2_io_set.h"
#include "h2_response.h"
#include "h2_mplx.h"
#include "h2_ngn_shed.h"
#include "h2_request.h"
#include "h2_stream.h"
#include "h2_task.h"
-#include "h2_task_input.h"
-#include "h2_task_output.h"
#include "h2_worker.h"
#include "h2_workers.h"
#include "h2_util.h"
-#define H2_MPLX_IO_OUT(lvl,m,io,msg) \
- do { \
- if (APLOG_C_IS_LEVEL((m)->c,lvl)) \
- h2_util_bb_log((m)->c,(io)->id,lvl,msg,(io)->bbout); \
- } while(0)
-
-#define H2_MPLX_IO_IN(lvl,m,io,msg) \
- do { \
- if (APLOG_C_IS_LEVEL((m)->c,lvl)) \
- h2_util_bb_log((m)->c,(io)->id,lvl,msg,(io)->bbin); \
- } while(0)
+static void h2_beam_log(h2_bucket_beam *beam, int id, const char *msg,
+ conn_rec *c, int level)
+{
+ if (beam && APLOG_C_IS_LEVEL(c,level)) {
+ char buffer[2048];
+ apr_size_t off = 0;
+
+ off += apr_snprintf(buffer+off, H2_ALEN(buffer)-off, "cl=%d, ", beam->closed);
+ off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "red", ", ", &beam->red);
+ off += h2_util_bb_print(buffer+off, H2_ALEN(buffer)-off, "green", ", ", beam->green);
+ off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "hold", ", ", &beam->hold);
+ off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "purge", "", &beam->purge);
+
+ ap_log_cerror(APLOG_MARK, level, 0, c, "beam(%ld-%d): %s %s",
+ c->id, id, msg, buffer);
+ }
+}
+/* utility for iterating over ihash task sets */
+typedef struct {
+ h2_mplx *m;
+ h2_task *task;
+ apr_time_t now;
+} task_iter_ctx;
/* NULL or the mutex hold by this thread, used for recursive calls
*/
@@ -104,13 +112,57 @@ static void leave_mutex(h2_mplx *m, int
}
}
-static int is_aborted(h2_mplx *m, apr_status_t *pstatus)
+static void beam_leave(void *ctx, apr_thread_mutex_t *lock)
{
- AP_DEBUG_ASSERT(m);
- if (m->aborted) {
- *pstatus = APR_ECONNABORTED;
+ leave_mutex(ctx, 1);
+}
+
+static apr_status_t beam_enter(void *ctx, h2_beam_lock *pbl)
+{
+ h2_mplx *m = ctx;
+ int acquired;
+ apr_status_t status;
+
+ status = enter_mutex(m, &acquired);
+ if (status == APR_SUCCESS) {
+ pbl->mutex = m->lock;
+ pbl->leave = acquired? beam_leave : NULL;
+ pbl->leave_ctx = m;
+ }
+ return status;
+}
+
+static void stream_output_consumed(void *ctx,
+ h2_bucket_beam *beam, apr_off_t length)
+{
+ h2_task *task = ctx;
+ if (length > 0 && task && task->assigned) {
+ h2_req_engine_out_consumed(task->assigned, task->c, length);
+ }
+}
+
+static void stream_input_consumed(void *ctx,
+ h2_bucket_beam *beam, apr_off_t length)
+{
+ h2_mplx *m = ctx;
+ if (m->input_consumed && length) {
+ m->input_consumed(m->input_consumed_ctx, beam->id, length);
+ }
+}
+
+static int can_beam_file(void *ctx, h2_bucket_beam *beam, apr_file_t *file)
+{
+ h2_mplx *m = ctx;
+ if (m->tx_handles_reserved > 0) {
+ --m->tx_handles_reserved;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
+ "h2_mplx(%ld-%d): beaming file %s, tx_avail %d",
+ m->id, beam->id, beam->tag, m->tx_handles_reserved);
return 1;
}
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
+ "h2_mplx(%ld-%d): can_beam_file denied on %s",
+ m->id, beam->id, beam->tag);
return 0;
}
@@ -118,9 +170,9 @@ static void have_out_data_for(h2_mplx *m
static void check_tx_reservation(h2_mplx *m)
{
- if (m->tx_handles_reserved == 0) {
+ if (m->tx_handles_reserved <= 0) {
m->tx_handles_reserved += h2_workers_tx_reserve(m->workers,
- H2MIN(m->tx_chunk_size, h2_io_set_size(m->stream_ios)));
+ H2MIN(m->tx_chunk_size, h2_ihash_count(m->tasks)));
}
}
@@ -131,8 +183,7 @@ static void check_tx_free(h2_mplx *m)
m->tx_handles_reserved = m->tx_chunk_size;
h2_workers_tx_free(m->workers, count);
}
- else if (m->tx_handles_reserved
- && (!m->stream_ios || h2_io_set_is_empty(m->stream_ios))) {
+ else if (m->tx_handles_reserved && h2_ihash_empty(m->tasks)) {
h2_workers_tx_free(m->workers, m->tx_handles_reserved);
m->tx_handles_reserved = 0;
}
@@ -142,8 +193,8 @@ static void h2_mplx_destroy(h2_mplx *m)
{
AP_DEBUG_ASSERT(m);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): destroy, ios=%d",
- m->id, (int)h2_io_set_size(m->stream_ios));
+ "h2_mplx(%ld): destroy, tasks=%d",
+ m->id, (int)h2_ihash_count(m->tasks));
check_tx_free(m);
if (m->pool) {
apr_pool_destroy(m->pool);
@@ -204,9 +255,12 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr
m->bucket_alloc = apr_bucket_alloc_create(m->pool);
m->max_streams = h2_config_geti(conf, H2_CONF_MAX_STREAMS);
m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
+
+ m->streams = h2_ihash_create(m->pool, offsetof(h2_stream,id));
m->q = h2_iq_create(m->pool, m->max_streams);
- m->stream_ios = h2_io_set_create(m->pool);
- m->ready_ios = h2_io_set_create(m->pool);
+ m->tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id));
+ m->ready_tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id));
+
m->stream_timeout = stream_timeout;
m->workers = workers;
m->workers_max = workers->max_workers;
@@ -240,75 +294,65 @@ apr_uint32_t h2_mplx_shutdown(h2_mplx *m
return max_stream_started;
}
-static void workers_register(h2_mplx *m)
-{
- /* h2_workers is only a hub for all the h2_worker instances.
- * At the end-of-life of this h2_mplx, we always unregister at
- * the workers. The thing to manage are all the h2_worker instances
- * out there. Those may hold a reference to this h2_mplx and we cannot
- * call them to unregister.
- *
- * Therefore: ref counting for h2_workers in not needed, ref counting
- * for h2_worker using this is critical.
- */
- m->need_registration = 0;
- h2_workers_register(m->workers, m);
-}
-
-static int io_in_consumed_signal(h2_mplx *m, h2_io *io)
+static void input_consumed_signal(h2_mplx *m, h2_task *task)
{
- if (io->input_consumed && m->input_consumed) {
- m->input_consumed(m->input_consumed_ctx,
- io->id, io->input_consumed);
- io->input_consumed = 0;
- return 1;
+ if (task->input.beam && task->worker_started) {
+ h2_beam_send(task->input.beam, NULL, 0); /* trigger updates */
}
- return 0;
}
-static int io_out_consumed_signal(h2_mplx *m, h2_io *io)
+static int output_consumed_signal(h2_mplx *m, h2_task *task)
{
- if (io->output_consumed && io->task && io->task->assigned) {
- h2_req_engine_out_consumed(io->task->assigned, io->task->c,
- io->output_consumed);
- io->output_consumed = 0;
- return 1;
+ if (task->output.beam && task->worker_started && task->assigned) {
+ h2_beam_send(task->output.beam, NULL, 0); /* trigger updates */
}
return 0;
}
-static void io_destroy(h2_mplx *m, h2_io *io, int events)
+
+static void task_destroy(h2_mplx *m, h2_task *task, int events)
{
- int reuse_slave;
+ conn_rec *slave = NULL;
+ int reuse_slave = 0;
+ apr_status_t status;
/* cleanup any buffered input */
- h2_io_in_shutdown(io);
+ status = h2_task_shutdown(task, 0);
+ if (status != APR_SUCCESS){
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, m->c, APLOGNO()
+ "h2_task(%s): shutdown", task->id);
+ }
+
if (events) {
/* Process outstanding events before destruction */
- io_in_consumed_signal(m, io);
+ input_consumed_signal(m, task);
}
/* The pool is cleared/destroyed which also closes all
* allocated file handles. Give this count back to our
* file handle pool. */
- m->tx_handles_reserved += io->files_handles_owned;
-
- h2_io_set_remove(m->stream_ios, io);
- h2_io_set_remove(m->ready_ios, io);
- if (m->redo_ios) {
- h2_io_set_remove(m->redo_ios, io);
+ if (task->input.beam) {
+ m->tx_handles_reserved +=
+ h2_beam_get_files_beamed(task->input.beam);
}
-
+ if (task->output.beam) {
+ m->tx_handles_reserved +=
+ h2_beam_get_files_beamed(task->output.beam);
+ }
+
+ slave = task->c;
reuse_slave = ((m->spare_slaves->nelts < m->spare_slaves->nalloc)
- && !io->rst_error && io->eor);
- if (io->task) {
- conn_rec *slave = io->task->c;
- h2_task_destroy(io->task);
- io->task = NULL;
-
+ && !task->rst_error);
+
+ h2_ihash_remove(m->tasks, task->stream_id);
+ h2_ihash_remove(m->ready_tasks, task->stream_id);
+ if (m->redo_tasks) {
+ h2_ihash_remove(m->redo_tasks, task->stream_id);
+ }
+ h2_task_destroy(task);
+
+ if (slave) {
if (reuse_slave && slave->keepalive == AP_CONN_KEEPALIVE) {
- apr_bucket_delete(io->eor);
- io->eor = NULL;
APR_ARRAY_PUSH(m->spare_slaves, conn_rec*) = slave;
}
else {
@@ -316,59 +360,67 @@ static void io_destroy(h2_mplx *m, h2_io
h2_slave_destroy(slave, NULL);
}
}
-
- if (io->pool) {
- apr_pool_destroy(io->pool);
- }
-
+
check_tx_free(m);
}
-static int io_stream_done(h2_mplx *m, h2_io *io, int rst_error)
+static int task_stream_done(h2_mplx *m, h2_task *task, int rst_error)
{
/* Remove io from ready set, we will never submit it */
- h2_io_set_remove(m->ready_ios, io);
- if (!io->worker_started || io->worker_done) {
+ h2_ihash_remove(m->ready_tasks, task->stream_id);
+ if (task->worker_done) {
/* already finished or not even started yet */
- h2_iq_remove(m->q, io->id);
- io_destroy(m, io, 1);
+ h2_iq_remove(m->q, task->stream_id);
+ task_destroy(m, task, 0);
return 0;
}
else {
/* cleanup once task is done */
- h2_io_make_orphaned(io, rst_error);
+ task->orphaned = 1;
+ if (task->input.beam) {
+ apr_status_t status;
+ status = h2_beam_shutdown(task->input.beam, APR_NONBLOCK_READ);
+ if (status == APR_EAGAIN) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_stream(%ld-%d): wait on input shutdown",
+ m->id, task->stream_id);
+ status = h2_beam_shutdown(task->input.beam, APR_BLOCK_READ);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
+ "h2_stream(%ld-%d): input shutdown returned",
+ m->id, task->stream_id);
+ }
+ task->input.beam = NULL;
+ }
+ if (rst_error) {
+ h2_task_rst(task, rst_error);
+ }
return 1;
}
}
-static int stream_done_iter(void *ctx, h2_io *io)
+static int stream_done_iter(void *ctx, void *val)
{
- return io_stream_done((h2_mplx*)ctx, io, 0);
+ return task_stream_done((h2_mplx*)ctx, val, 0);
}
-static int stream_print(void *ctx, h2_io *io)
+static int task_print(void *ctx, void *val)
{
h2_mplx *m = ctx;
- if (io && io->request) {
+ h2_task *task = val;
+ if (task->request) {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
- "->03198: h2_stream(%ld-%d): %s %s %s -> %s %d"
- "[orph=%d/started=%d/done=%d/eos_in=%d/eos_out=%d]",
- m->id, io->id,
- io->request->method, io->request->authority, io->request->path,
- io->response? "http" : (io->rst_error? "reset" : "?"),
- io->response? io->response->http_status : io->rst_error,
- io->orphaned, io->worker_started, io->worker_done,
- io->eos_in, io->eos_out);
+ "->03198: h2_stream(%s): %s %s %s -> %s %d"
+ "[orph=%d/started=%d/done=%d]",
+ task->id, task->request->method,
+ task->request->authority, task->request->path,
+ task->response? "http" : (task->rst_error? "reset" : "?"),
+ task->response? task->response->http_status : task->rst_error,
+ task->orphaned, task->worker_started,
+ task->worker_done);
}
- else if (io) {
+ else if (task) {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
- "->03198: h2_stream(%ld-%d): NULL -> %s %d"
- "[orph=%d/started=%d/done=%d/eos_in=%d/eos_out=%d]",
- m->id, io->id,
- io->response? "http" : (io->rst_error? "reset" : "?"),
- io->response? io->response->http_status : io->rst_error,
- io->orphaned, io->worker_started, io->worker_done,
- io->eos_in, io->eos_out);
+ "->03198: h2_stream(%ld-%d): NULL", m->id, task->stream_id);
}
else {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
@@ -392,7 +444,7 @@ apr_status_t h2_mplx_release_and_join(h2
h2_iq_clear(m->q);
apr_thread_cond_broadcast(m->task_thawed);
- while (!h2_io_set_iter(m->stream_ios, stream_done_iter, m)) {
+ while (!h2_ihash_iter(m->tasks, stream_done_iter, m)) {
/* iterate until all ios have been orphaned or destroyed */
}
@@ -406,10 +458,14 @@ apr_status_t h2_mplx_release_and_join(h2
for (i = 0; m->workers_busy > 0; ++i) {
m->join_wait = wait;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): release_join, waiting on %d worker to report back",
- m->id, (int)h2_io_set_size(m->stream_ios));
+ "h2_mplx(%ld): release_join, waiting on %d tasks to report back",
+ m->id, (int)h2_ihash_count(m->tasks));
status = apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(wait_secs));
+
+ while (!h2_ihash_iter(m->tasks, stream_done_iter, m)) {
+ /* iterate until all ios have been orphaned or destroyed */
+ }
if (APR_STATUS_IS_TIMEUP(status)) {
if (i > 0) {
/* Oh, oh. Still we wait for assigned workers to report that
@@ -419,11 +475,11 @@ apr_status_t h2_mplx_release_and_join(h2
*/
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03198)
"h2_mplx(%ld): release, waiting for %d seconds now for "
- "%d h2_workers to return, have still %d requests outstanding",
+ "%d h2_workers to return, have still %d tasks outstanding",
m->id, i*wait_secs, m->workers_busy,
- (int)h2_io_set_size(m->stream_ios));
+ (int)h2_ihash_count(m->tasks));
if (i == 1) {
- h2_io_set_iter(m->stream_ios, stream_print, m);
+ h2_ihash_iter(m->tasks, task_print, m);
}
}
h2_mplx_abort(m);
@@ -431,13 +487,9 @@ apr_status_t h2_mplx_release_and_join(h2
}
}
- if (!h2_io_set_is_empty(m->stream_ios)) {
- ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, m->c,
- "h2_mplx(%ld): release_join, %d streams still open",
- m->id, (int)h2_io_set_size(m->stream_ios));
- }
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
- "h2_mplx(%ld): release_join -> destroy", m->id);
+ "h2_mplx(%ld): release_join (%d tasks left) -> destroy",
+ m->id, (int)h2_ihash_count(m->tasks));
leave_mutex(m, acquired);
h2_mplx_destroy(m);
/* all gone */
@@ -468,100 +520,17 @@ apr_status_t h2_mplx_stream_done(h2_mplx
*/
AP_DEBUG_ASSERT(m);
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+ h2_task *task = h2_ihash_get(m->tasks, stream_id);
+ h2_ihash_remove(m->streams, stream_id);
/* there should be an h2_io, once the stream has been scheduled
* for processing, e.g. when we received all HEADERs. But when
* a stream is cancelled very early, it will not exist. */
- if (io) {
+ if (task) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_mplx(%ld-%d): marking stream as done.",
+ "h2_mplx(%ld-%d): marking stream task as done.",
m->id, stream_id);
- io_stream_done(m, io, rst_error);
- }
- leave_mutex(m, acquired);
- }
- return status;
-}
-
-apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block,
- int stream_id, apr_bucket_brigade *bb,
- apr_table_t *trailers,
- struct apr_thread_cond_t *iowait)
-{
- apr_status_t status;
- int acquired;
-
- AP_DEBUG_ASSERT(m);
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io && !io->orphaned) {
- H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_read_pre");
-
- h2_io_signal_init(io, H2_IO_READ, m->stream_timeout, iowait);
- status = h2_io_in_read(io, bb, -1, trailers);
- while (APR_STATUS_IS_EAGAIN(status)
- && !is_aborted(m, &status)
- && block == APR_BLOCK_READ) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
- "h2_mplx(%ld-%d): wait on in data (BLOCK_READ)",
- m->id, stream_id);
- status = h2_io_signal_wait(m, io);
- if (status == APR_SUCCESS) {
- status = h2_io_in_read(io, bb, -1, trailers);
- }
- }
- H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_read_post");
- h2_io_signal_exit(io);
- }
- else {
- status = APR_EOF;
- }
- leave_mutex(m, acquired);
- }
- return status;
-}
-
-apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id,
- const char *data, apr_size_t len, int eos)
-{
- apr_status_t status;
- int acquired;
-
- AP_DEBUG_ASSERT(m);
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io && !io->orphaned) {
- H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_write_pre");
- status = h2_io_in_write(io, data, len, eos);
- H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_write_post");
- h2_io_signal(io, H2_IO_READ);
- io_in_consumed_signal(m, io);
- }
- else {
- status = APR_ECONNABORTED;
- }
- leave_mutex(m, acquired);
- }
- return status;
-}
-
-apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id)
-{
- apr_status_t status;
- int acquired;
-
- AP_DEBUG_ASSERT(m);
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io && !io->orphaned) {
- status = h2_io_in_close(io);
- H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_close");
- h2_io_signal(io, H2_IO_READ);
- io_in_consumed_signal(m, io);
- }
- else {
- status = APR_ECONNABORTED;
+ task_stream_done(m, task, rst_error);
}
leave_mutex(m, acquired);
}
@@ -574,17 +543,10 @@ void h2_mplx_set_consumed_cb(h2_mplx *m,
m->input_consumed_ctx = ctx;
}
-typedef struct {
- h2_mplx * m;
- int streams_updated;
-} update_ctx;
-
-static int update_window(void *ctx, h2_io *io)
+static int update_window(void *ctx, void *val)
{
- update_ctx *uctx = (update_ctx*)ctx;
- if (io_in_consumed_signal(uctx->m, io)) {
- ++uctx->streams_updated;
- }
+ h2_mplx *m = ctx;
+ input_consumed_signal(m, val);
return 1;
}
@@ -598,49 +560,22 @@ apr_status_t h2_mplx_in_update_windows(h
return APR_ECONNABORTED;
}
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- update_ctx ctx;
-
- ctx.m = m;
- ctx.streams_updated = 0;
-
- status = APR_EAGAIN;
- h2_io_set_iter(m->stream_ios, update_window, &ctx);
+ h2_ihash_iter(m->tasks, update_window, m);
- if (ctx.streams_updated) {
- status = APR_SUCCESS;
- }
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_session(%ld): windows updated", m->id);
+ status = APR_SUCCESS;
leave_mutex(m, acquired);
}
return status;
}
-apr_status_t h2_mplx_out_get_brigade(h2_mplx *m, int stream_id,
- apr_bucket_brigade *bb,
- apr_off_t len, apr_table_t **ptrailers)
+static int task_iter_first(void *ctx, void *val)
{
- apr_status_t status;
- int acquired;
-
- AP_DEBUG_ASSERT(m);
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io && !io->orphaned) {
- H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_get_brigade_pre");
-
- status = h2_io_out_get_brigade(io, bb, len);
-
- H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_get_brigade_post");
- if (status == APR_SUCCESS) {
- h2_io_signal(io, H2_IO_WRITE);
- }
- }
- else {
- status = APR_ECONNABORTED;
- }
- *ptrailers = io->response? io->response->trailers : NULL;
- leave_mutex(m, acquired);
- }
- return status;
+ task_iter_ctx *tctx = ctx;
+ h2_task *task = val;
+ tctx->task = task;
+ return 0;
}
h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams)
@@ -651,138 +586,89 @@ h2_stream *h2_mplx_next_submit(h2_mplx *
AP_DEBUG_ASSERT(m);
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_io_set_shift(m->ready_ios);
- if (io && !m->aborted) {
- stream = h2_ihash_get(streams, io->id);
- if (stream) {
- io->submitted = 1;
- if (io->rst_error) {
- h2_stream_rst(stream, io->rst_error);
+ task_iter_ctx ctx;
+ ctx.m = m;
+ ctx.task = NULL;
+ h2_ihash_iter(m->ready_tasks, task_iter_first, &ctx);
+
+ if (ctx.task && !m->aborted) {
+ h2_task *task = ctx.task;
+
+ h2_ihash_remove(m->ready_tasks, task->stream_id);
+ stream = h2_ihash_get(streams, task->stream_id);
+ if (stream && task) {
+ task->submitted = 1;
+ if (task->rst_error) {
+ h2_stream_rst(stream, task->rst_error);
}
else {
- AP_DEBUG_ASSERT(io->response);
- H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_next_submit_pre");
- h2_stream_set_response(stream, io->response, io->bbout);
- H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_next_submit_post");
+ AP_DEBUG_ASSERT(task->response);
+ h2_stream_set_response(stream, task->response,
+ task->output.beam);
}
}
- else {
+ else if (task) {
/* We have the io ready, but the stream has gone away, maybe
* reset by the client. Should no longer happen since such
* streams should clear io's from the ready queue.
*/
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03347)
- "h2_mplx(%ld): stream for response %d closed, "
+ "h2_mplx(%s): stream for response closed, "
"resetting io to close request processing",
- m->id, io->id);
- h2_io_make_orphaned(io, H2_ERR_STREAM_CLOSED);
- if (!io->worker_started || io->worker_done) {
- io_destroy(m, io, 1);
+ task->id);
+ task->orphaned = 1;
+ h2_task_rst(task, H2_ERR_STREAM_CLOSED);
+ if (!task->worker_started || task->worker_done) {
+ task_destroy(m, task, 1);
}
else {
/* hang around until the h2_task is done, but
- * shutdown input and send out any events (e.g. window
- * updates) asap. */
- h2_io_in_shutdown(io);
- io_in_consumed_signal(m, io);
+ * shutdown input/output and send out any events asap. */
+ h2_task_shutdown(task, 0);
+ input_consumed_signal(m, task);
}
}
-
- h2_io_signal(io, H2_IO_WRITE);
}
leave_mutex(m, acquired);
}
return stream;
}
-static apr_status_t out_write(h2_mplx *m, h2_io *io,
- ap_filter_t* f, int blocking,
- apr_bucket_brigade *bb,
- struct apr_thread_cond_t *iowait)
+static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response)
{
apr_status_t status = APR_SUCCESS;
- /* We check the memory footprint queued for this stream_id
- * and block if it exceeds our configured limit.
- * We will not split buckets to enforce the limit to the last
- * byte. After all, the bucket is already in memory.
- */
- while (status == APR_SUCCESS
- && !APR_BRIGADE_EMPTY(bb)
- && !is_aborted(m, &status)) {
-
- status = h2_io_out_write(io, bb, blocking? m->stream_max_mem : INT_MAX,
- &m->tx_handles_reserved);
- io_out_consumed_signal(m, io);
-
- /* Wait for data to drain until there is room again or
- * stream timeout expires */
- h2_io_signal_init(io, H2_IO_WRITE, m->stream_timeout, iowait);
- while (status == APR_SUCCESS
- && !APR_BRIGADE_EMPTY(bb)
- && iowait
- && (m->stream_max_mem <= h2_io_out_length(io))
- && !is_aborted(m, &status)) {
- if (!blocking) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
- "h2_mplx(%ld-%d): incomplete write",
- m->id, io->id);
- return APR_INCOMPLETE;
- }
- if (f) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
- "h2_mplx(%ld-%d): waiting for out drain",
- m->id, io->id);
- }
- status = h2_io_signal_wait(m, io);
- }
- h2_io_signal_exit(io);
+ h2_task *task = h2_ihash_get(m->tasks, stream_id);
+
+ if (!task || task->orphaned) {
+ return APR_ECONNABORTED;
}
- apr_brigade_cleanup(bb);
- return status;
-}
-
-static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response,
- ap_filter_t* f, apr_bucket_brigade *bb,
- struct apr_thread_cond_t *iowait)
-{
- apr_status_t status = APR_SUCCESS;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%s): open response: %d, rst=%d",
+ task->id, response->http_status, response->rst_error);
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io && !io->orphaned) {
- if (f) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
- "h2_mplx(%ld-%d): open response: %d, rst=%d",
- m->id, stream_id, response->http_status,
- response->rst_error);
- }
-
- h2_io_set_response(io, response);
- h2_io_set_add(m->ready_ios, io);
- if (response && response->http_status < 300) {
- /* we might see some file buckets in the output, see
- * if we have enough handles reserved. */
- check_tx_reservation(m);
- }
- if (bb) {
- status = out_write(m, io, f, 0, bb, iowait);
- if (status == APR_INCOMPLETE) {
- /* write will have transferred as much data as possible.
- caller has to deal with non-empty brigade */
- status = APR_SUCCESS;
- }
- }
- have_out_data_for(m, stream_id);
- }
- else {
- status = APR_ECONNABORTED;
+ h2_task_set_response(task, response);
+
+ if (task->output.beam) {
+ h2_beam_buffer_size_set(task->output.beam, m->stream_max_mem);
+ h2_beam_timeout_set(task->output.beam, m->stream_timeout);
+ h2_beam_on_consumed(task->output.beam, stream_output_consumed, task);
+ m->tx_handles_reserved -= h2_beam_get_files_beamed(task->output.beam);
+ h2_beam_on_file_beam(task->output.beam, can_beam_file, m);
+ h2_beam_mutex_set(task->output.beam, beam_enter, task->cond, m);
+ }
+
+ h2_ihash_add(m->ready_tasks, task);
+ if (response && response->http_status < 300) {
+ /* we might see some file buckets in the output, see
+ * if we have enough handles reserved. */
+ check_tx_reservation(m);
}
+ have_out_data_for(m, stream_id);
return status;
}
-apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response,
- ap_filter_t* f, apr_bucket_brigade *bb,
- struct apr_thread_cond_t *iowait)
+apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response)
{
apr_status_t status;
int acquired;
@@ -793,127 +679,44 @@ apr_status_t h2_mplx_out_open(h2_mplx *m
status = APR_ECONNABORTED;
}
else {
- status = out_open(m, stream_id, response, f, bb, iowait);
- if (APLOGctrace1(m->c)) {
- h2_util_bb_log(m->c, stream_id, APLOG_TRACE1, "h2_mplx_out_open", bb);
- }
+ status = out_open(m, stream_id, response);
}
leave_mutex(m, acquired);
}
return status;
}
-apr_status_t h2_mplx_out_write(h2_mplx *m, int stream_id,
- ap_filter_t* f, int blocking,
- apr_bucket_brigade *bb,
- struct apr_thread_cond_t *iowait)
+static apr_status_t out_close(h2_mplx *m, h2_task *task)
{
- apr_status_t status;
- int acquired;
+ apr_status_t status = APR_SUCCESS;
- AP_DEBUG_ASSERT(m);
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io && !io->orphaned) {
- status = out_write(m, io, f, blocking, bb, iowait);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
- "h2_mplx(%ld-%d): write", m->id, io->id);
- H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_write");
-
- have_out_data_for(m, stream_id);
- }
- else {
- status = APR_ECONNABORTED;
- }
- leave_mutex(m, acquired);
+ if (!task || task->orphaned) {
+ return APR_ECONNABORTED;
}
- return status;
-}
-
-apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id)
-{
- apr_status_t status;
- int acquired;
- AP_DEBUG_ASSERT(m);
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io && !io->orphaned) {
- if (!io->response && !io->rst_error) {
- /* In case a close comes before a response was created,
- * insert an error one so that our streams can properly
- * reset.
- */
- h2_response *r = h2_response_die(stream_id, APR_EGENERAL,
- io->request, m->pool);
- status = out_open(m, stream_id, r, NULL, NULL, NULL);
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c,
- "h2_mplx(%ld-%d): close, no response, no rst",
- m->id, io->id);
- }
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
- "h2_mplx(%ld-%d): close with eor=%s",
- m->id, io->id, io->eor? "yes" : "no");
- status = h2_io_out_close(io);
- H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_close");
- io_out_consumed_signal(m, io);
-
- have_out_data_for(m, stream_id);
- }
- else {
- status = APR_ECONNABORTED;
- }
- leave_mutex(m, acquired);
- }
- return status;
-}
-
-apr_status_t h2_mplx_out_rst(h2_mplx *m, int stream_id, int error)
-{
- apr_status_t status;
- int acquired;
-
- AP_DEBUG_ASSERT(m);
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io && !io->rst_error && !io->orphaned) {
- h2_io_rst(io, error);
- if (!io->response) {
- h2_io_set_add(m->ready_ios, io);
- }
- H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_rst");
-
- have_out_data_for(m, stream_id);
- h2_io_signal(io, H2_IO_WRITE);
- }
- else {
- status = APR_ECONNABORTED;
- }
- leave_mutex(m, acquired);
+ if (!task->response && !task->rst_error) {
+ /* In case a close comes before a response was created,
+ * insert an error one so that our streams can properly
+ * reset.
+ */
+ h2_response *r = h2_response_die(task->stream_id, APR_EGENERAL,
+ task->request, m->pool);
+ status = out_open(m, task->stream_id, r);
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c,
+ "h2_mplx(%s): close, no response, no rst", task->id);
+ }
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
+ "h2_mplx(%s): close", task->id);
+ if (task->output.beam) {
+ status = h2_beam_close(task->output.beam);
+ h2_beam_log(task->output.beam, task->stream_id, "out_close", m->c,
+ APLOG_TRACE2);
}
+ output_consumed_signal(m, task);
+ have_out_data_for(m, task->stream_id);
return status;
}
-int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id)
-{
- apr_status_t status;
- int has_data = 0;
- int acquired;
-
- AP_DEBUG_ASSERT(m);
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- if (io && !io->orphaned) {
- has_data = h2_io_out_has_data(io);
- }
- else {
- has_data = 0;
- }
- leave_mutex(m, acquired);
- }
- return has_data;
-}
-
apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
apr_thread_cond_t *iowait)
{
@@ -969,22 +772,7 @@ apr_status_t h2_mplx_reprioritize(h2_mpl
return status;
}
-static h2_io *open_io(h2_mplx *m, int stream_id, const h2_request *request)
-{
- apr_pool_t *io_pool;
- h2_io *io;
-
- apr_pool_create(&io_pool, m->pool);
- apr_pool_tag(io_pool, "h2_io");
- io = h2_io_create(stream_id, io_pool, m->bucket_alloc, request);
- h2_io_set_add(m->stream_ios, io);
-
- return io;
-}
-
-
-apr_status_t h2_mplx_process(h2_mplx *m, int stream_id,
- const h2_request *req,
+apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream,
h2_stream_pri_cmp *cmp, void *ctx)
{
apr_status_t status;
@@ -997,24 +785,27 @@ apr_status_t h2_mplx_process(h2_mplx *m,
status = APR_ECONNABORTED;
}
else {
- h2_io *io = open_io(m, stream_id, req);
+ h2_beam_create(&stream->input, stream->pool, stream->id,
+ "input", 0);
+ h2_ihash_add(m->streams, stream);
- if (!io->request->body) {
- status = h2_io_in_close(io);
+ if (!m->need_registration) {
+ m->need_registration = h2_iq_empty(m->q);
}
-
- m->need_registration = m->need_registration || h2_iq_empty(m->q);
- do_registration = (m->need_registration && m->workers_busy < m->workers_max);
- h2_iq_add(m->q, io->id, cmp, ctx);
-
+ if (m->workers_busy < m->workers_max) {
+ do_registration = m->need_registration;
+ }
+ h2_iq_add(m->q, stream->id, cmp, ctx);
+
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
- "h2_mplx(%ld-%d): process", m->c->id, stream_id);
- H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_process");
+ "h2_mplx(%ld-%d): process, body=%d",
+ m->c->id, stream->id, stream->request->body);
}
leave_mutex(m, acquired);
}
- if (status == APR_SUCCESS && do_registration) {
- workers_register(m);
+ if (do_registration) {
+ m->need_registration = 0;
+ h2_workers_register(m->workers, m);
}
return status;
}
@@ -1022,21 +813,16 @@ apr_status_t h2_mplx_process(h2_mplx *m,
static h2_task *pop_task(h2_mplx *m)
{
h2_task *task = NULL;
+ h2_stream *stream;
int sid;
- while (!m->aborted && !task
- && (m->workers_busy < m->workers_limit)
- && (sid = h2_iq_shift(m->q)) > 0) {
- h2_io *io = h2_io_set_get(m->stream_ios, sid);
- if (io && io->orphaned) {
- io_destroy(m, io, 0);
- if (m->join_wait) {
- apr_thread_cond_signal(m->join_wait);
- }
- }
- else if (io) {
+ while (!m->aborted && !task && (m->workers_busy < m->workers_limit)
+ && (sid = h2_iq_shift(m->q)) > 0) {
+
+ stream = h2_ihash_get(m->streams, sid);
+ if (stream) {
conn_rec *slave, **pslave;
int new_conn = 0;
-
+
pslave = (conn_rec **)apr_array_pop(m->spare_slaves);
if (pslave) {
slave = *pslave;
@@ -1047,17 +833,27 @@ static h2_task *pop_task(h2_mplx *m)
}
slave->sbh = m->c->sbh;
- io->task = task = h2_task_create(m->id, io->request, slave, m);
+ task = h2_task_create(slave, stream->request, stream->input, m);
+ h2_ihash_add(m->tasks, task);
+
m->c->keepalives++;
apr_table_setn(slave->notes, H2_TASK_ID_NOTE, task->id);
if (new_conn) {
h2_slave_run_pre_connection(slave, ap_get_conn_socket(slave));
}
- io->worker_started = 1;
- io->started_at = apr_time_now();
+ task->worker_started = 1;
+ task->started_at = apr_time_now();
if (sid > m->max_stream_started) {
m->max_stream_started = sid;
}
+
+ if (stream->input) {
+ h2_beam_timeout_set(stream->input, m->stream_timeout);
+ h2_beam_on_consumed(stream->input, stream_input_consumed, m);
+ h2_beam_on_file_beam(stream->input, can_beam_file, m);
+ h2_beam_mutex_set(stream->input, beam_enter, task->cond, m);
+ }
+
++m->workers_busy;
}
}
@@ -1091,8 +887,6 @@ h2_task *h2_mplx_pop_task(h2_mplx *m, in
static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
{
if (task) {
- h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id);
-
if (task->frozen) {
/* this task was handed over to an engine for processing
* and the original worker has finished. That means the
@@ -1103,26 +897,25 @@ static void task_done(h2_mplx *m, h2_tas
/* FIXME: this implementation is incomplete. */
h2_task_set_io_blocking(task, 0);
apr_thread_cond_broadcast(m->task_thawed);
+ return;
}
else {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%ld): task(%s) done", m->id, task->id);
- /* clean our references and report request as done. Signal
- * that we want another unless we have been aborted */
- /* TODO: this will keep a worker attached to this h2_mplx as
- * long as it has requests to handle. Might no be fair to
- * other mplx's. Perhaps leave after n requests? */
- h2_mplx_out_close(m, task->stream_id);
+ out_close(m, task);
- if (ngn && io) {
- apr_off_t bytes = io->output_consumed + h2_io_out_length(io);
+ if (ngn) {
+ apr_off_t bytes = 0;
+ if (task->output.beam) {
+ h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
+ bytes += h2_beam_get_buffered(task->output.beam);
+ }
if (bytes > 0) {
/* we need to report consumed and current buffered output
* to the engine. The request will be streamed out or cancelled,
* no more data is coming from it and the engine should update
* its calculations before we destroy this information. */
h2_req_engine_out_consumed(ngn, task->c, bytes);
- io->output_consumed = 0;
}
}
@@ -1136,54 +929,50 @@ static void task_done(h2_mplx *m, h2_tas
h2_ngn_shed_done_ngn(m->ngn_shed, task->engine);
}
- if (io) {
- apr_time_t now = apr_time_now();
- if (!io->orphaned && m->redo_ios
- && h2_io_set_get(m->redo_ios, io->id)) {
- /* reset and schedule again */
- h2_io_redo(io);
- h2_io_set_remove(m->redo_ios, io);
- h2_iq_add(m->q, io->id, NULL, NULL);
- }
- else {
- io->worker_done = 1;
- io->done_at = now;
+ if (!task->orphaned && m->redo_tasks
+ && h2_ihash_get(m->redo_tasks, task->stream_id)) {
+ /* reset and schedule again */
+ h2_task_redo(task);
+ h2_ihash_remove(m->redo_tasks, task->stream_id);
+ h2_iq_add(m->q, task->stream_id, NULL, NULL);
+ return;
+ }
+
+ task->worker_done = 1;
+ task->done_at = apr_time_now();
+ if (task->output.beam) {
+ h2_beam_on_consumed(task->output.beam, NULL, NULL);
+ h2_beam_mutex_set(task->output.beam, NULL, NULL, NULL);
+ }
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%s): request done, %f ms"
+ " elapsed", task->id,
+ (task->done_at - task->started_at) / 1000.0);
+ if (task->started_at > m->last_idle_block) {
+ /* this task finished without causing an 'idle block', e.g.
+ * a block by flow control.
+ */
+ if (task->done_at- m->last_limit_change >= m->limit_change_interval
+ && m->workers_limit < m->workers_max) {
+ /* Well behaving stream, allow it more workers */
+ m->workers_limit = H2MIN(m->workers_limit * 2,
+ m->workers_max);
+ m->last_limit_change = task->done_at;
+ m->need_registration = 1;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): request(%d) done, %f ms"
- " elapsed", m->id, io->id,
- (io->done_at - io->started_at) / 1000.0);
- if (io->started_at > m->last_idle_block) {
- /* this task finished without causing an 'idle block', e.g.
- * a block by flow control.
- */
- if (now - m->last_limit_change >= m->limit_change_interval
- && m->workers_limit < m->workers_max) {
- /* Well behaving stream, allow it more workers */
- m->workers_limit = H2MIN(m->workers_limit * 2,
- m->workers_max);
- m->last_limit_change = now;
- m->need_registration = 1;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): increase worker limit to %d",
- m->id, m->workers_limit);
- }
- }
- }
-
- if (io->orphaned) {
- io_destroy(m, io, 0);
- if (m->join_wait) {
- apr_thread_cond_signal(m->join_wait);
- }
+ "h2_mplx(%ld): increase worker limit to %d",
+ m->id, m->workers_limit);
}
- else {
- /* hang around until the stream deregisters */
+ }
+
+ if (task->orphaned) {
+ task_destroy(m, task, 0);
+ if (m->join_wait) {
+ apr_thread_cond_signal(m->join_wait);
}
}
else {
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
- "h2_mplx(%ld): task %s without corresp. h2_io",
- m->id, task->id);
+ /* hang around until the stream deregisters */
}
}
}
@@ -1208,80 +997,76 @@ void h2_mplx_task_done(h2_mplx *m, h2_ta
* h2_mplx DoS protection
******************************************************************************/
-typedef struct {
- h2_mplx *m;
- h2_io *io;
- apr_time_t now;
-} io_iter_ctx;
-
-static int latest_repeatable_busy_unsubmitted_iter(void *data, h2_io *io)
+static int latest_repeatable_unsubmitted_iter(void *data, void *val)
{
- io_iter_ctx *ctx = data;
- if (io->worker_started && !io->worker_done
- && h2_io_is_repeatable(io)
- && !h2_io_set_get(ctx->m->redo_ios, io->id)) {
- /* this io occupies a worker, the response has not been submitted yet,
+ task_iter_ctx *ctx = data;
+ h2_task *task = val;
+ if (!task->worker_done && h2_task_can_redo(task)
+ && !h2_ihash_get(ctx->m->redo_tasks, task->stream_id)) {
+ /* this task occupies a worker, the response has not been submitted yet,
* not been cancelled and it is a repeatable request
* -> it can be re-scheduled later */
- if (!ctx->io || ctx->io->started_at < io->started_at) {
+ if (!ctx->task || ctx->task->started_at < task->started_at) {
/* we did not have one or this one was started later */
- ctx->io = io;
+ ctx->task = task;
}
}
return 1;
}
-static h2_io *get_latest_repeatable_busy_unsubmitted_io(h2_mplx *m)
+static h2_task *get_latest_repeatable_unsubmitted_task(h2_mplx *m)
{
- io_iter_ctx ctx;
+ task_iter_ctx ctx;
ctx.m = m;
- ctx.io = NULL;
- h2_io_set_iter(m->stream_ios, latest_repeatable_busy_unsubmitted_iter, &ctx);
- return ctx.io;
+ ctx.task = NULL;
+ h2_ihash_iter(m->tasks, latest_repeatable_unsubmitted_iter, &ctx);
+ return ctx.task;
}
-static int timed_out_busy_iter(void *data, h2_io *io)
+static int timed_out_busy_iter(void *data, void *val)
{
- io_iter_ctx *ctx = data;
- if (io->worker_started && !io->worker_done
- && (ctx->now - io->started_at) > ctx->m->stream_timeout) {
+ task_iter_ctx *ctx = data;
+ h2_task *task = val;
+ if (!task->worker_done
+ && (ctx->now - task->started_at) > ctx->m->stream_timeout) {
/* timed out stream occupying a worker, found */
- ctx->io = io;
+ ctx->task = task;
return 0;
}
return 1;
}
-static h2_io *get_timed_out_busy_stream(h2_mplx *m)
+
+static h2_task *get_timed_out_busy_task(h2_mplx *m)
{
- io_iter_ctx ctx;
+ task_iter_ctx ctx;
ctx.m = m;
- ctx.io = NULL;
+ ctx.task = NULL;
ctx.now = apr_time_now();
- h2_io_set_iter(m->stream_ios, timed_out_busy_iter, &ctx);
- return ctx.io;
+ h2_ihash_iter(m->tasks, timed_out_busy_iter, &ctx);
+ return ctx.task;
}
-static apr_status_t unschedule_slow_ios(h2_mplx *m)
+static apr_status_t unschedule_slow_tasks(h2_mplx *m)
{
- h2_io *io;
+ h2_task *task;
int n;
- if (!m->redo_ios) {
- m->redo_ios = h2_io_set_create(m->pool);
+ if (!m->redo_tasks) {
+ m->redo_tasks = h2_ihash_create(m->pool, offsetof(h2_task, stream_id));
}
/* Try to get rid of streams that occupy workers. Look for safe requests
* that are repeatable. If none found, fail the connection.
*/
- n = (m->workers_busy - m->workers_limit - h2_io_set_size(m->redo_ios));
- while (n > 0 && (io = get_latest_repeatable_busy_unsubmitted_io(m))) {
- h2_io_set_add(m->redo_ios, io);
- h2_io_rst(io, H2_ERR_CANCEL);
+ n = (m->workers_busy - m->workers_limit - h2_ihash_count(m->redo_tasks));
+ while (n > 0 && (task = get_latest_repeatable_unsubmitted_task(m))) {
+ h2_task_rst(task, H2_ERR_CANCEL);
+ h2_ihash_add(m->redo_tasks, task);
--n;
}
- if ((m->workers_busy - h2_io_set_size(m->redo_ios)) > m->workers_limit) {
- io = get_timed_out_busy_stream(m);
- if (io) {
+ if ((m->workers_busy - h2_ihash_count(m->redo_tasks)) > m->workers_limit) {
+ task = get_timed_out_busy_task(m);
+ if (task) {
/* Too many busy workers, unable to cancel enough streams
* and with a busy, timed out stream, we tell the client
* to go away... */
@@ -1298,7 +1083,7 @@ apr_status_t h2_mplx_idle(h2_mplx *m)
int acquired;
if (enter_mutex(m, &acquired) == APR_SUCCESS) {
- apr_size_t scount = h2_io_set_size(m->stream_ios);
+ apr_size_t scount = h2_ihash_count(m->streams);
if (scount > 0 && m->workers_busy) {
/* If we have streams in connection state 'IDLE', meaning
* all streams are ready to sent data out, but lack
@@ -1335,7 +1120,7 @@ apr_status_t h2_mplx_idle(h2_mplx *m)
}
if (m->workers_busy > m->workers_limit) {
- status = unschedule_slow_ios(m);
+ status = unschedule_slow_tasks(m);
}
}
leave_mutex(m, acquired);
@@ -1353,11 +1138,12 @@ typedef struct {
int streams_updated;
} ngn_update_ctx;
-static int ngn_update_window(void *ctx, h2_io *io)
+static int ngn_update_window(void *ctx, void *val)
{
ngn_update_ctx *uctx = ctx;
- if (io && io->task && io->task->assigned == uctx->ngn
- && io_out_consumed_signal(uctx->m, io)) {
+ h2_task *task = val;
+ if (task && task->assigned == uctx->ngn
+ && output_consumed_signal(uctx->m, task)) {
++uctx->streams_updated;
}
return 1;
@@ -1370,7 +1156,7 @@ static apr_status_t ngn_out_update_windo
ctx.m = m;
ctx.ngn = ngn;
ctx.streams_updated = 0;
- h2_io_set_iter(m->stream_ios, ngn_update_window, &ctx);
+ h2_ihash_iter(m->tasks, ngn_update_window, &ctx);
return ctx.streams_updated? APR_SUCCESS : APR_EAGAIN;
}
@@ -1392,8 +1178,7 @@ apr_status_t h2_mplx_req_engine_push(con
task->r = r;
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id);
- if (!io || io->orphaned) {
+ if (task->orphaned) {
status = APR_ECONNABORTED;
}
else {
Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.h
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.h?rev=1741419&r1=1741418&r2=1741419&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.h (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.h Thu Apr 28 12:43:02 2016
@@ -37,21 +37,21 @@
struct apr_pool_t;
struct apr_thread_mutex_t;
struct apr_thread_cond_t;
+struct h2_bucket_beam;
struct h2_config;
struct h2_ihash_t;
+struct h2_ilist_t;
struct h2_response;
struct h2_task;
struct h2_stream;
struct h2_request;
-struct h2_io_set;
struct apr_thread_cond_t;
struct h2_workers;
-struct h2_int_queue;
+struct h2_iqueue;
struct h2_ngn_shed;
struct h2_req_engine;
#include <apr_queue.h>
-#include "h2_io.h"
typedef struct h2_mplx h2_mplx;
@@ -72,10 +72,12 @@ struct h2_mplx {
unsigned int aborted : 1;
unsigned int need_registration : 1;
- struct h2_int_queue *q;
- struct h2_io_set *stream_ios;
- struct h2_io_set *ready_ios;
- struct h2_io_set *redo_ios;
+ struct h2_ihash_t *streams; /* all streams currently processing */
+ struct h2_iqueue *q; /* all stream ids that need to be started */
+
+ struct h2_ihash_t *tasks; /* all tasks started and not destroyed */
+ struct h2_ihash_t *ready_tasks; /* all tasks ready for submit */
+ struct h2_ihash_t *redo_tasks; /* all tasks that need to be redone */
apr_uint32_t max_streams; /* max # of concurrent streams */
apr_uint32_t max_stream_started; /* highest stream id that started processing */
@@ -96,10 +98,11 @@ struct h2_mplx {
apr_size_t stream_max_mem;
apr_interval_time_t stream_timeout;
+ apr_pool_t *spare_io_pool;
apr_array_header_t *spare_slaves; /* spare slave connections */
struct h2_workers *workers;
- apr_size_t tx_handles_reserved;
+ int tx_handles_reserved;
apr_size_t tx_chunk_size;
h2_mplx_consumed_cb *input_consumed;
@@ -166,10 +169,6 @@ apr_uint32_t h2_mplx_shutdown(h2_mplx *m
*/
apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error);
-/* Return != 0 iff the multiplexer has output data for the given stream.
- */
-int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id);
-
/**
* Waits on output data from any stream in this session to become available.
* Returns APR_TIMEUP if no data arrived in the given time.
@@ -190,8 +189,7 @@ apr_status_t h2_mplx_out_trywait(h2_mplx
* @param cmp the stream priority compare function
* @param ctx context data for the compare function
*/
-apr_status_t h2_mplx_process(h2_mplx *m, int stream_id,
- const struct h2_request *r,
+apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream,
h2_stream_pri_cmp *cmp, void *ctx);
/**
@@ -219,37 +217,11 @@ void h2_mplx_set_consumed_cb(h2_mplx *m,
******************************************************************************/
/**
- * Reads a buckets for the given stream_id. Will return ARP_EAGAIN when
- * called with APR_NONBLOCK_READ and no data present. Will return APR_EOF
- * when the end of the stream input has been reached.
- * The condition passed in will be used for blocking/signalling and will
- * be protected by the mplx's own mutex.
- */
-apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block,
- int stream_id, apr_bucket_brigade *bb,
- apr_table_t *trailers,
- struct apr_thread_cond_t *iowait);
-
-/**
- * Appends data to the input of the given stream. Storage of input data is
- * not subject to flow control.
- */
-apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id,
- const char *data, apr_size_t len, int eos);
-
-/**
- * Closes the input for the given stream_id.
- */
-apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id);
-
-/**
* Invoke the consumed callback for all streams that had bytes read since the
* last call to this function. If no stream had input data consumed, the
* callback is not invoked.
* The consumed callback may also be invoked at other times whenever
* the need arises.
- * Returns APR_SUCCESS when an update happened, APR_EAGAIN if no update
- * happened.
*/
apr_status_t h2_mplx_in_update_windows(h2_mplx *m);
@@ -267,43 +239,10 @@ struct h2_stream *h2_mplx_next_submit(h2
struct h2_ihash_t *streams);
/**
- * Reads output data into the given brigade. Will never block, but
- * return APR_EAGAIN until data arrives or the stream is closed.
- */
-apr_status_t h2_mplx_out_get_brigade(h2_mplx *mplx, int stream_id,
- apr_bucket_brigade *bb,
- apr_off_t len, apr_table_t **ptrailers);
-
-/**
* Opens the output for the given stream with the specified response.
*/
apr_status_t h2_mplx_out_open(h2_mplx *mplx, int stream_id,
- struct h2_response *response,
- ap_filter_t* filter, apr_bucket_brigade *bb,
- struct apr_thread_cond_t *iowait);
-
-/**
- * Append the brigade to the stream output. Might block if amount
- * of bytes buffered reaches configured max.
- * @param stream_id the stream identifier
- * @param filter the apache filter context of the data
- * @param blocking == 0 iff call should return with APR_INCOMPLETE if
- * the full brigade cannot be written at once
- * @param bb the bucket brigade to append
- * @param iowait a conditional used for block/signalling in h2_mplx
- */
-apr_status_t h2_mplx_out_write(h2_mplx *mplx, int stream_id,
- ap_filter_t* filter,
- int blocking,
- apr_bucket_brigade *bb,
- struct apr_thread_cond_t *iowait);
-
-/**
- * Closes the output for stream stream_id.
- */
-apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id);
-
-apr_status_t h2_mplx_out_rst(h2_mplx *m, int stream_id, int error);
+ struct h2_response *response);
/*******************************************************************************
* h2_mplx list Manipulation.
Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_ngn_shed.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_ngn_shed.c?rev=1741419&r1=1741418&r2=1741419&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_ngn_shed.c (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_ngn_shed.c Thu Apr 28 12:43:02 2016
@@ -29,16 +29,15 @@
#include "mod_http2.h"
#include "h2_private.h"
+#include "h2.h"
#include "h2_config.h"
#include "h2_conn.h"
#include "h2_ctx.h"
#include "h2_h2.h"
-#include "h2_int_queue.h"
#include "h2_mplx.h"
#include "h2_response.h"
#include "h2_request.h"
#include "h2_task.h"
-#include "h2_task_output.h"
#include "h2_util.h"
#include "h2_ngn_shed.h"
@@ -296,7 +295,8 @@ static apr_status_t ngn_done_task(h2_ngn
ngn->no_finished++;
if (waslive) ngn->no_live--;
ngn->no_assigned--;
-
+ task->assigned = NULL;
+
return APR_SUCCESS;
}
Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_request.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_request.c?rev=1741419&r1=1741418&r2=1741419&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_request.c (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_request.c Thu Apr 28 12:43:02 2016
@@ -235,7 +235,8 @@ apr_status_t h2_request_end_headers(h2_r
const char *s;
if (req->eoh) {
- return APR_EINVAL;
+ /* already done */
+ return APR_SUCCESS;
}
/* rfc7540, ch. 8.1.2.3:
@@ -337,37 +338,18 @@ apr_status_t h2_request_add_trailer(h2_r
return add_h1_trailer(req, pool, name, nlen, value, vlen);
}
-#define OPT_COPY(p, s) ((s)? apr_pstrdup(p, s) : NULL)
-
-void h2_request_copy(apr_pool_t *p, h2_request *dst, const h2_request *src)
+h2_request *h2_request_clone(apr_pool_t *p, const h2_request *src)
{
- /* keep the dst id */
- dst->initiated_on = src->initiated_on;
- dst->method = OPT_COPY(p, src->method);
- dst->scheme = OPT_COPY(p, src->scheme);
- dst->authority = OPT_COPY(p, src->authority);
- dst->path = OPT_COPY(p, src->path);
- dst->headers = apr_table_clone(p, src->headers);
+ h2_request *dst = apr_pmemdup(p, src, sizeof(*dst));
+ dst->method = apr_pstrdup(p, src->method);
+ dst->scheme = apr_pstrdup(p, src->scheme);
+ dst->authority = apr_pstrdup(p, src->authority);
+ dst->path = apr_pstrdup(p, src->path);
+ dst->headers = apr_table_clone(p, src->headers);
if (src->trailers) {
- dst->trailers = apr_table_clone(p, src->trailers);
- }
- else {
- dst->trailers = NULL;
+ dst->trailers = apr_table_clone(p, src->trailers);
}
- dst->content_length = src->content_length;
- dst->chunked = src->chunked;
- dst->eoh = src->eoh;
- dst->body = src->body;
- dst->serialize = src->serialize;
- dst->push_policy = src->push_policy;
-}
-
-h2_request *h2_request_clone(apr_pool_t *p, const h2_request *src)
-{
- h2_request *nreq = apr_pcalloc(p, sizeof(*nreq));
- memcpy(nreq, src, sizeof(*nreq));
- h2_request_copy(p, nreq, src);
- return nreq;
+ return dst;
}
request_rec *h2_request_create_rec(const h2_request *req, conn_rec *conn)
Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_request.h
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_request.h?rev=1741419&r1=1741418&r2=1741419&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_request.h (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_request.h Thu Apr 28 12:43:02 2016
@@ -43,8 +43,6 @@ apr_status_t h2_request_add_trailer(h2_r
apr_status_t h2_request_end_headers(h2_request *req, apr_pool_t *pool,
int eos, int push);
-void h2_request_copy(apr_pool_t *p, h2_request *dst, const h2_request *src);
-
h2_request *h2_request_clone(apr_pool_t *p, const h2_request *src);
/**
Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_session.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_session.c?rev=1741419&r1=1741418&r2=1741419&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_session.c (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_session.c Thu Apr 28 12:43:02 2016
@@ -28,6 +28,7 @@
#include <scoreboard.h>
#include "h2_private.h"
+#include "h2.h"
#include "h2_bucket_eoc.h"
#include "h2_bucket_eos.h"
#include "h2_config.h"
@@ -112,7 +113,7 @@ static void cleanup_streams(h2_session *
while (1) {
h2_ihash_iter(session->streams, find_cleanup_stream, &ctx);
if (ctx.candidate) {
- h2_session_stream_destroy(session, ctx.candidate);
+ h2_session_stream_done(session, ctx.candidate);
ctx.candidate = NULL;
}
else {
@@ -121,7 +122,8 @@ static void cleanup_streams(h2_session *
}
}
-h2_stream *h2_session_open_stream(h2_session *session, int stream_id)
+h2_stream *h2_session_open_stream(h2_session *session, int stream_id,
+ int initiated_on, const h2_request *req)
{
h2_stream * stream;
apr_pool_t *stream_pool;
@@ -135,7 +137,8 @@ h2_stream *h2_session_open_stream(h2_ses
apr_pool_tag(stream_pool, "h2_stream");
}
- stream = h2_stream_open(stream_id, stream_pool, session);
+ stream = h2_stream_open(stream_id, stream_pool, session,
+ initiated_on, req);
h2_ihash_add(session->streams, stream);
if (H2_STREAM_CLIENT_INITIATED(stream_id)) {
@@ -309,8 +312,9 @@ static apr_status_t stream_release(h2_se
h2_stream *stream,
uint32_t error_code)
{
+ conn_rec *c = session->c;
if (!error_code) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_stream(%ld-%d): handled, closing",
session->id, (int)stream->id);
if (H2_STREAM_CLIENT_INITIATED(stream->id)) {
@@ -320,7 +324,7 @@ static apr_status_t stream_release(h2_se
}
}
else {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03065)
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, APLOGNO(03065)
"h2_stream(%ld-%d): closing with err=%d %s",
session->id, (int)stream->id, (int)error_code,
h2_h2_err_description(error_code));
@@ -328,8 +332,7 @@ static apr_status_t stream_release(h2_se
}
return h2_conn_io_writeb(&session->io,
- h2_bucket_eos_create(session->c->bucket_alloc,
- stream));
+ h2_bucket_eos_create(c->bucket_alloc, stream), 0);
}
static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id,
@@ -360,7 +363,7 @@ static int on_begin_headers_cb(nghttp2_s
/* nop */
}
else {
- s = h2_session_open_stream((h2_session *)userp, frame->hd.stream_id);
+ s = h2_session_open_stream(userp, frame->hd.stream_id, 0, NULL);
}
return s? 0 : NGHTTP2_ERR_START_STREAM_NOT_ALLOWED;
}
@@ -615,7 +618,7 @@ static int on_send_data_cb(nghttp2_sessi
if (status == APR_SUCCESS && padlen) {
b = apr_bucket_immortal_create(immortal_zeros, padlen,
session->c->bucket_alloc);
- status = h2_conn_io_writeb(&session->io, b);
+ status = h2_conn_io_writeb(&session->io, b, 0);
}
}
@@ -1030,7 +1033,7 @@ static apr_status_t h2_session_start(h2_
}
/* Now we need to auto-open stream 1 for the request we got. */
- stream = h2_session_open_stream(session, 1);
+ stream = h2_session_open_stream(session, 1, 0, NULL);
if (!stream) {
status = APR_EGENERAL;
ap_log_rerror(APLOG_MARK, APLOG_ERR, status, session->r,
@@ -1134,7 +1137,7 @@ static int resume_on_data(void *ctx, voi
static int h2_session_resume_streams_with_data(h2_session *session)
{
AP_DEBUG_ASSERT(session);
- if (!h2_ihash_is_empty(session->streams)
+ if (!h2_ihash_empty(session->streams)
&& session->mplx && !session->mplx->aborted) {
resume_ctx ctx;
@@ -1275,8 +1278,9 @@ static apr_status_t submit_response(h2_s
const h2_priority *prio;
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03073)
- "h2_stream(%ld-%d): submit response %d",
- session->id, stream->id, response->http_status);
+ "h2_stream(%ld-%d): submit response %d, REMOTE_WINDOW_SIZE=%u",
+ session->id, stream->id, response->http_status,
+ (unsigned int)nghttp2_session_get_stream_remote_window_size(session->ngh2, stream->id));
if (response->content_length != 0) {
memset(&provider, 0, sizeof(provider));
@@ -1372,9 +1376,8 @@ struct h2_stream *h2_session_push(h2_ses
session->id, is->id, nid,
push->req->method, push->req->path, is->id);
- stream = h2_session_open_stream(session, nid);
+ stream = h2_session_open_stream(session, nid, is->id, push->req);
if (stream) {
- h2_stream_set_h2_request(stream, is->id, push->req);
status = stream_schedule(session, stream, 1);
if (status != APR_SUCCESS) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
@@ -1503,22 +1506,21 @@ apr_status_t h2_session_set_prio(h2_sess
return status;
}
-apr_status_t h2_session_stream_destroy(h2_session *session, h2_stream *stream)
+apr_status_t h2_session_stream_done(h2_session *session, h2_stream *stream)
{
apr_pool_t *pool = h2_stream_detach_pool(stream);
-
+ int stream_id = stream->id;
+ int rst_error = stream->rst_error;
+
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
"h2_stream(%ld-%d): cleanup by EOS bucket destroy",
- session->id, stream->id);
- /* this may be called while the session has already freed
- * some internal structures or even when the mplx is locked. */
- if (session->mplx) {
- h2_mplx_stream_done(session->mplx, stream->id, stream->rst_error);
- }
-
+ session->id, stream_id);
if (session->streams) {
- h2_ihash_remove(session->streams, stream->id);
+ h2_ihash_remove(session->streams, stream_id);
}
+
+ h2_stream_cleanup(stream);
+ h2_mplx_stream_done(session->mplx, stream_id, rst_error);
h2_stream_destroy(stream);
if (pool) {
@@ -1528,6 +1530,7 @@ apr_status_t h2_session_stream_destroy(h
}
session->spare = pool;
}
+
return APR_SUCCESS;
}
@@ -1757,12 +1760,51 @@ static int is_accepting_streams(h2_sessi
}
}
+static void update_child_status(h2_session *session, int status, const char *msg)
+{
+ /* Assume that we also change code/msg when something really happened and
+ * avoid updating the scoreboard in between */
+ if (session->last_status_code != status
+ || session->last_status_msg != msg) {
+ apr_snprintf(session->status, sizeof(session->status),
+ "%s, streams: %d/%d/%d/%d/%d (open/recv/resp/push/rst)",
+ msg? msg : "-",
+ (int)h2_ihash_count(session->streams),
+ (int)session->remote.emitted_count,
+ (int)session->responses_submitted,
+ (int)session->pushes_submitted,
+ (int)session->pushes_reset + session->streams_reset);
+ ap_update_child_status_descr(session->c->sbh, status, session->status);
+ }
+}
+
static void transit(h2_session *session, const char *action, h2_session_state nstate)
{
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03078)
- "h2_session(%ld): transit [%s] -- %s --> [%s]", session->id,
- state_name(session->state), action, state_name(nstate));
- session->state = nstate;
+ if (session->state != nstate) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03078)
+ "h2_session(%ld): transit [%s] -- %s --> [%s]", session->id,
+ state_name(session->state), action, state_name(nstate));
+ session->state = nstate;
+ switch (session->state) {
+ case H2_SESSION_ST_IDLE:
+ update_child_status(session, (h2_ihash_empty(session->streams)?
+ SERVER_BUSY_KEEPALIVE
+ : SERVER_BUSY_READ), "idle");
+ break;
+ case H2_SESSION_ST_REMOTE_SHUTDOWN:
+ update_child_status(session, SERVER_CLOSING, "remote goaway");
+ break;
+ case H2_SESSION_ST_LOCAL_SHUTDOWN:
+ update_child_status(session, SERVER_CLOSING, "local goaway");
+ break;
+ case H2_SESSION_ST_DONE:
+ update_child_status(session, SERVER_CLOSING, "done");
+ break;
+ default:
+ /* nop */
+ break;
+ }
+ }
}
static void h2_session_ev_init(h2_session *session, int arg, const char *msg)
@@ -1771,7 +1813,6 @@ static void h2_session_ev_init(h2_sessio
case H2_SESSION_ST_INIT:
transit(session, "init", H2_SESSION_ST_BUSY);
break;
-
default:
/* nop */
break;
@@ -1878,7 +1919,7 @@ static void h2_session_ev_no_io(h2_sessi
if (h2_conn_io_flush(&session->io) != APR_SUCCESS) {
dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
}
- if (h2_ihash_is_empty(session->streams)) {
+ if (h2_ihash_empty(session->streams)) {
if (!is_accepting_streams(session)) {
/* We are no longer accepting new streams and have
* finished processing existing ones. Time to leave. */
@@ -2037,29 +2078,11 @@ static void dispatch_event(h2_session *s
static const int MAX_WAIT_MICROS = 200 * 1000;
-static void update_child_status(h2_session *session, int status, const char *msg)
-{
- /* Assume that we also change code/msg when something really happened and
- * avoid updating the scoreboard in between */
- if (session->last_status_code != status
- || session->last_status_msg != msg) {
- apr_snprintf(session->status, sizeof(session->status),
- "%s, streams: %d/%d/%d/%d/%d (open/recv/resp/push/rst)",
- msg? msg : "-",
- (int)h2_ihash_count(session->streams),
- (int)session->remote.emitted_count,
- (int)session->responses_submitted,
- (int)session->pushes_submitted,
- (int)session->pushes_reset + session->streams_reset);
- ap_update_child_status_descr(session->c->sbh, status, session->status);
- }
-}
-
apr_status_t h2_session_process(h2_session *session, int async)
{
apr_status_t status = APR_SUCCESS;
conn_rec *c = session->c;
- int rv, have_written, have_read, mpm_state, no_streams;
+ int rv, have_written, have_read, mpm_state;
ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
"h2_session(%ld): process start, async=%d", session->id, async);
@@ -2102,12 +2125,10 @@ apr_status_t h2_session_process(h2_sessi
break;
case H2_SESSION_ST_IDLE:
- no_streams = h2_ihash_is_empty(session->streams);
- update_child_status(session, (no_streams? SERVER_BUSY_KEEPALIVE
- : SERVER_BUSY_READ), "idle");
/* make certain, the client receives everything before we idle */
if (!session->keep_sync_until
- && async && no_streams && !session->r && session->remote.emitted_count) {
+ && async && h2_ihash_empty(session->streams)
+ && !session->r && session->remote.emitted_count) {
ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
"h2_session(%ld): async idle, nonblock read", session->id);
/* We do not return to the async mpm immediately, since under
@@ -2190,7 +2211,6 @@ apr_status_t h2_session_process(h2_sessi
status = h2_session_read(session, 0);
if (status == APR_SUCCESS) {
have_read = 1;
- update_child_status(session, SERVER_BUSY_READ, "busy");
dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL);
}
else if (status == APR_EAGAIN) {
@@ -2205,7 +2225,7 @@ apr_status_t h2_session_process(h2_sessi
}
}
- if (!h2_ihash_is_empty(session->streams)) {
+ if (!h2_ihash_empty(session->streams)) {
/* resume any streams for which data is available again */
h2_session_resume_streams_with_data(session);
/* Submit any responses/push_promises that are ready */
@@ -2220,9 +2240,10 @@ apr_status_t h2_session_process(h2_sessi
}
/* send out window updates for our inputs */
status = h2_mplx_in_update_windows(session->mplx);
- if (status != APR_SUCCESS && status != APR_EAGAIN) {
+ if (status != APR_SUCCESS) {
dispatch_event(session, H2_SESSION_EV_CONN_ERROR,
- H2_ERR_INTERNAL_ERROR, "window update error");
+ H2_ERR_INTERNAL_ERROR,
+ "window update error");
break;
}
}
@@ -2257,7 +2278,6 @@ apr_status_t h2_session_process(h2_sessi
if (h2_conn_io_flush(&session->io) != APR_SUCCESS) {
dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
}
- update_child_status(session, SERVER_BUSY_READ, "wait");
}
else if ((apr_time_now() - session->start_wait) >= session->s->timeout) {
/* waited long enough */
@@ -2297,7 +2317,6 @@ apr_status_t h2_session_process(h2_sessi
break;
case H2_SESSION_ST_DONE:
- update_child_status(session, SERVER_CLOSING, "done");
status = APR_EOF;
goto out;
Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_session.h
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_session.h?rev=1741419&r1=1741418&r2=1741419&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_session.h (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_session.h Thu Apr 28 12:43:02 2016
@@ -198,9 +198,14 @@ struct h2_stream *h2_session_get_stream(
*
* @param session the session to register in
* @param stream_id the new stream identifier
+ * @param initiated_on the stream id this one is initiated on or 0
+ * @param req the request for this stream or NULL if not known yet
* @return the new stream
*/
-struct h2_stream *h2_session_open_stream(h2_session *session, int stream_id);
+struct h2_stream *h2_session_open_stream(h2_session *session, int stream_id,
+ int initiated_on,
+ const h2_request *req);
+
/**
* Returns if client settings have push enabled.
@@ -213,8 +218,8 @@ int h2_session_push_enabled(h2_session *
* @param session the session to which the stream belongs
* @param stream the stream to destroy
*/
-apr_status_t h2_session_stream_destroy(h2_session *session,
- struct h2_stream *stream);
+apr_status_t h2_session_stream_done(h2_session *session,
+ struct h2_stream *stream);
/**
* Submit a push promise on the stream and schedule the new steam for