You are viewing a plain text version of this content. The canonical link for it is here.
Posted to cvs@httpd.apache.org by ic...@apache.org on 2016/03/02 12:21:29 UTC
svn commit: r1733259 [2/4] - in /httpd/httpd/branches/2.4.x: ./
modules/http2/
Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_io.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_io.c?rev=1733259&r1=1733258&r2=1733259&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_io.c (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_io.c Wed Mar 2 11:21:28 2016
@@ -33,23 +33,45 @@
#include "h2_task.h"
#include "h2_util.h"
-h2_io *h2_io_create(int id, apr_pool_t *pool)
+h2_io *h2_io_create(int id, apr_pool_t *pool, const h2_request *request)
{
h2_io *io = apr_pcalloc(pool, sizeof(*io));
if (io) {
io->id = id;
io->pool = pool;
io->bucket_alloc = apr_bucket_alloc_create(pool);
+ io->request = h2_request_clone(pool, request);
}
return io;
}
-void h2_io_destroy(h2_io *io)
+void h2_io_redo(h2_io *io)
{
- if (io->pool) {
- apr_pool_destroy(io->pool);
- /* gone */
+ io->worker_started = 0;
+ io->response = NULL;
+ io->rst_error = 0;
+ if (io->bbin) {
+ apr_brigade_cleanup(io->bbin);
+ }
+ if (io->bbout) {
+ apr_brigade_cleanup(io->bbout);
+ }
+ if (io->tmp) {
+ apr_brigade_cleanup(io->tmp);
+ }
+ io->started_at = io->done_at = 0;
+}
+
+int h2_io_is_repeatable(h2_io *io) {
+ if (io->submitted
+ || io->input_consumed > 0
+ || !io->request) {
+ /* cannot repeat that. */
+ return 0;
}
+ return (!strcmp("GET", io->request->method)
+ || !strcmp("HEAD", io->request->method)
+ || !strcmp("OPTIONS", io->request->method));
}
void h2_io_set_response(h2_io *io, h2_response *response)
@@ -75,6 +97,11 @@ int h2_io_in_has_eos_for(h2_io *io)
return io->eos_in || (io->bbin && h2_util_has_eos(io->bbin, -1));
}
+int h2_io_in_has_data(h2_io *io)
+{
+ return io->bbin && h2_util_bb_has_data_or_eos(io->bbin);
+}
+
int h2_io_out_has_data(h2_io *io)
{
return io->bbout && h2_util_bb_has_data_or_eos(io->bbout);
@@ -102,12 +129,13 @@ apr_status_t h2_io_in_shutdown(h2_io *io
}
-void h2_io_signal_init(h2_io *io, h2_io_op op, int timeout_secs, apr_thread_cond_t *cond)
+void h2_io_signal_init(h2_io *io, h2_io_op op, apr_interval_time_t timeout,
+ apr_thread_cond_t *cond)
{
io->timed_op = op;
io->timed_cond = cond;
- if (timeout_secs > 0) {
- io->timeout_at = apr_time_now() + apr_time_from_sec(timeout_secs);
+ if (timeout > 0) {
+ io->timeout_at = apr_time_now() + timeout;
}
else {
io->timeout_at = 0;
@@ -255,6 +283,18 @@ apr_status_t h2_io_in_read(h2_io *io, ap
}
}
+ if (status == APR_SUCCESS && (!io->bbin || APR_BRIGADE_EMPTY(io->bbin))) {
+ if (io->eos_in) {
+ if (!io->eos_in_written) {
+ status = append_eos(io, bb, trailers);
+ io->eos_in_written = 1;
+ }
+ }
+ }
+
+ if (status == APR_SUCCESS && APR_BRIGADE_EMPTY(bb)) {
+ return APR_EAGAIN;
+ }
return status;
}
@@ -298,7 +338,7 @@ apr_status_t h2_io_out_readx(h2_io *io,
return APR_ECONNABORTED;
}
- if (io->eos_out) {
+ if (io->eos_out_read) {
*plen = 0;
*peos = 1;
return APR_SUCCESS;
@@ -316,7 +356,7 @@ apr_status_t h2_io_out_readx(h2_io *io,
else {
status = h2_util_bb_readx(io->bbout, cb, ctx, plen, peos);
if (status == APR_SUCCESS) {
- io->eos_out = *peos;
+ io->eos_out_read = *peos;
}
}
@@ -330,7 +370,7 @@ apr_status_t h2_io_out_read_to(h2_io *io
return APR_ECONNABORTED;
}
- if (io->eos_out) {
+ if (io->eos_out_read) {
*plen = 0;
*peos = 1;
return APR_SUCCESS;
@@ -341,7 +381,7 @@ apr_status_t h2_io_out_read_to(h2_io *io
return APR_EAGAIN;
}
- io->eos_out = *peos = h2_util_has_eos(io->bbout, *plen);
+ io->eos_out_read = *peos = h2_util_has_eos(io->bbout, *plen);
return h2_util_move(bb, io->bbout, *plen, NULL, "h2_io_read_to");
}
@@ -413,14 +453,17 @@ apr_status_t h2_io_out_close(h2_io *io,
if (io->rst_error) {
return APR_ECONNABORTED;
}
- if (!io->eos_out) { /* EOS has not been read yet */
+ if (!io->eos_out_read) { /* EOS has not been read yet */
process_trailers(io, trailers);
if (!io->bbout) {
io->bbout = apr_brigade_create(io->pool, io->bucket_alloc);
}
- if (!h2_util_has_eos(io->bbout, -1)) {
- APR_BRIGADE_INSERT_TAIL(io->bbout,
- apr_bucket_eos_create(io->bucket_alloc));
+ if (!io->eos_out) {
+ io->eos_out = 1;
+ if (!h2_util_has_eos(io->bbout, -1)) {
+ APR_BRIGADE_INSERT_TAIL(io->bbout,
+ apr_bucket_eos_create(io->bucket_alloc));
+ }
}
}
return APR_SUCCESS;
Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_io.h
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_io.h?rev=1733259&r1=1733258&r2=1733259&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_io.h (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_io.h Wed Mar 2 11:21:28 2016
@@ -20,6 +20,7 @@ struct h2_response;
struct apr_thread_cond_t;
struct h2_mplx;
struct h2_request;
+struct h2_task;
typedef apr_status_t h2_io_data_cb(void *ctx, const char *data, apr_off_t len);
@@ -30,8 +31,7 @@ typedef enum {
H2_IO_READ,
H2_IO_WRITE,
H2_IO_ANY,
-}
-h2_io_op;
+} h2_io_op;
typedef struct h2_io h2_io;
@@ -51,15 +51,19 @@ struct h2_io {
unsigned int orphaned : 1; /* h2_stream is gone for this io */
unsigned int worker_started : 1; /* h2_worker started processing for this io */
unsigned int worker_done : 1; /* h2_worker finished for this io */
+ unsigned int submitted : 1; /* response has been submitted to client */
unsigned int request_body : 1; /* iff request has body */
unsigned int eos_in : 1; /* input eos has been seen */
unsigned int eos_in_written : 1; /* input eos has been forwarded */
- unsigned int eos_out : 1; /* output eos has been seen */
+ unsigned int eos_out : 1; /* output eos is present */
+ unsigned int eos_out_read : 1; /* output eos has been forwarded */
h2_io_op timed_op; /* which operation is waited on, if any */
struct apr_thread_cond_t *timed_cond; /* condition to wait on, maybe NULL */
apr_time_t timeout_at; /* when IO wait will time out */
+ apr_time_t started_at; /* when processing started */
+ apr_time_t done_at; /* when processing was done */
apr_size_t input_consumed; /* how many bytes have been read */
int files_handles_owned;
@@ -72,12 +76,7 @@ struct h2_io {
/**
* Creates a new h2_io for the given stream id.
*/
-h2_io *h2_io_create(int id, apr_pool_t *pool);
-
-/**
- * Frees any resources hold by the h2_io instance.
- */
-void h2_io_destroy(h2_io *io);
+h2_io *h2_io_create(int id, apr_pool_t *pool, const struct h2_request *request);
/**
* Set the response of this stream.
@@ -89,6 +88,9 @@ void h2_io_set_response(h2_io *io, struc
*/
void h2_io_rst(h2_io *io, int error);
+int h2_io_is_repeatable(h2_io *io);
+void h2_io_redo(h2_io *io);
+
/**
* The input data is completely queued. Blocked reads will return immediately
* and give either data or EOF.
@@ -98,9 +100,13 @@ int h2_io_in_has_eos_for(h2_io *io);
* Output data is available.
*/
int h2_io_out_has_data(h2_io *io);
+/**
+ * Input data is available.
+ */
+int h2_io_in_has_data(h2_io *io);
void h2_io_signal(h2_io *io, h2_io_op op);
-void h2_io_signal_init(h2_io *io, h2_io_op op, int timeout_secs,
+void h2_io_signal_init(h2_io *io, h2_io_op op, apr_interval_time_t timeout,
struct apr_thread_cond_t *cond);
void h2_io_signal_exit(h2_io *io);
apr_status_t h2_io_signal_wait(struct h2_mplx *m, h2_io *io);
Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_io_set.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_io_set.c?rev=1733259&r1=1733258&r2=1733259&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_io_set.c (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_io_set.c Wed Mar 2 11:21:28 2016
@@ -45,16 +45,6 @@ h2_io_set *h2_io_set_create(apr_pool_t *
return sp;
}
-void h2_io_set_destroy(h2_io_set *sp)
-{
- int i;
- for (i = 0; i < sp->list->nelts; ++i) {
- h2_io *io = h2_io_IDX(sp->list, i);
- h2_io_destroy(io);
- }
- sp->list->nelts = 0;
-}
-
static int h2_stream_id_cmp(const void *s1, const void *s2)
{
h2_io **pio1 = (h2_io **)s1;
@@ -91,7 +81,7 @@ apr_status_t h2_io_set_add(h2_io_set *sp
int last;
APR_ARRAY_PUSH(sp->list, h2_io*) = io;
/* Normally, streams get added in ascending order if id. We
- * keep the array sorted, so we just need to check of the newly
+ * keep the array sorted, so we just need to check if the newly
* appended stream has a lower id than the last one. if not,
* sorting is not necessary.
*/
@@ -111,9 +101,7 @@ static void remove_idx(h2_io_set *sp, in
--sp->list->nelts;
n = sp->list->nelts - idx;
if (n > 0) {
- /* Close the hole in the array by moving the upper
- * parts down one step.
- */
+ /* There are n h2_io* behind idx. Move the rest down */
h2_io **selts = (h2_io**)sp->list->elts;
memmove(selts + idx, selts + idx + 1, n * sizeof(h2_io*));
}
@@ -124,7 +112,7 @@ h2_io *h2_io_set_remove(h2_io_set *sp, h
int i;
for (i = 0; i < sp->list->nelts; ++i) {
h2_io *e = h2_io_IDX(sp->list, i);
- if (e == io) {
+ if (e->id == io->id) {
remove_idx(sp, i);
return e;
}
@@ -132,7 +120,7 @@ h2_io *h2_io_set_remove(h2_io_set *sp, h
return NULL;
}
-h2_io *h2_io_set_pop_highest_prio(h2_io_set *set)
+h2_io *h2_io_set_shift(h2_io_set *set)
{
/* For now, this just removes the first element in the set.
* the name is misleading...
Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_io_set.h
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_io_set.h?rev=1733259&r1=1733258&r2=1733259&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_io_set.h (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_io_set.h Wed Mar 2 11:21:28 2016
@@ -26,8 +26,6 @@ typedef struct h2_io_set h2_io_set;
h2_io_set *h2_io_set_create(apr_pool_t *pool);
-void h2_io_set_destroy(h2_io_set *set);
-
apr_status_t h2_io_set_add(h2_io_set *set, struct h2_io *io);
h2_io *h2_io_set_get(h2_io_set *set, int stream_id);
h2_io *h2_io_set_remove(h2_io_set *set, struct h2_io *io);
@@ -48,9 +46,8 @@ typedef int h2_io_set_iter_fn(void *ctx,
* @param ctx user data for the callback
* @return 1 iff iteration completed for all members
*/
-int h2_io_set_iter(h2_io_set *set,
- h2_io_set_iter_fn *iter, void *ctx);
+int h2_io_set_iter(h2_io_set *set, h2_io_set_iter_fn *iter, void *ctx);
-h2_io *h2_io_set_pop_highest_prio(h2_io_set *set);
+h2_io *h2_io_set_shift(h2_io_set *set);
#endif /* defined(__mod_h2__h2_io_set__) */
Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.c?rev=1733259&r1=1733258&r2=1733259&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.c (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.c Wed Mar 2 11:21:28 2016
@@ -17,7 +17,6 @@
#include <stddef.h>
#include <stdlib.h>
-#include <apr_atomic.h>
#include <apr_thread_mutex.h>
#include <apr_thread_cond.h>
#include <apr_strings.h>
@@ -27,21 +26,23 @@
#include <http_core.h>
#include <http_log.h>
+#include "mod_http2.h"
+
#include "h2_private.h"
#include "h2_config.h"
#include "h2_conn.h"
+#include "h2_ctx.h"
#include "h2_h2.h"
+#include "h2_int_queue.h"
#include "h2_io.h"
#include "h2_io_set.h"
#include "h2_response.h"
#include "h2_mplx.h"
#include "h2_request.h"
#include "h2_stream.h"
-#include "h2_stream_set.h"
#include "h2_task.h"
#include "h2_task_input.h"
#include "h2_task_output.h"
-#include "h2_task_queue.h"
#include "h2_worker.h"
#include "h2_workers.h"
#include "h2_util.h"
@@ -60,6 +61,48 @@
} while(0)
+/* NULL or the mutex held by this thread, used for recursive calls
+ */
+static apr_threadkey_t *thread_lock;
+
+apr_status_t h2_mplx_child_init(apr_pool_t *pool, server_rec *s)
+{
+ return apr_threadkey_private_create(&thread_lock, NULL, pool);
+}
+
+static apr_status_t enter_mutex(h2_mplx *m, int *pacquired)
+{
+ apr_status_t status;
+ void *mutex = NULL;
+
+ /* Enter the mutex if this thread already holds the lock or
+ * if we can acquire it. Only in the latter case do we unlock
+ * on leaving the mutex.
+ * This allows recursive entering of the mutex from the same thread,
+ * which is what we need in certain situations involving callbacks.
+ */
+ apr_threadkey_private_get(&mutex, thread_lock);
+ if (mutex == m->lock) {
+ *pacquired = 0;
+ return APR_SUCCESS;
+ }
+
+ status = apr_thread_mutex_lock(m->lock);
+ *pacquired = (status == APR_SUCCESS);
+ if (*pacquired) {
+ apr_threadkey_private_set(m->lock, thread_lock);
+ }
+ return status;
+}
+
+static void leave_mutex(h2_mplx *m, int acquired)
+{
+ if (acquired) {
+ apr_threadkey_private_set(NULL, thread_lock);
+ apr_thread_mutex_unlock(m->lock);
+ }
+}
+
static int is_aborted(h2_mplx *m, apr_status_t *pstatus)
{
AP_DEBUG_ASSERT(m);
@@ -101,14 +144,6 @@ static void h2_mplx_destroy(h2_mplx *m)
"h2_mplx(%ld): destroy, ios=%d",
m->id, (int)h2_io_set_size(m->stream_ios));
m->aborted = 1;
- if (m->ready_ios) {
- h2_io_set_destroy(m->ready_ios);
- m->ready_ios = NULL;
- }
- if (m->stream_ios) {
- h2_io_set_destroy(m->stream_ios);
- m->stream_ios = NULL;
- }
check_tx_free(m);
@@ -129,7 +164,8 @@ static void h2_mplx_destroy(h2_mplx *m)
* than protecting a shared h2_session one with an own lock.
*/
h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent,
- const h2_config *conf,
+ const h2_config *conf,
+ apr_interval_time_t stream_timeout,
h2_workers *workers)
{
apr_status_t status = APR_SUCCESS;
@@ -151,6 +187,7 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr
if (!m->pool) {
return NULL;
}
+ apr_pool_tag(m->pool, "h2_mplx");
apr_allocator_owner_set(allocator, m->pool);
status = apr_thread_mutex_create(&m->lock, APR_THREAD_MUTEX_DEFAULT,
@@ -160,16 +197,26 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr
return NULL;
}
- m->q = h2_tq_create(m->pool, h2_config_geti(conf, H2_CONF_MAX_STREAMS));
+ status = apr_thread_cond_create(&m->task_done, m->pool);
+ if (status != APR_SUCCESS) {
+ h2_mplx_destroy(m);
+ return NULL;
+ }
+
+ m->q = h2_iq_create(m->pool, h2_config_geti(conf, H2_CONF_MAX_STREAMS));
m->stream_ios = h2_io_set_create(m->pool);
m->ready_ios = h2_io_set_create(m->pool);
m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
+ m->stream_timeout = stream_timeout;
m->workers = workers;
+ m->workers_max = h2_config_geti(conf, H2_CONF_MAX_WORKERS);
+ m->workers_def_limit = 4;
+ m->workers_limit = m->workers_def_limit;
+ m->last_limit_change = m->last_idle_block = apr_time_now();
+ m->limit_change_interval = apr_time_from_msec(200);
m->tx_handles_reserved = 0;
m->tx_chunk_size = 4;
-
- m->stream_timeout_secs = h2_config_geti(conf, H2_CONF_STREAM_TIMEOUT_SECS);
}
return m;
}
@@ -177,10 +224,11 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr
int h2_mplx_get_max_stream_started(h2_mplx *m)
{
int stream_id = 0;
+ int acquired;
- apr_thread_mutex_lock(m->lock);
+ enter_mutex(m, &acquired);
stream_id = m->max_stream_started;
- apr_thread_mutex_unlock(m->lock);
+ leave_mutex(m, acquired);
return stream_id;
}
@@ -198,6 +246,7 @@ static void workers_register(h2_mplx *m)
* Therefore: ref counting for h2_workers in not needed, ref counting
* for h2_worker using this is critical.
*/
+ m->need_registration = 0;
h2_workers_register(m->workers, m);
}
@@ -231,7 +280,9 @@ static void io_destroy(h2_mplx *m, h2_io
h2_io_set_remove(m->stream_ios, io);
h2_io_set_remove(m->ready_ios, io);
- h2_io_destroy(io);
+ if (m->redo_ios) {
+ h2_io_set_remove(m->redo_ios, io);
+ }
if (pool) {
apr_pool_clear(pool);
@@ -250,7 +301,7 @@ static int io_stream_done(h2_mplx *m, h2
h2_io_set_remove(m->ready_ios, io);
if (!io->worker_started || io->worker_done) {
/* already finished or not even started yet */
- h2_tq_remove(m->q, io->id);
+ h2_iq_remove(m->q, io->id);
io_destroy(m, io, 1);
return 0;
}
@@ -266,27 +317,64 @@ static int stream_done_iter(void *ctx, h
return io_stream_done((h2_mplx*)ctx, io, 0);
}
+static int stream_print(void *ctx, h2_io *io)
+{
+ h2_mplx *m = ctx;
+ if (io && io->request) {
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
+ "->03198: h2_stream(%ld-%d): %s %s %s -> %s %d"
+ "[orph=%d/started=%d/done=%d/eos_in=%d/eos_out=%d]",
+ m->id, io->id,
+ io->request->method, io->request->authority, io->request->path,
+ io->response? "http" : (io->rst_error? "reset" : "?"),
+ io->response? io->response->http_status : io->rst_error,
+ io->orphaned, io->worker_started, io->worker_done,
+ io->eos_in, io->eos_out);
+ }
+ else if (io) {
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
+ "->03198: h2_stream(%ld-%d): NULL -> %s %d"
+ "[orph=%d/started=%d/done=%d/eos_in=%d/eos_out=%d]",
+ m->id, io->id,
+ io->response? "http" : (io->rst_error? "reset" : "?"),
+ io->response? io->response->http_status : io->rst_error,
+ io->orphaned, io->worker_started, io->worker_done,
+ io->eos_in, io->eos_out);
+ }
+ else {
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
+ "->03198: h2_stream(%ld-NULL): NULL", m->id);
+ }
+ return 1;
+}
+
apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
{
apr_status_t status;
-
+ int acquired;
+
h2_workers_unregister(m->workers, m);
- status = apr_thread_mutex_lock(m->lock);
- if (APR_SUCCESS == status) {
+
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
int i, wait_secs = 5;
/* disable WINDOW_UPDATE callbacks */
h2_mplx_set_consumed_cb(m, NULL, NULL);
+ h2_iq_clear(m->q);
+ apr_thread_cond_broadcast(m->task_done);
while (!h2_io_set_iter(m->stream_ios, stream_done_iter, m)) {
/* iterate until all ios have been orphaned or destroyed */
}
- /* Any remaining ios have handed out requests to workers that are
- * not done yet. Any operation they do on their assigned stream ios will
- * be errored ECONNRESET/ABORTED, so that should find out pretty soon.
+ /* If we still have busy workers, we cannot release our memory
+ * pool yet, as slave connections have child pools of their respective
+ * h2_io's.
+ * Any remaining ios are processed in these workers. Any operation
+ * they do on their input/outputs will be errored ECONNRESET/ABORTED,
+ * so processing them should fail and workers *should* return.
*/
- for (i = 0; h2_io_set_size(m->stream_ios) > 0; ++i) {
+ for (i = 0; m->workers_busy > 0; ++i) {
m->join_wait = wait;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%ld): release_join, waiting on %d worker to report back",
@@ -302,14 +390,20 @@ apr_status_t h2_mplx_release_and_join(h2
*/
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03198)
"h2_mplx(%ld): release, waiting for %d seconds now for "
- "all h2_workers to return, have still %d requests outstanding",
- m->id, i*wait_secs, (int)h2_io_set_size(m->stream_ios));
+ "%d h2_workers to return, have still %d requests outstanding",
+ m->id, i*wait_secs, m->workers_busy,
+ (int)h2_io_set_size(m->stream_ios));
+ if (i == 1) {
+ h2_io_set_iter(m->stream_ios, stream_print, m);
+ }
}
+ m->aborted = 1;
+ apr_thread_cond_broadcast(m->task_done);
}
}
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
"h2_mplx(%ld): release_join -> destroy", m->id);
- apr_thread_mutex_unlock(m->lock);
+ leave_mutex(m, acquired);
h2_mplx_destroy(m);
/* all gone */
}
@@ -319,88 +413,43 @@ apr_status_t h2_mplx_release_and_join(h2
void h2_mplx_abort(h2_mplx *m)
{
apr_status_t status;
+ int acquired;
AP_DEBUG_ASSERT(m);
if (!m->aborted) {
- status = apr_thread_mutex_lock(m->lock);
- if (APR_SUCCESS == status) {
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
m->aborted = 1;
- apr_thread_mutex_unlock(m->lock);
+ leave_mutex(m, acquired);
}
}
}
apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error)
{
- apr_status_t status;
+ apr_status_t status = APR_SUCCESS;
+ int acquired;
+ /* This may be called from inside callbacks that already hold the lock.
+ * E.g. when we are streaming out DATA and the EOF triggers the stream
+ * release.
+ */
AP_DEBUG_ASSERT(m);
- status = apr_thread_mutex_lock(m->lock);
- if (APR_SUCCESS == status) {
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
/* there should be an h2_io, once the stream has been scheduled
* for processing, e.g. when we received all HEADERs. But when
* a stream is cancelled very early, it will not exist. */
if (io) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld-%d): marking stream as done.",
+ m->id, stream_id);
io_stream_done(m, io, rst_error);
}
-
- apr_thread_mutex_unlock(m->lock);
- }
- return status;
-}
-static const h2_request *pop_request(h2_mplx *m)
-{
- const h2_request *req = NULL;
- int sid;
- while (!m->aborted && !req && (sid = h2_tq_shift(m->q)) > 0) {
- h2_io *io = h2_io_set_get(m->stream_ios, sid);
- if (io) {
- req = io->request;
- io->worker_started = 1;
- if (sid > m->max_stream_started) {
- m->max_stream_started = sid;
- }
- }
- }
- return req;
-}
-
-void h2_mplx_request_done(h2_mplx **pm, int stream_id, const h2_request **preq)
-{
- h2_mplx *m = *pm;
-
- apr_status_t status = apr_thread_mutex_lock(m->lock);
- if (APR_SUCCESS == status) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_mplx(%ld): request(%d) done", m->id, stream_id);
- if (io) {
- io->worker_done = 1;
- if (io->orphaned) {
- io_destroy(m, io, 0);
- if (m->join_wait) {
- apr_thread_cond_signal(m->join_wait);
- }
- }
- else {
- /* hang around until the stream deregisteres */
- }
- }
-
- if (preq) {
- /* someone wants another request, if we have */
- *preq = pop_request(m);
- }
- if (!preq || !*preq) {
- /* No request to hand back to the worker, NULLify reference
- * and decrement count */
- *pm = NULL;
- }
- apr_thread_mutex_unlock(m->lock);
+ leave_mutex(m, acquired);
}
+ return status;
}
apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block,
@@ -409,14 +458,15 @@ apr_status_t h2_mplx_in_read(h2_mplx *m,
struct apr_thread_cond_t *iowait)
{
apr_status_t status;
+ int acquired;
+
AP_DEBUG_ASSERT(m);
- status = apr_thread_mutex_lock(m->lock);
- if (APR_SUCCESS == status) {
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
if (io && !io->orphaned) {
H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_read_pre");
- h2_io_signal_init(io, H2_IO_READ, m->stream_timeout_secs, iowait);
+ h2_io_signal_init(io, H2_IO_READ, m->stream_timeout, iowait);
status = h2_io_in_read(io, bb, -1, trailers);
while (APR_STATUS_IS_EAGAIN(status)
&& !is_aborted(m, &status)
@@ -435,7 +485,7 @@ apr_status_t h2_mplx_in_read(h2_mplx *m,
else {
status = APR_EOF;
}
- apr_thread_mutex_unlock(m->lock);
+ leave_mutex(m, acquired);
}
return status;
}
@@ -444,9 +494,10 @@ apr_status_t h2_mplx_in_write(h2_mplx *m
apr_bucket_brigade *bb)
{
apr_status_t status;
+ int acquired;
+
AP_DEBUG_ASSERT(m);
- status = apr_thread_mutex_lock(m->lock);
- if (APR_SUCCESS == status) {
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
if (io && !io->orphaned) {
H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_write_pre");
@@ -458,7 +509,7 @@ apr_status_t h2_mplx_in_write(h2_mplx *m
else {
status = APR_ECONNABORTED;
}
- apr_thread_mutex_unlock(m->lock);
+ leave_mutex(m, acquired);
}
return status;
}
@@ -466,9 +517,10 @@ apr_status_t h2_mplx_in_write(h2_mplx *m
apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id)
{
apr_status_t status;
+ int acquired;
+
AP_DEBUG_ASSERT(m);
- status = apr_thread_mutex_lock(m->lock);
- if (APR_SUCCESS == status) {
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
if (io && !io->orphaned) {
status = h2_io_in_close(io);
@@ -479,7 +531,7 @@ apr_status_t h2_mplx_in_close(h2_mplx *m
else {
status = APR_ECONNABORTED;
}
- apr_thread_mutex_unlock(m->lock);
+ leave_mutex(m, acquired);
}
return status;
}
@@ -507,12 +559,13 @@ void h2_mplx_set_consumed_cb(h2_mplx *m,
apr_status_t h2_mplx_in_update_windows(h2_mplx *m)
{
apr_status_t status;
+ int acquired;
+
AP_DEBUG_ASSERT(m);
if (m->aborted) {
return APR_ECONNABORTED;
}
- status = apr_thread_mutex_lock(m->lock);
- if (APR_SUCCESS == status) {
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
update_ctx ctx;
ctx.m = m;
@@ -524,7 +577,7 @@ apr_status_t h2_mplx_in_update_windows(h
if (ctx.streams_updated) {
status = APR_SUCCESS;
}
- apr_thread_mutex_unlock(m->lock);
+ leave_mutex(m, acquired);
}
return status;
}
@@ -535,9 +588,10 @@ apr_status_t h2_mplx_out_readx(h2_mplx *
apr_table_t **ptrailers)
{
apr_status_t status;
+ int acquired;
+
AP_DEBUG_ASSERT(m);
- status = apr_thread_mutex_lock(m->lock);
- if (APR_SUCCESS == status) {
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
if (io && !io->orphaned) {
H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_readx_pre");
@@ -553,7 +607,7 @@ apr_status_t h2_mplx_out_readx(h2_mplx *
}
*ptrailers = (*peos && io->response)? io->response->trailers : NULL;
- apr_thread_mutex_unlock(m->lock);
+ leave_mutex(m, acquired);
}
return status;
}
@@ -564,9 +618,10 @@ apr_status_t h2_mplx_out_read_to(h2_mplx
apr_table_t **ptrailers)
{
apr_status_t status;
+ int acquired;
+
AP_DEBUG_ASSERT(m);
- status = apr_thread_mutex_lock(m->lock);
- if (APR_SUCCESS == status) {
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
if (io && !io->orphaned) {
H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_read_to_pre");
@@ -582,23 +637,24 @@ apr_status_t h2_mplx_out_read_to(h2_mplx
status = APR_ECONNABORTED;
}
*ptrailers = (*peos && io->response)? io->response->trailers : NULL;
- apr_thread_mutex_unlock(m->lock);
+ leave_mutex(m, acquired);
}
return status;
}
-h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_stream_set *streams)
+h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams)
{
apr_status_t status;
h2_stream *stream = NULL;
+ int acquired;
AP_DEBUG_ASSERT(m);
- status = apr_thread_mutex_lock(m->lock);
- if (APR_SUCCESS == status) {
- h2_io *io = h2_io_set_pop_highest_prio(m->ready_ios);
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+ h2_io *io = h2_io_set_shift(m->ready_ios);
if (io && !m->aborted) {
- stream = h2_stream_set_get(streams, io->id);
+ stream = h2_ihash_get(streams, io->id);
if (stream) {
+ io->submitted = 1;
if (io->rst_error) {
h2_stream_rst(stream, io->rst_error);
}
@@ -614,7 +670,7 @@ h2_stream *h2_mplx_next_submit(h2_mplx *
* reset by the client. Should no longer happen since such
* streams should clear io's from the ready queue.
*/
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03347)
"h2_mplx(%ld): stream for response %d closed, "
"resetting io to close request processing",
m->id, io->id);
@@ -633,7 +689,7 @@ h2_stream *h2_mplx_next_submit(h2_mplx *
h2_io_signal(io, H2_IO_WRITE);
}
- apr_thread_mutex_unlock(m->lock);
+ leave_mutex(m, acquired);
}
return stream;
}
@@ -657,7 +713,7 @@ static apr_status_t out_write(h2_mplx *m
&m->tx_handles_reserved);
/* Wait for data to drain until there is room again or
* stream timeout expires */
- h2_io_signal_init(io, H2_IO_WRITE, m->stream_timeout_secs, iowait);
+ h2_io_signal_init(io, H2_IO_WRITE, m->stream_timeout, iowait);
while (status == APR_SUCCESS
&& !APR_BRIGADE_EMPTY(bb)
&& iowait
@@ -716,9 +772,10 @@ apr_status_t h2_mplx_out_open(h2_mplx *m
struct apr_thread_cond_t *iowait)
{
apr_status_t status;
+ int acquired;
+
AP_DEBUG_ASSERT(m);
- status = apr_thread_mutex_lock(m->lock);
- if (APR_SUCCESS == status) {
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
if (m->aborted) {
status = APR_ECONNABORTED;
}
@@ -728,7 +785,7 @@ apr_status_t h2_mplx_out_open(h2_mplx *m
h2_util_bb_log(m->c, stream_id, APLOG_TRACE1, "h2_mplx_out_open", bb);
}
}
- apr_thread_mutex_unlock(m->lock);
+ leave_mutex(m, acquired);
}
return status;
}
@@ -739,13 +796,14 @@ apr_status_t h2_mplx_out_write(h2_mplx *
struct apr_thread_cond_t *iowait)
{
apr_status_t status;
+ int acquired;
+
AP_DEBUG_ASSERT(m);
- status = apr_thread_mutex_lock(m->lock);
- if (APR_SUCCESS == status) {
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
if (io && !io->orphaned) {
status = out_write(m, io, f, bb, trailers, iowait);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
"h2_mplx(%ld-%d): write with trailers=%s",
m->id, io->id, trailers? "yes" : "no");
H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_write");
@@ -755,7 +813,7 @@ apr_status_t h2_mplx_out_write(h2_mplx *
else {
status = APR_ECONNABORTED;
}
- apr_thread_mutex_unlock(m->lock);
+ leave_mutex(m, acquired);
}
return status;
}
@@ -763,9 +821,10 @@ apr_status_t h2_mplx_out_write(h2_mplx *
apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id, apr_table_t *trailers)
{
apr_status_t status;
+ int acquired;
+
AP_DEBUG_ASSERT(m);
- status = apr_thread_mutex_lock(m->lock);
- if (APR_SUCCESS == status) {
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
if (io && !io->orphaned) {
if (!io->response && !io->rst_error) {
@@ -791,7 +850,7 @@ apr_status_t h2_mplx_out_close(h2_mplx *
else {
status = APR_ECONNABORTED;
}
- apr_thread_mutex_unlock(m->lock);
+ leave_mutex(m, acquired);
}
return status;
}
@@ -799,9 +858,10 @@ apr_status_t h2_mplx_out_close(h2_mplx *
apr_status_t h2_mplx_out_rst(h2_mplx *m, int stream_id, int error)
{
apr_status_t status;
+ int acquired;
+
AP_DEBUG_ASSERT(m);
- status = apr_thread_mutex_lock(m->lock);
- if (APR_SUCCESS == status) {
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
if (io && !io->rst_error && !io->orphaned) {
h2_io_rst(io, error);
@@ -816,7 +876,7 @@ apr_status_t h2_mplx_out_rst(h2_mplx *m,
else {
status = APR_ECONNABORTED;
}
- apr_thread_mutex_unlock(m->lock);
+ leave_mutex(m, acquired);
}
return status;
}
@@ -824,10 +884,11 @@ apr_status_t h2_mplx_out_rst(h2_mplx *m,
int h2_mplx_in_has_eos_for(h2_mplx *m, int stream_id)
{
int has_eos = 0;
+ int acquired;
+
apr_status_t status;
AP_DEBUG_ASSERT(m);
- status = apr_thread_mutex_lock(m->lock);
- if (APR_SUCCESS == status) {
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
if (io && !io->orphaned) {
has_eos = h2_io_in_has_eos_for(io);
@@ -835,18 +896,39 @@ int h2_mplx_in_has_eos_for(h2_mplx *m, i
else {
has_eos = 1;
}
- apr_thread_mutex_unlock(m->lock);
+ leave_mutex(m, acquired);
}
return has_eos;
}
+int h2_mplx_in_has_data_for(h2_mplx *m, int stream_id)
+{
+ apr_status_t status;
+ int has_data = 0;
+ int acquired;
+
+ AP_DEBUG_ASSERT(m);
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+ h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+ if (io && !io->orphaned) {
+ has_data = h2_io_in_has_data(io);
+ }
+ else {
+ has_data = 0;
+ }
+ leave_mutex(m, acquired);
+ }
+ return has_data;
+}
+
int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id)
{
apr_status_t status;
int has_data = 0;
+ int acquired;
+
AP_DEBUG_ASSERT(m);
- status = apr_thread_mutex_lock(m->lock);
- if (APR_SUCCESS == status) {
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
if (io && !io->orphaned) {
has_data = h2_io_out_has_data(io);
@@ -854,7 +936,7 @@ int h2_mplx_out_has_data_for(h2_mplx *m,
else {
has_data = 0;
}
- apr_thread_mutex_unlock(m->lock);
+ leave_mutex(m, acquired);
}
return has_data;
}
@@ -863,9 +945,10 @@ apr_status_t h2_mplx_out_trywait(h2_mplx
apr_thread_cond_t *iowait)
{
apr_status_t status;
+ int acquired;
+
AP_DEBUG_ASSERT(m);
- status = apr_thread_mutex_lock(m->lock);
- if (APR_SUCCESS == status) {
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
if (m->aborted) {
status = APR_ECONNABORTED;
}
@@ -879,7 +962,7 @@ apr_status_t h2_mplx_out_trywait(h2_mplx
}
m->added_output = NULL;
}
- apr_thread_mutex_unlock(m->lock);
+ leave_mutex(m, acquired);
}
return status;
}
@@ -896,37 +979,38 @@ static void have_out_data_for(h2_mplx *m
apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx)
{
apr_status_t status;
+ int acquired;
AP_DEBUG_ASSERT(m);
- status = apr_thread_mutex_lock(m->lock);
- if (APR_SUCCESS == status) {
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
if (m->aborted) {
status = APR_ECONNABORTED;
}
else {
- h2_tq_sort(m->q, cmp, ctx);
+ h2_iq_sort(m->q, cmp, ctx);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%ld): reprioritize tasks", m->id);
}
- apr_thread_mutex_unlock(m->lock);
+ leave_mutex(m, acquired);
}
return status;
}
-static h2_io *open_io(h2_mplx *m, int stream_id)
+static h2_io *open_io(h2_mplx *m, int stream_id, const h2_request *request)
{
apr_pool_t *io_pool = m->spare_pool;
h2_io *io;
if (!io_pool) {
apr_pool_create(&io_pool, m->pool);
+ apr_pool_tag(io_pool, "h2_io");
}
else {
m->spare_pool = NULL;
}
- io = h2_io_create(stream_id, io_pool);
+ io = h2_io_create(stream_id, io_pool, request);
h2_io_set_add(m->stream_ios, io);
return io;
@@ -937,55 +1021,615 @@ apr_status_t h2_mplx_process(h2_mplx *m,
h2_stream_pri_cmp *cmp, void *ctx)
{
apr_status_t status;
- int was_empty = 0;
+ int do_registration = 0;
+ int acquired;
AP_DEBUG_ASSERT(m);
- status = apr_thread_mutex_lock(m->lock);
- if (APR_SUCCESS == status) {
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
if (m->aborted) {
status = APR_ECONNABORTED;
}
else {
- h2_io *io = open_io(m, stream_id);
- io->request = req;
+ h2_io *io = open_io(m, stream_id, req);
if (!io->request->body) {
status = h2_io_in_close(io);
}
- was_empty = h2_tq_empty(m->q);
- h2_tq_add(m->q, io->id, cmp, ctx);
+ m->need_registration = m->need_registration || h2_iq_empty(m->q);
+ do_registration = (m->need_registration && m->workers_busy < m->workers_max);
+ h2_iq_add(m->q, io->id, cmp, ctx);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
"h2_mplx(%ld-%d): process", m->c->id, stream_id);
H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_process");
}
- apr_thread_mutex_unlock(m->lock);
+ leave_mutex(m, acquired);
}
- if (status == APR_SUCCESS && was_empty) {
+ if (status == APR_SUCCESS && do_registration) {
workers_register(m);
}
return status;
}
-const h2_request *h2_mplx_pop_request(h2_mplx *m, int *has_more)
+/*
+ * Take stream ids off m->q until a task could be created for one of them.
+ * The loop also ends when the mplx is aborted, when m->workers_busy has
+ * reached m->workers_limit or when h2_iq_shift() yields no id > 0.
+ * Ids whose io is orphaned get that io destroyed; ids without an io in
+ * m->stream_ios are simply dropped.
+ * Returns the new task or NULL. Both callers in this file invoke it after
+ * enter_mutex() succeeded.
+ */
+static h2_task *pop_task(h2_mplx *m)
{
- const h2_request *req = NULL;
+ h2_task *task = NULL;
+ int sid;
+ while (!m->aborted && !task
+ && (m->workers_busy < m->workers_limit)
+ && (sid = h2_iq_shift(m->q)) > 0) {
+ h2_io *io = h2_io_set_get(m->stream_ios, sid);
+ if (io && io->orphaned) {
+ /* nobody is interested in this stream any more, do not start it */
+ io_destroy(m, io, 0);
+ if (m->join_wait) {
+ apr_thread_cond_signal(m->join_wait);
+ }
+ }
+ else if (io) {
+ /* the spare allocator is passed on to the slave connection and
+ * forgotten here */
+ conn_rec *slave = h2_slave_create(m->c, m->pool, m->spare_allocator);
+ m->spare_allocator = NULL;
+ task = h2_task_create(m->id, io->request, slave, m);
+ io->worker_started = 1;
+ io->started_at = apr_time_now();
+ if (sid > m->max_stream_started) {
+ m->max_stream_started = sid;
+ }
+ /* one more worker is occupied by this mplx; decremented again in
+ * h2_mplx_task_done() */
+ ++m->workers_busy;
+ }
+ }
+ return task;
+}
+
+/*
+ * Hand out the next task of this mplx, if there is one.
+ * *has_more is set to 0 when the mplx is aborted, otherwise to whether m->q
+ * still holds entries after the pop. It is not written at all when
+ * enter_mutex() fails, so callers should initialize it.
+ * Returns the task or NULL.
+ */
+h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more)
+{
+ h2_task *task = NULL;
apr_status_t status;
+ int acquired;
AP_DEBUG_ASSERT(m);
- status = apr_thread_mutex_lock(m->lock);
- if (APR_SUCCESS == status) {
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
if (m->aborted) {
- req = NULL;
*has_more = 0;
}
else {
- req = pop_request(m);
- *has_more = !h2_tq_empty(m->q);
+ task = pop_task(m);
+ *has_more = !h2_iq_empty(m->q);
+ }
+
+ /* NOTE(review): this tests the pointer 'has_more', not '*has_more'. The
+ * pointer was dereferenced above, so the test is always true here and
+ * need_registration is set whenever no task is returned. Confirm whether
+ * '*has_more' was intended before changing it. */
+ if (has_more && !task) {
+ m->need_registration = 1;
+ }
+ leave_mutex(m, acquired);
+ }
+ return task;
+}
+
+/*
+ * Bookkeeping for a task that stopped running. A NULL task is ignored.
+ * - A frozen task is only thawed; nothing else happens for it.
+ * - Otherwise the stream output is closed and the slave connection is
+ * destroyed. The io of the stream, if still present, is then either reset
+ * and queued again (when it is listed in m->redo_ios and not orphaned) or
+ * marked as done. An orphaned io is destroyed. Finally m->task_done is
+ * broadcast.
+ * All callers in this file invoke it after enter_mutex() succeeded.
+ */
+static void task_done(h2_mplx *m, h2_task *task)
+{
+ if (task) {
+ if (task->frozen) {
+ /* this task was handed over to an engine for processing */
+ h2_task_thaw(task);
+ /* TODO: can we signal an engine that it can now start on this? */
+ }
+ else {
+ h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id);
+
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%ld): task(%s) done", m->id, task->id);
+ /* clean our references and report request as done. Signal
+ * that we want another unless we have been aborted */
+ /* TODO: this will keep a worker attached to this h2_mplx as
+ * long as it has requests to handle. Might not be fair to
+ * other mplx's. Perhaps leave after n requests? */
+ h2_mplx_out_close(m, task->stream_id, NULL);
+ /* at most one spare allocator is kept: drop the old one before
+ * h2_slave_destroy() stores into m->spare_allocator */
+ if (m->spare_allocator) {
+ apr_allocator_destroy(m->spare_allocator);
+ m->spare_allocator = NULL;
+ }
+ h2_slave_destroy(task->c, &m->spare_allocator);
+ /* the task must not be touched from here on */
+ task = NULL;
+ if (io) {
+ apr_time_t now = apr_time_now();
+ if (!io->orphaned && m->redo_ios
+ && h2_io_set_get(m->redo_ios, io->id)) {
+ /* reset and schedule again */
+ h2_io_redo(io);
+ h2_io_set_remove(m->redo_ios, io);
+ h2_iq_add(m->q, io->id, NULL, NULL);
+ }
+ else {
+ io->worker_done = 1;
+ io->done_at = now;
+ /* apr_time_t counts microseconds, hence the division by 1000.0 for ms */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): request(%d) done, %f ms"
+ " elapsed", m->id, io->id,
+ (io->done_at - io->started_at) / 1000.0);
+ if (io->started_at > m->last_idle_block) {
+ /* this task finished without causing an 'idle block', e.g.
+ * a block by flow control.
+ */
+ if (now - m->last_limit_change >= m->limit_change_interval
+ && m->workers_limit < m->workers_max) {
+ /* Well behaving stream, allow it more workers */
+ m->workers_limit = H2MIN(m->workers_limit * 2,
+ m->workers_max);
+ m->last_limit_change = now;
+ m->need_registration = 1;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): increase worker limit to %d",
+ m->id, m->workers_limit);
+ }
+ }
+ }
+
+ if (io->orphaned) {
+ io_destroy(m, io, 0);
+ if (m->join_wait) {
+ apr_thread_cond_signal(m->join_wait);
+ }
+ }
+ else {
+ /* hang around until the stream deregisters */
+ }
+ }
+ /* wakes engine_pull(), which waits on m->task_done */
+ apr_thread_cond_broadcast(m->task_done);
+ }
+ }
+}
+
+/**
+ * Report a task as finished and, optionally, fetch the next one.
+ *
+ * @param m     the mplx the task belongs to
+ * @param task  the task that has finished running
+ * @param ptask if not NULL, receives the next task to run or NULL when
+ *              there is none. It is always written, also when the mplx
+ *              mutex cannot be entered, so that a caller passing the
+ *              address of its current task pointer never sees the finished
+ *              task again.
+ */
+void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask)
+{
+    h2_task *next = NULL;
+    int acquired;
+
+    if (enter_mutex(m, &acquired) == APR_SUCCESS) {
+        task_done(m, task);
+        --m->workers_busy;
+        if (ptask) {
+            /* caller wants another task */
+            next = pop_task(m);
+        }
+        leave_mutex(m, acquired);
+    }
+    if (ptask) {
+        *ptask = next;
+    }
+}
+
+/*******************************************************************************
+ * h2_mplx DoS protection
+ ******************************************************************************/
+
+/* Context handed to the h2_io_set_iter() callbacks of the DoS protection. */
+typedef struct {
+ h2_mplx *m; /* in: the mplx whose ios are inspected */
+ h2_io *io; /* out: the io picked by the callback, NULL if none */
+ apr_time_t now; /* in: reference time, only read by timed_out_busy_iter() */
+} io_iter_ctx;
+
+/* h2_io_set_iter() callback: remembers in the context the most recently
+ * started io that occupies a worker, is repeatable and is not yet listed
+ * in redo_ios. Always returns 1 so that every io gets visited.
+ */
+static int latest_repeatable_busy_unsubmitted_iter(void *data, h2_io *io)
+{
+    io_iter_ctx *search = data;
+
+    if (!io->worker_started || io->worker_done) {
+        /* does not occupy a worker */
+        return 1;
+    }
+    if (!h2_io_is_repeatable(io)) {
+        return 1;
+    }
+    if (h2_io_set_get(search->m->redo_ios, io->id)) {
+        /* already selected for a redo */
+        return 1;
+    }
+    /* candidate: keep it when it is the first or was started later */
+    if (!search->io || search->io->started_at < io->started_at) {
+        search->io = io;
+    }
+    return 1;
+}
+
+/* Search all ios of the mplx for the latest started one that may be
+ * cancelled and repeated later. Returns NULL if there is none.
+ */
+static h2_io *get_latest_repeatable_busy_unsubmitted_io(h2_mplx *m)
+{
+    io_iter_ctx search;
+
+    search.io = NULL;
+    search.m = m;
+    h2_io_set_iter(m->stream_ios, latest_repeatable_busy_unsubmitted_iter,
+                   &search);
+    return search.io;
+}
+
+/* h2_io_set_iter() callback: stops the iteration (returns 0) at the first
+ * io that occupies a worker for longer than the stream timeout and stores
+ * it in the context. Returns 1 to continue otherwise.
+ */
+static int timed_out_busy_iter(void *data, h2_io *io)
+{
+    io_iter_ctx *search = data;
+
+    if (!io->worker_started || io->worker_done) {
+        return 1;
+    }
+    if ((search->now - io->started_at) <= search->m->stream_timeout) {
+        return 1;
+    }
+    /* timed out stream occupying a worker, found */
+    search->io = io;
+    return 0;
+}
+/* Return an io that has been occupying a worker for longer than
+ * m->stream_timeout, measured against the current time, or NULL.
+ */
+static h2_io *get_timed_out_busy_stream(h2_mplx *m)
+{
+    io_iter_ctx search;
+
+    search.now = apr_time_now();
+    search.io = NULL;
+    search.m = m;
+    h2_io_set_iter(m->stream_ios, timed_out_busy_iter, &search);
+    return search.io;
+}
+
+/*
+ * Try to get the number of workers occupied by this mplx down to
+ * m->workers_limit by cancelling repeatable requests. The cancelled ios are
+ * remembered in m->redo_ios, from where task_done() schedules them again.
+ *
+ * Returns APR_TIMEUP when, after that, still more workers are busy than
+ * allowed and one of the busy streams has exceeded the stream timeout.
+ * Returns APR_SUCCESS otherwise.
+ */
+static apr_status_t unschedule_slow_ios(h2_mplx *m)
+{
+    h2_io *io;
+    int excess;
+
+    if (!m->redo_ios) {
+        m->redo_ios = h2_io_set_create(m->pool);
+    }
+    /* Try to get rid of streams that occupy workers. Look for safe requests
+     * that are repeatable. If none found, fail the connection.
+     *
+     * The result of h2_io_set_size() is stored in an apr_size_t elsewhere in
+     * this file. It is converted to int here so that the worker arithmetic
+     * and the comparison below stay signed and cannot wrap around.
+     */
+    excess = m->workers_busy - m->workers_limit
+             - (int)h2_io_set_size(m->redo_ios);
+    while (excess > 0 && (io = get_latest_repeatable_busy_unsubmitted_io(m))) {
+        h2_io_set_add(m->redo_ios, io);
+        h2_io_rst(io, H2_ERR_CANCEL);
+        --excess;
+    }
+
+    if ((m->workers_busy - (int)h2_io_set_size(m->redo_ios))
+        > m->workers_limit) {
+        io = get_timed_out_busy_stream(m);
+        if (io) {
+            /* Too many busy workers, unable to cancel enough streams
+             * and with a busy, timed out stream, we tell the client
+             * to go away... */
+            return APR_TIMEUP;
+        }
+    }
+    return APR_SUCCESS;
+}
+
+/* Called when the master connection has entered idle mode. Lowers the worker
+ * limit of this mplx when workers are still busy and, if more workers are
+ * busy than allowed, tries to unschedule streams.
+ * Returns the status of unschedule_slow_ios() in that case, APR_SUCCESS
+ * otherwise (also when the mutex could not be entered).
+ */
+apr_status_t h2_mplx_idle(h2_mplx *m)
+{
+    /* the worker limit is lowered to the next smaller of these values */
+    static const int limit_steps[] = { 16, 8, 4, 2 };
+    apr_status_t rv = APR_SUCCESS;
+    apr_size_t stream_count;
+    apr_size_t i;
+    int acquired;
+
+    if (enter_mutex(m, &acquired) == APR_SUCCESS) {
+        stream_count = h2_io_set_size(m->stream_ios);
+        if (stream_count > 0 && m->workers_busy) {
+            /* The connection is in state 'IDLE': all streams are ready
+             * to send data out, but lack WINDOW_UPDATEs.
+             *
+             * That is fine as long as no stream still occupies an h2
+             * worker. Worker threads are a scarce resource and we must
+             * take measures not to get DoSed.
+             *
+             * We call this an 'idle block'. Limit the number of busy
+             * workers allowed for this connection until it behaves well
+             * again.
+             */
+            apr_time_t now = apr_time_now();
+
+            m->last_idle_block = now;
+            if (m->workers_limit > 2
+                && now - m->last_limit_change >= m->limit_change_interval) {
+                for (i = 0; i < sizeof(limit_steps)/sizeof(limit_steps[0]);
+                     ++i) {
+                    if (m->workers_limit > limit_steps[i]) {
+                        m->workers_limit = limit_steps[i];
+                        break;
+                    }
+                }
+                m->last_limit_change = now;
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                              "h2_mplx(%ld): decrease worker limit to %d",
+                              m->id, m->workers_limit);
+            }
+
+            if (m->workers_busy > m->workers_limit) {
+                rv = unschedule_slow_ios(m);
+            }
+        }
+        leave_mutex(m, acquired);
+    }
+    return rv;
+}
+
+/*******************************************************************************
+ * HTTP/2 request engines
+ ******************************************************************************/
+
+/* One request queued at an engine; an element of the engine's 'entries' ring. */
+typedef struct h2_req_entry h2_req_entry;
+struct h2_req_entry {
+ APR_RING_ENTRY(h2_req_entry) link; /* ring linkage, used by the macros below */
+ request_rec *r; /* the queued request */
+};
+
+/* Navigation and removal for a single entry, in terms of the APR ring macros. */
+#define H2_REQ_ENTRY_NEXT(e) APR_RING_NEXT((e), link)
+#define H2_REQ_ENTRY_PREV(e) APR_RING_PREV((e), link)
+#define H2_REQ_ENTRY_REMOVE(e) APR_RING_REMOVE((e), link)
+
+/* Internal representation of a request engine. 'pub' is the first member;
+ * the functions in this file cast the public h2_req_engine pointer back to
+ * this type. */
+typedef struct h2_req_engine_i h2_req_engine_i;
+struct h2_req_engine_i {
+ h2_req_engine pub;
+ conn_rec *c; /* connection this engine is assigned to */
+ h2_mplx *m;
+ unsigned int shutdown : 1; /* engine is being shut down */
+ apr_thread_cond_t *io; /* condition var for waiting on data */
+ APR_RING_HEAD(h2_req_entries, h2_req_entry) entries;
+ apr_size_t no_assigned; /* # of assigned requests */
+ apr_size_t no_live; /* # of live */
+ apr_size_t no_finished; /* # of finished */
+};
+
+/* Access to the ring of queued requests ('entries' above). */
+#define H2_REQ_ENTRIES_SENTINEL(b) APR_RING_SENTINEL((b), h2_req_entry, link)
+#define H2_REQ_ENTRIES_EMPTY(b) APR_RING_EMPTY((b), h2_req_entry, link)
+#define H2_REQ_ENTRIES_FIRST(b) APR_RING_FIRST(b)
+#define H2_REQ_ENTRIES_LAST(b) APR_RING_LAST(b)
+
+/* The insert macros copy 'e' into a local first, so it is evaluated once. */
+#define H2_REQ_ENTRIES_INSERT_HEAD(b, e) do { \
+h2_req_entry *ap__b = (e); \
+APR_RING_INSERT_HEAD((b), ap__b, h2_req_entry, link); \
+} while (0)
+
+#define H2_REQ_ENTRIES_INSERT_TAIL(b, e) do { \
+h2_req_entry *ap__b = (e); \
+APR_RING_INSERT_TAIL((b), ap__b, h2_req_entry, link); \
+} while (0)
+
+/* Append the request to the end of the engine's queue. The queue entry is
+ * allocated from the request pool. Always returns APR_SUCCESS; 'm' is not
+ * used.
+ */
+static apr_status_t h2_mplx_engine_schedule(h2_mplx *m,
+                                            h2_req_engine_i *engine,
+                                            request_rec *r)
+{
+    h2_req_entry *node = apr_pcalloc(r->pool, sizeof(*node));
+
+    node->r = r;
+    APR_RING_ELEM_INIT(node, link);
+    H2_REQ_ENTRIES_INSERT_TAIL(&engine->entries, node);
+    return APR_SUCCESS;
+}
+
+
+/*
+ * Offer request r to the engine of its mplx, or start a new engine.
+ *
+ * Returns
+ * - APR_ECONNABORTED when r has no task or the io of its stream is gone or
+ * orphaned,
+ * - the status of enter_mutex() when the mutex could not be entered,
+ * - APR_SUCCESS when r was queued at the existing engine (its task is frozen
+ * then),
+ * - the result of einit() when a new engine was set up for r,
+ * - APR_EOF in all other cases.
+ */
+apr_status_t h2_mplx_engine_push(const char *engine_type,
+ request_rec *r, h2_mplx_engine_init *einit)
+{
+ apr_status_t status;
+ h2_mplx *m;
+ h2_task *task;
+ int acquired;
+
+ task = h2_ctx_rget_task(r);
+ if (!task) {
+ return APR_ECONNABORTED;
+ }
+ m = task->mplx;
+ AP_DEBUG_ASSERT(m);
+
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+ h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id);
+ if (!io || io->orphaned) {
+ status = APR_ECONNABORTED;
+ }
+ else {
+ h2_req_engine_i *engine = (h2_req_engine_i*)m->engine;
+
+ apr_table_set(r->connection->notes, H2_TASK_ID_NOTE, task->id);
+ /* the answer unless one of the branches below succeeds */
+ status = APR_EOF;
+
+ if (task->ser_headers) {
+ /* Max compatibility, deny processing of this */
+ /* NOTE(review): with no engine installed and einit given, the
+ * block further down still creates a new engine for such a
+ * task. Confirm that this is intended. */
+ }
+ else if (engine && !strcmp(engine->pub.type, engine_type)) {
+ if (engine->shutdown
+ || engine->no_assigned >= H2MIN(engine->pub.capacity, 100)) {
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,
+ "h2_mplx(%ld): engine shutdown or over %s",
+ m->c->id, engine->pub.id);
+ /* forget it locally, a new engine may be started below */
+ engine = NULL;
+ }
+ else if (h2_mplx_engine_schedule(m, engine, r) == APR_SUCCESS) {
+ /* this task will be processed in another thread,
+ * freeze any I/O for the time being. */
+ h2_task_freeze(task, r);
+ engine->no_assigned++;
+ status = APR_SUCCESS;
+ ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r,
+ "h2_mplx(%ld): push request %s",
+ m->c->id, r->the_request);
+ }
+ else {
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,
+ "h2_mplx(%ld): engine error adding req %s",
+ m->c->id, engine->pub.id);
+ engine = NULL;
+ }
+ }
+
+ /* An installed engine of a different type leaves 'engine' set, so no
+ * new engine is created in that case. */
+ if (!engine && einit) {
+ /* the new engine lives in the pool of the task's connection */
+ engine = apr_pcalloc(task->c->pool, sizeof(*engine));
+ engine->pub.id = apr_psprintf(task->c->pool, "eng-%ld-%d",
+ m->id, m->next_eng_id++);
+ engine->pub.pool = task->c->pool;
+ engine->pub.type = apr_pstrdup(task->c->pool, engine_type);
+ engine->pub.window_bits = 30;
+ engine->pub.req_window_bits = h2_log2(m->stream_max_mem);
+ engine->c = r->connection;
+ APR_RING_INIT(&engine->entries, h2_req_entry, link);
+ engine->m = m;
+ engine->io = task->io;
+ /* r itself counts as the first assigned and live request */
+ engine->no_assigned = 1;
+ engine->no_live = 1;
+
+ status = einit(&engine->pub, r);
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,
+ "h2_mplx(%ld): init engine %s (%s)",
+ m->c->id, engine->pub.id, engine->pub.type);
+ /* only a successfully initialized engine replaces m->engine */
+ if (status == APR_SUCCESS) {
+ m->engine = &engine->pub;
+ }
+ }
+ }
+
+ leave_mutex(m, acquired);
+ }
+ return status;
+}
+
+/* Remove and return the first queued entry whose task is not frozen, or
+ * NULL when every queued task is frozen or the queue is empty. */
+static h2_req_entry *pop_non_frozen(h2_req_engine_i *engine)
+{
+ h2_req_entry *entry;
+ h2_task *task;
+
+ for (entry = H2_REQ_ENTRIES_FIRST(&engine->entries);
+ entry != H2_REQ_ENTRIES_SENTINEL(&engine->entries);
+ entry = H2_REQ_ENTRY_NEXT(entry)) {
+ task = h2_ctx_rget_task(entry->r);
+ AP_DEBUG_ASSERT(task);
+ if (!task->frozen) {
+ /* safe although we iterate: we return right after the removal */
+ H2_REQ_ENTRY_REMOVE(entry);
+ return entry;
}
- apr_thread_mutex_unlock(m->lock);
}
- return req;
+ return NULL;
}
+/*
+ * Get the next request the engine may work on.
+ * Returns
+ * - APR_SUCCESS with *pr set when a queued request with a non-frozen task
+ * was found,
+ * - APR_EAGAIN when there is none and block is APR_NONBLOCK_READ,
+ * - APR_EOF when the mplx is aborted or, for blocking reads, when the queue
+ * is empty; the engine is marked as shut down in the latter case.
+ * With queued but still frozen requests a blocking read waits on
+ * m->task_done, in steps of 100 ms, and tries again.
+ * *pr is set to NULL whenever no request is returned.
+ * The caller in this file has entered the mplx mutex; m->lock is handed to
+ * the condition wait.
+ */
+static apr_status_t engine_pull(h2_mplx *m, h2_req_engine_i *engine,
+ apr_read_type_e block, request_rec **pr)
+{
+ h2_req_entry *entry;
+
+ AP_DEBUG_ASSERT(m);
+ AP_DEBUG_ASSERT(engine);
+ while (1) {
+ if (m->aborted) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%ld): mplx abort while pulling requests %s",
+ m->id, engine->pub.id);
+ *pr = NULL;
+ return APR_EOF;
+ }
+
+ if (!H2_REQ_ENTRIES_EMPTY(&engine->entries)
+ && (entry = pop_non_frozen(engine))) {
+ ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, entry->r,
+ "h2_mplx(%ld): request %s pulled by engine %s",
+ m->c->id, entry->r->the_request, engine->pub.id);
+ engine->no_live++;
+ /* the request is from now on processed by the engine's thread */
+ entry->r->connection->current_thread = engine->c->current_thread;
+ *pr = entry->r;
+ return APR_SUCCESS;
+ }
+ else if (APR_NONBLOCK_READ == block) {
+ *pr = NULL;
+ return APR_EAGAIN;
+ }
+ else if (H2_REQ_ENTRIES_EMPTY(&engine->entries)) {
+ engine->shutdown = 1;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): emtpy queue, shutdown engine %s",
+ m->id, engine->pub.id);
+ *pr = NULL;
+ return APR_EOF;
+ }
+ /* only frozen requests are queued: wait for task_done() to broadcast
+ * or for the timeout, then look again */
+ apr_thread_cond_timedwait(m->task_done, m->lock,
+ apr_time_from_msec(100));
+ }
+}
+
+/* Public entry point for an engine to fetch its next request: enters the
+ * mplx mutex and delegates to engine_pull(). *pr is NULL unless a request
+ * is returned. Returns the status of enter_mutex() if that fails.
+ */
+apr_status_t h2_mplx_engine_pull(h2_req_engine *pub_engine,
+                                 apr_read_type_e block, request_rec **pr)
+{
+    h2_req_engine_i *eng = (h2_req_engine_i*)pub_engine;
+    h2_mplx *mplx = eng->m;
+    apr_status_t rv;
+    int acquired;
+
+    *pr = NULL;
+    rv = enter_mutex(mplx, &acquired);
+    if (rv == APR_SUCCESS) {
+        rv = engine_pull(mplx, eng, block, pr);
+        leave_mutex(mplx, acquired);
+    }
+    return rv;
+}
+
+/* Account for a task the engine is finished with: closes the task output,
+ * updates the engine counters and, unless the task runs on the engine's
+ * own connection, reports the task as done to the mplx.
+ */
+static void engine_done(h2_mplx *m, h2_req_engine_i *engine, h2_task *task,
+                        int waslive, int aborted)
+{
+    const char *outcome = aborted? "aborted":"done";
+    int acquired;
+
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
+                  "h2_mplx(%ld): task %s %s by %s",
+                  m->id, task->id, outcome, engine->pub.id);
+    h2_task_output_close(task->output);
+    engine->no_finished++;
+    if (waslive) {
+        engine->no_live--;
+    }
+    engine->no_assigned--;
+
+    if (task->c == engine->c) {
+        /* do not release what the engine runs on */
+        return;
+    }
+    if (enter_mutex(m, &acquired) == APR_SUCCESS) {
+        task_done(m, task);
+        leave_mutex(m, acquired);
+    }
+}
+
+/* Called by an engine when it has finished the request running on r_conn.
+ * Nothing happens when r_conn carries no task or the mplx mutex cannot be
+ * entered.
+ */
+void h2_mplx_engine_done(h2_req_engine *pub_engine, conn_rec *r_conn)
+{
+    h2_req_engine_i *eng = (h2_req_engine_i*)pub_engine;
+    h2_mplx *mplx = eng->m;
+    h2_task *finished = h2_ctx_cget_task(r_conn);
+    int acquired;
+
+    if (!finished) {
+        return;
+    }
+    if (enter_mutex(mplx, &acquired) != APR_SUCCESS) {
+        return;
+    }
+    /* the request was live and ended regularly */
+    engine_done(mplx, eng, finished, 1, 0);
+    leave_mutex(mplx, acquired);
+}
+
+/*
+ * Called when an engine stops working. Unless the mplx is aborted, requests
+ * still queued at the engine are logged and reported as aborted via
+ * engine_done(); the entries themselves stay in the ring. If the engine is
+ * the one installed at the mplx, m->engine is cleared.
+ * Does nothing when the mplx mutex cannot be entered.
+ */
+void h2_mplx_engine_exit(h2_req_engine *pub_engine)
+{
+ h2_req_engine_i *engine = (h2_req_engine_i*)pub_engine;
+ h2_mplx *m = engine->m;
+ int acquired;
+
+ if (enter_mutex(m, &acquired) == APR_SUCCESS) {
+ if (!m->aborted
+ && !H2_REQ_ENTRIES_EMPTY(&engine->entries)) {
+ h2_req_entry *entry;
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
+ "h2_mplx(%ld): exit engine %s (%s), "
+ "has still requests queued, shutdown=%d,"
+ "assigned=%ld, live=%ld, finished=%ld",
+ m->c->id, engine->pub.id, engine->pub.type,
+ engine->shutdown,
+ (long)engine->no_assigned, (long)engine->no_live,
+ (long)engine->no_finished);
+ for (entry = H2_REQ_ENTRIES_FIRST(&engine->entries);
+ entry != H2_REQ_ENTRIES_SENTINEL(&engine->entries);
+ entry = H2_REQ_ENTRY_NEXT(entry)) {
+ request_rec *r = entry->r;
+ h2_task *task = h2_ctx_rget_task(r);
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
+ "h2_mplx(%ld): engine %s has queued task %s, "
+ "frozen=%d, aborting",
+ m->c->id, engine->pub.id, task->id, task->frozen);
+ /* queued requests were never pulled, so they do not count as live */
+ engine_done(m, engine, task, 0, 1);
+ }
+ }
+ /* h2_mplx_engine_push() starts both counters at 1 for the request that
+ * created the engine; anything above that is reported as a warning */
+ if (!m->aborted && (engine->no_assigned > 1 || engine->no_live > 1)) {
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
+ "h2_mplx(%ld): exit engine %s (%s), "
+ "assigned=%ld, live=%ld, finished=%ld",
+ m->c->id, engine->pub.id, engine->pub.type,
+ (long)engine->no_assigned, (long)engine->no_live,
+ (long)engine->no_finished);
+ }
+ else {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): exit engine %s (%s)",
+ m->c->id, engine->pub.id, engine->pub.type);
+ }
+ if (m->engine == &engine->pub) {
+ m->engine = NULL; /* TODO */
+ }
+ leave_mutex(m, acquired);
+ }
+}
Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.h
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.h?rev=1733259&r1=1733258&r2=1733259&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.h (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_mplx.h Wed Mar 2 11:21:28 2016
@@ -38,6 +38,7 @@ struct apr_pool_t;
struct apr_thread_mutex_t;
struct apr_thread_cond_t;
struct h2_config;
+struct h2_ihash_t;
struct h2_response;
struct h2_task;
struct h2_stream;
@@ -45,9 +46,10 @@ struct h2_request;
struct h2_io_set;
struct apr_thread_cond_t;
struct h2_workers;
-struct h2_stream_set;
-struct h2_task_queue;
+struct h2_int_queue;
+struct h2_req_engine;
+#include <apr_queue.h>
#include "h2_io.h"
typedef struct h2_mplx h2_mplx;
@@ -66,27 +68,45 @@ struct h2_mplx {
apr_pool_t *pool;
unsigned int aborted : 1;
+ unsigned int need_registration : 1;
- struct h2_task_queue *q;
+ struct h2_int_queue *q;
struct h2_io_set *stream_ios;
struct h2_io_set *ready_ios;
+ struct h2_io_set *redo_ios;
int max_stream_started; /* highest stream id that started processing */
+ int workers_busy; /* # of workers processing on this mplx */
+ int workers_limit; /* current # of workers limit, dynamic */
+ int workers_def_limit; /* default # of workers limit */
+ int workers_max; /* max, hard limit # of workers in a process */
+ apr_time_t last_idle_block; /* last time, this mplx entered IDLE while
+ * streams were ready */
+ apr_time_t last_limit_change;/* last time, worker limit changed */
+ apr_interval_time_t limit_change_interval;
apr_thread_mutex_t *lock;
struct apr_thread_cond_t *added_output;
+ struct apr_thread_cond_t *task_done;
struct apr_thread_cond_t *join_wait;
apr_size_t stream_max_mem;
- int stream_timeout_secs;
+ apr_interval_time_t stream_timeout;
apr_pool_t *spare_pool; /* spare pool, ready for next io */
+ apr_allocator_t *spare_allocator;
+
struct h2_workers *workers;
apr_size_t tx_handles_reserved;
apr_size_t tx_chunk_size;
h2_mplx_consumed_cb *input_consumed;
void *input_consumed_ctx;
+
+ struct h2_req_engine *engine;
+ /* TODO: signal for waiting tasks*/
+ apr_queue_t *engine_queue;
+ int next_eng_id;
};
@@ -95,12 +115,15 @@ struct h2_mplx {
* Object lifecycle and information.
******************************************************************************/
+apr_status_t h2_mplx_child_init(apr_pool_t *pool, server_rec *s);
+
/**
* Create the multiplexer for the given HTTP2 session.
* Implicitly has reference count 1.
*/
h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *master,
const struct h2_config *conf,
+ apr_interval_time_t stream_timeout,
struct h2_workers *workers);
/**
@@ -119,7 +142,9 @@ apr_status_t h2_mplx_release_and_join(h2
*/
void h2_mplx_abort(h2_mplx *mplx);
-void h2_mplx_request_done(h2_mplx **pm, int stream_id, const struct h2_request **preq);
+struct h2_task *h2_mplx_pop_task(h2_mplx *mplx, int *has_more);
+
+void h2_mplx_task_done(h2_mplx *m, struct h2_task *task, struct h2_task **ptask);
/**
* Get the highest stream identifier that has been passed on to processing.
@@ -143,10 +168,14 @@ int h2_mplx_get_max_stream_started(h2_mp
*/
apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error);
-/* Return != 0 iff the multiplexer has data for the given stream.
+/* Return != 0 iff the multiplexer has output data for the given stream.
*/
int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id);
+/* Return != 0 iff the multiplexer has input data for the given stream.
+ */
+int h2_mplx_in_has_data_for(h2_mplx *m, int stream_id);
+
/**
* Waits on output data from any stream in this session to become available.
* Returns APR_TIMEUP if no data arrived in the given time.
@@ -179,8 +208,6 @@ apr_status_t h2_mplx_process(h2_mplx *m,
*/
apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx);
-const struct h2_request *h2_mplx_pop_request(h2_mplx *mplx, int *has_more);
-
/**
* Register a callback for the amount of input data consumed per stream. The
* will only ever be invoked from the thread creating this h2_mplx, e.g. when
@@ -248,7 +275,7 @@ apr_status_t h2_mplx_in_update_windows(h
* @param bb the brigade to place any existing repsonse body data into
*/
struct h2_stream *h2_mplx_next_submit(h2_mplx *m,
- struct h2_stream_set *streams);
+ struct h2_ihash_t *streams);
/**
* Reads output data from the given stream. Will never block, but
@@ -369,5 +396,32 @@ APR_RING_INSERT_TAIL((b), ap__b, h2_mplx
*/
#define H2_MPLX_REMOVE(e) APR_RING_REMOVE((e), link)
+/*******************************************************************************
+ * h2_mplx DoS protection
+ ******************************************************************************/
+
+/**
+ * Master connection has entered idle mode.
+ * @param m the mplx instance of the master connection
+ * @return != SUCCESS iff connection should be terminated
+ */
+apr_status_t h2_mplx_idle(h2_mplx *m);
+
+/*******************************************************************************
+ * h2_mplx h2_req_engine handling.
+ ******************************************************************************/
+
+/**
+ * Callback that initializes a newly created engine for the request that
+ * triggered its creation. The engine is only installed at the mplx when
+ * the callback returns APR_SUCCESS.
+ */
+typedef apr_status_t h2_mplx_engine_init(struct h2_req_engine *engine,
+ request_rec *r);
+
+/**
+ * Offer a request to the engine of its mplx or start a new engine for it.
+ * @param engine_type the type of engine wanted
+ * @param r the request to push
+ * @param einit callback to initialize a new engine, may be NULL
+ * @return APR_SUCCESS when the request was queued at an engine or einit
+ * succeeded, APR_ECONNABORTED when the stream is gone, APR_EOF
+ * or the status of einit otherwise
+ */
+apr_status_t h2_mplx_engine_push(const char *engine_type,
+ request_rec *r, h2_mplx_engine_init *einit);
+
+/**
+ * Get the next request for the engine to work on.
+ * @param engine the engine asking
+ * @param block APR_NONBLOCK_READ to return APR_EAGAIN instead of waiting
+ * @param pr receives the request, NULL if none is returned
+ * @return APR_SUCCESS with a request in *pr, APR_EAGAIN when none is
+ * available right now, APR_EOF when the mplx is aborted or the
+ * queue of the engine is empty
+ */
+apr_status_t h2_mplx_engine_pull(struct h2_req_engine *engine,
+ apr_read_type_e block, request_rec **pr);
+
+/**
+ * The engine has finished processing the request on the given connection.
+ */
+void h2_mplx_engine_done(struct h2_req_engine *engine, conn_rec *r_conn);
+
+/**
+ * The engine stops working. Requests still queued at it are aborted unless
+ * the mplx itself is already aborted.
+ */
+void h2_mplx_engine_exit(struct h2_req_engine *engine);
#endif /* defined(__mod_h2__h2_mplx__) */
Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_private.h
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_private.h?rev=1733259&r1=1733258&r2=1733259&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_private.h (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_private.h Wed Mar 2 11:21:28 2016
@@ -16,26 +16,12 @@
#ifndef mod_h2_h2_private_h
#define mod_h2_h2_private_h
+#include <apr_time.h>
+
#include <nghttp2/nghttp2.h>
extern module AP_MODULE_DECLARE_DATA http2_module;
APLOG_USE_MODULE(http2);
-
-#define H2_HEADER_METHOD ":method"
-#define H2_HEADER_METHOD_LEN 7
-#define H2_HEADER_SCHEME ":scheme"
-#define H2_HEADER_SCHEME_LEN 7
-#define H2_HEADER_AUTH ":authority"
-#define H2_HEADER_AUTH_LEN 10
-#define H2_HEADER_PATH ":path"
-#define H2_HEADER_PATH_LEN 5
-#define H2_CRLF "\r\n"
-
-#define H2_ALEN(a) (sizeof(a)/sizeof((a)[0]))
-
-#define H2MAX(x,y) ((x) > (y) ? (x) : (y))
-#define H2MIN(x,y) ((x) < (y) ? (x) : (y))
-
#endif
Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_push.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_push.c?rev=1733259&r1=1733258&r2=1733259&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_push.c (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_push.c Wed Mar 2 11:21:28 2016
@@ -276,20 +276,49 @@ static int same_authority(const h2_reque
return 1;
}
-static int set_header(void *ctx, const char *key, const char *value)
+/* apr_table_do() callback: copies a header of the original request into the
+ * table given as ctx, but only if it is one of the names listed below.
+ * Always returns 1 so that the iteration continues. */
+static int set_push_header(void *ctx, const char *key, const char *value)
{
- apr_table_setn(ctx, key, value);
+ size_t klen = strlen(key);
+ if (H2_HD_MATCH_LIT("User-Agent", key, klen)
+ || H2_HD_MATCH_LIT("Accept", key, klen)
+ || H2_HD_MATCH_LIT("Accept-Encoding", key, klen)
+ || H2_HD_MATCH_LIT("Accept-Language", key, klen)
+ || H2_HD_MATCH_LIT("Cache-Control", key, klen)) {
+ /* setn: key and value are referenced, not copied */
+ apr_table_setn(ctx, key, value);
+ }
return 1;
}
+/* Return 1 if the link carries the given parameter, 0 otherwise. */
+static int has_param(link_ctx *ctx, const char *param)
+{
+    return apr_table_get(ctx->params, param) != NULL;
+}
+
+/*
+ * Check whether the "rel" parameter of the link lists the given relation.
+ *
+ * The parameter value is treated as a list of relation types separated by
+ * single spaces. Every occurrence of rel inside the value is examined, not
+ * only the first one, so that a value like "xpreload preload" is recognized
+ * as containing "preload".
+ *
+ * @param ctx the link being parsed
+ * @param rel the relation type to look for
+ * @return 1 if rel is one of the listed relation types, 0 otherwise
+ */
+static int has_relation(link_ctx *ctx, const char *rel)
+{
+    const char *val = apr_table_get(ctx->params, "rel");
+    const char *s;
+    size_t rlen;
+
+    if (!val) {
+        return 0;
+    }
+    if (!strcmp(rel, val)) {
+        return 1;
+    }
+    rlen = strlen(rel);
+    s = val;
+    while ((s = ap_strstr_c(s, rel)) != NULL) {
+        /* a hit only counts when it is delimited on both sides */
+        if (s == val || s[-1] == ' ') {
+            const char *end = s + rlen;
+            if (!*end || *end == ' ') {
+                return 1;
+            }
+        }
+        if (!*s) {
+            /* only reachable with an empty rel: never step past the end */
+            break;
+        }
+        ++s;
+    }
+    return 0;
+}
static int add_push(link_ctx *ctx)
{
/* so, we have read a Link header and need to decide
* if we transform it into a push.
*/
- const char *rel = apr_table_get(ctx->params, "rel");
- if (rel && !strcmp("preload", rel)) {
+ if (has_relation(ctx, "preload") && !has_param(ctx, "nopush")) {
apr_uri_t uri;
if (apr_uri_parse(ctx->pool, ctx->link, &uri) == APR_SUCCESS) {
if (uri.path && same_authority(ctx->req, &uri)) {
@@ -306,9 +335,7 @@ static int add_push(link_ctx *ctx)
* TLS (if any) parameters.
*/
path = apr_uri_unparse(ctx->pool, &uri, APR_URI_UNP_OMITSITEPART);
-
push = apr_pcalloc(ctx->pool, sizeof(*push));
-
switch (ctx->req->push_policy) {
case H2_PUSH_HEAD:
method = "HEAD";
@@ -318,15 +345,10 @@ static int add_push(link_ctx *ctx)
break;
}
headers = apr_table_make(ctx->pool, 5);
- apr_table_do(set_header, headers, ctx->req->headers,
- "User-Agent",
- "Cache-Control",
- "Accept-Language",
- NULL);
- req = h2_request_createn(0, ctx->pool, ctx->req->config,
- method, ctx->req->scheme,
- ctx->req->authority,
- path, headers);
+ apr_table_do(set_push_header, headers, ctx->req->headers, NULL);
+ req = h2_request_createn(0, ctx->pool, method, ctx->req->scheme,
+ ctx->req->authority, path, headers,
+ ctx->req->serialize);
/* atm, we do not push on pushes */
h2_request_end_headers(req, ctx->pool, 1, 0);
push->req = req;
@@ -434,38 +456,30 @@ apr_array_header_t *h2_push_collect(apr_
return NULL;
}
-void h2_push_policy_determine(struct h2_request *req, apr_pool_t *p, int push_enabled)
-{
- h2_push_policy policy = H2_PUSH_NONE;
- if (push_enabled) {
- const char *val = apr_table_get(req->headers, "accept-push-policy");
- if (val) {
- if (ap_find_token(p, val, "fast-load")) {
- policy = H2_PUSH_FAST_LOAD;
- }
- else if (ap_find_token(p, val, "head")) {
- policy = H2_PUSH_HEAD;
- }
- else if (ap_find_token(p, val, "default")) {
- policy = H2_PUSH_DEFAULT;
- }
- else if (ap_find_token(p, val, "none")) {
- policy = H2_PUSH_NONE;
- }
- else {
- /* nothing known found in this header, go by default */
- policy = H2_PUSH_DEFAULT;
- }
- }
- else {
- policy = H2_PUSH_DEFAULT;
- }
- }
- req->push_policy = policy;
-}
-
/*******************************************************************************
* push diary
+ *
+ * - The push diary keeps track of resources already PUSHed via HTTP/2 on this
+ * connection. It records a hash value from the absolute URL of the resource
+ * pushed.
+ * - Lacking openssl, it uses 'apr_hashfunc_default' for the value
+ * - with openssl, it uses SHA256 to calculate the hash value
+ * - whatever the method to generate the hash, the diary keeps a maximum of 64
+ * bits per hash, limiting the memory consumption to about
+ * H2PushDiarySize * 8
+ * bytes. Entries are sorted by most recently used and oldest entries are
+ * forgotten first.
+ * - Clients can initialize/replace the push diary by sending a 'Cache-Digest'
+ * header. Currently, this is the base64url encoded value of the cache digest
+ * as specified in https://datatracker.ietf.org/doc/draft-kazuho-h2-cache-digest/
+ * This draft can be expected to evolve and the definition of the header
+ * will be added there and refined.
+ * - The cache digest header is a Golomb Coded Set of hash values, but it may
+ * limit the amount of bits per hash value even further. For a good description
+ * of GCS, read here:
+ * http://giovanni.bajo.it/post/47119962313/golomb-coded-sets-smaller-than-bloom-filters
+ * - This means that the push diary might be initialized with hash values of much
+ * less than 64 bits, leading to more false positives, but smaller digest size.
******************************************************************************/
@@ -688,36 +702,6 @@ apr_array_header_t *h2_push_collect_upda
return h2_push_diary_update(stream->session, pushes);
}
-/* h2_log2(n) iff n is a power of 2 */
-static unsigned char h2_log2(apr_uint32_t n)
-{
- int lz = 0;
- if (!n) {
- return 0;
- }
- if (!(n & 0xffff0000u)) {
- lz += 16;
- n = (n << 16);
- }
- if (!(n & 0xff000000u)) {
- lz += 8;
- n = (n << 8);
- }
- if (!(n & 0xf0000000u)) {
- lz += 4;
- n = (n << 4);
- }
- if (!(n & 0xc0000000u)) {
- lz += 2;
- n = (n << 2);
- }
- if (!(n & 0x80000000u)) {
- lz += 1;
- }
-
- return 31 - lz;
-}
-
static apr_int32_t h2_log2inv(unsigned char log2)
{
return log2? (1 << log2) : 1;
@@ -794,8 +778,8 @@ static apr_status_t gset_encode_next(gse
/* Intentional no APLOGNO */
ap_log_perror(APLOG_MARK, GCSLOG_LEVEL, 0, encoder->pool,
"h2_push_diary_enc: val=%"APR_UINT64_T_HEX_FMT", delta=%"
- APR_UINT64_T_HEX_FMT" flex_bits=%ld, "
- "fixed_bits=%d, fixed_val=%"APR_UINT64_T_HEX_FMT,
+ APR_UINT64_T_HEX_FMT" flex_bits=%"APR_UINT64_T_FMT", "
+ ", fixed_bits=%d, fixed_val=%"APR_UINT64_T_HEX_FMT,
pval, delta, flex_bits, encoder->fixed_bits, delta&encoder->fixed_mask);
for (; flex_bits != 0; --flex_bits) {
status = gset_encode_bit(encoder, 1);
Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_push.h
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_push.h?rev=1733259&r1=1733258&r2=1733259&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_push.h (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_push.h Wed Mar 2 11:21:28 2016
@@ -15,19 +15,14 @@
#ifndef __mod_h2__h2_push__
#define __mod_h2__h2_push__
+#include "h2.h"
+
struct h2_request;
struct h2_response;
struct h2_ngheader;
struct h2_session;
struct h2_stream;
-typedef enum {
- H2_PUSH_NONE,
- H2_PUSH_DEFAULT,
- H2_PUSH_HEAD,
- H2_PUSH_FAST_LOAD,
-} h2_push_policy;
-
typedef struct h2_push {
const struct h2_request *req;
} h2_push;
@@ -66,17 +61,6 @@ apr_array_header_t *h2_push_collect(apr_
const struct h2_response *res);
/**
- * Set the push policy for the given request. Takes request headers into
- * account, see draft https://tools.ietf.org/html/draft-ruellan-http-accept-push-policy-00
- * for details.
- *
- * @param req the request to determine the policy for
- * @param p the pool to use
- * @param push_enabled if HTTP/2 server push is generally enabled for this request
- */
-void h2_push_policy_determine(struct h2_request *req, apr_pool_t *p, int push_enabled);
-
-/**
* Create a new push diary for the given maximum number of entries.
*
* @param p the pool to use
Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_request.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_request.c?rev=1733259&r1=1733258&r2=1733259&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_request.c (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_request.c Wed Mar 2 11:21:28 2016
@@ -30,45 +30,36 @@
#include <scoreboard.h>
#include "h2_private.h"
-#include "h2_config.h"
-#include "h2_mplx.h"
#include "h2_push.h"
#include "h2_request.h"
-#include "h2_task.h"
#include "h2_util.h"
-h2_request *h2_request_create(int id, apr_pool_t *pool,
- const struct h2_config *config)
+h2_request *h2_request_create(int id, apr_pool_t *pool, int serialize)
{
- return h2_request_createn(id, pool, config,
- NULL, NULL, NULL, NULL, NULL);
+ return h2_request_createn(id, pool, NULL, NULL, NULL, NULL, NULL,
+ serialize);
}
h2_request *h2_request_createn(int id, apr_pool_t *pool,
- const struct h2_config *config,
const char *method, const char *scheme,
const char *authority, const char *path,
- apr_table_t *header)
+ apr_table_t *header, int serialize)
{
h2_request *req = apr_pcalloc(pool, sizeof(h2_request));
req->id = id;
- req->config = config;
req->method = method;
req->scheme = scheme;
req->authority = authority;
req->path = path;
req->headers = header? header : apr_table_make(pool, 10);
req->request_time = apr_time_now();
-
+ req->serialize = serialize;
+
return req;
}
-void h2_request_destroy(h2_request *req)
-{
-}
-
static apr_status_t inspect_clen(h2_request *req, const char *s)
{
char *end;
@@ -139,38 +130,48 @@ static apr_status_t add_all_h1_header(h2
}
+apr_status_t h2_request_make(h2_request *req, apr_pool_t *pool,
+ const char *method, const char *scheme,
+ const char *authority, const char *path,
+ apr_table_t *headers)
+{
+ req->method = method;
+ req->scheme = scheme;
+ req->authority = authority;
+ req->path = path;
+
+ AP_DEBUG_ASSERT(req->scheme);
+ AP_DEBUG_ASSERT(req->authority);
+ AP_DEBUG_ASSERT(req->path);
+ AP_DEBUG_ASSERT(req->method);
+
+ return add_all_h1_header(req, pool, headers);
+}
+
apr_status_t h2_request_rwrite(h2_request *req, request_rec *r)
{
apr_status_t status;
+ const char *scheme, *authority;
- req->config = h2_config_rget(r);
- req->method = r->method;
- req->scheme = (r->parsed_uri.scheme? r->parsed_uri.scheme
- : ap_http_scheme(r));
- req->authority = r->hostname;
- req->path = apr_uri_unparse(r->pool, &r->parsed_uri,
- APR_URI_UNP_OMITSITEPART);
-
- if (!ap_strchr_c(req->authority, ':') && r->server && r->server->port) {
- apr_port_t defport = apr_uri_port_of_scheme(req->scheme);
+ scheme = (r->parsed_uri.scheme? r->parsed_uri.scheme
+ : ap_http_scheme(r));
+ authority = r->hostname;
+ if (!ap_strchr_c(authority, ':') && r->server && r->server->port) {
+ apr_port_t defport = apr_uri_port_of_scheme(scheme);
if (defport != r->server->port) {
/* port info missing and port is not default for scheme: append */
- req->authority = apr_psprintf(r->pool, "%s:%d", req->authority,
- (int)r->server->port);
+ authority = apr_psprintf(r->pool, "%s:%d", authority,
+ (int)r->server->port);
}
}
- AP_DEBUG_ASSERT(req->scheme);
- AP_DEBUG_ASSERT(req->authority);
- AP_DEBUG_ASSERT(req->path);
- AP_DEBUG_ASSERT(req->method);
-
- status = add_all_h1_header(req, r->pool, r->headers_in);
-
+ status = h2_request_make(req, r->pool, r->method, scheme, authority,
+ apr_uri_unparse(r->pool, &r->parsed_uri,
+ APR_URI_UNP_OMITSITEPART),
+ r->headers_in);
ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r, APLOGNO(03058)
"h2_request(%d): rwrite %s host=%s://%s%s",
req->id, req->method, req->scheme, req->authority, req->path);
-
return status;
}
@@ -337,11 +338,22 @@ void h2_request_copy(apr_pool_t *p, h2_r
dst->authority = OPT_COPY(p, src->authority);
dst->path = OPT_COPY(p, src->path);
dst->headers = apr_table_clone(p, src->headers);
+ if (src->trailers) {
+ dst->trailers = apr_table_clone(p, src->trailers);
+ }
dst->content_length = src->content_length;
dst->chunked = src->chunked;
dst->eoh = src->eoh;
}
+h2_request *h2_request_clone(apr_pool_t *p, const h2_request *src)
+{
+ h2_request *nreq = apr_pcalloc(p, sizeof(*nreq));
+ memcpy(nreq, src, sizeof(*nreq));
+ h2_request_copy(p, nreq, src);
+ return nreq;
+}
+
request_rec *h2_request_create_rec(const h2_request *req, conn_rec *conn)
{
request_rec *r;
@@ -361,7 +373,7 @@ request_rec *h2_request_create_rec(const
r->allowed_methods = ap_make_method_list(p, 2);
- r->headers_in = apr_table_copy(r->pool, req->headers);
+ r->headers_in = apr_table_clone(r->pool, req->headers);
r->trailers_in = apr_table_make(r->pool, 5);
r->subprocess_env = apr_table_make(r->pool, 25);
r->headers_out = apr_table_make(r->pool, 12);
@@ -408,7 +420,7 @@ request_rec *h2_request_create_rec(const
}
ap_parse_uri(r, req->path);
- r->protocol = (char*)"HTTP/2";
+ r->protocol = "HTTP/2";
r->proto_num = HTTP_VERSION(2, 0);
r->the_request = apr_psprintf(r->pool, "%s %s %s",
Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_request.h
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_request.h?rev=1733259&r1=1733258&r2=1733259&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_request.h (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_request.h Wed Mar 2 11:21:28 2016
@@ -16,48 +16,19 @@
#ifndef __mod_h2__h2_request__
#define __mod_h2__h2_request__
-/* h2_request is the transformer of HTTP2 streams into HTTP/1.1 internal
- * format that will be fed to various httpd input filters to finally
- * become a request_rec to be handled by soemone.
- */
-struct h2_config;
-struct h2_to_h1;
-struct h2_mplx;
-struct h2_task;
-
-typedef struct h2_request h2_request;
-
-struct h2_request {
- int id; /* stream id */
-
- const char *method; /* pseudo header values, see ch. 8.1.2.3 */
- const char *scheme;
- const char *authority;
- const char *path;
-
- apr_table_t *headers;
- apr_table_t *trailers;
-
- apr_time_t request_time;
- apr_off_t content_length;
-
- unsigned int chunked : 1; /* iff requst body needs to be forwarded as chunked */
- unsigned int eoh : 1; /* iff end-of-headers has been seen and request is complete */
- unsigned int body : 1; /* iff this request has a body */
- unsigned int push_policy; /* which push policy to use for this request */
- const struct h2_config *config;
-};
+#include "h2.h"
-h2_request *h2_request_create(int id, apr_pool_t *pool,
- const struct h2_config *config);
+h2_request *h2_request_create(int id, apr_pool_t *pool, int serialize);
h2_request *h2_request_createn(int id, apr_pool_t *pool,
- const struct h2_config *config,
const char *method, const char *scheme,
const char *authority, const char *path,
- apr_table_t *headers);
+ apr_table_t *headers, int serialize);
-void h2_request_destroy(h2_request *req);
+apr_status_t h2_request_make(h2_request *req, apr_pool_t *pool,
+ const char *method, const char *scheme,
+ const char *authority, const char *path,
+ apr_table_t *headers);
apr_status_t h2_request_rwrite(h2_request *req, request_rec *r);
@@ -74,6 +45,8 @@ apr_status_t h2_request_end_headers(h2_r
void h2_request_copy(apr_pool_t *p, h2_request *dst, const h2_request *src);
+h2_request *h2_request_clone(apr_pool_t *p, const h2_request *src);
+
/**
* Create a request_rec representing the h2_request to be
* processed on the given connection.