You are viewing a plain text version of this content. The canonical link for it is here.
Posted to cvs@httpd.apache.org by ic...@apache.org on 2016/03/16 15:01:53 UTC
svn commit: r1735230 - in /httpd/httpd/trunk: ./ modules/http2/
Author: icing
Date: Wed Mar 16 14:01:53 2016
New Revision: 1735230
URL: http://svn.apache.org/viewvc?rev=1735230&view=rev
Log:
mod_http2: fix for bucket lifetime on master conn, mod_proxy_http2: flow control from front- to backend h2 connection
Modified:
httpd/httpd/trunk/CHANGES
httpd/httpd/trunk/modules/http2/h2_bucket_eoc.h
httpd/httpd/trunk/modules/http2/h2_bucket_eos.h
httpd/httpd/trunk/modules/http2/h2_conn_io.c
httpd/httpd/trunk/modules/http2/h2_io.c
httpd/httpd/trunk/modules/http2/h2_mplx.c
httpd/httpd/trunk/modules/http2/h2_mplx.h
httpd/httpd/trunk/modules/http2/h2_ngn_shed.c
httpd/httpd/trunk/modules/http2/h2_ngn_shed.h
httpd/httpd/trunk/modules/http2/h2_proxy_session.c
httpd/httpd/trunk/modules/http2/h2_proxy_session.h
httpd/httpd/trunk/modules/http2/h2_session.c
httpd/httpd/trunk/modules/http2/h2_session.h
httpd/httpd/trunk/modules/http2/h2_task.c
httpd/httpd/trunk/modules/http2/h2_task.h
httpd/httpd/trunk/modules/http2/h2_task_output.c
httpd/httpd/trunk/modules/http2/h2_util.c
httpd/httpd/trunk/modules/http2/h2_util.h
httpd/httpd/trunk/modules/http2/mod_http2.c
httpd/httpd/trunk/modules/http2/mod_http2.h
httpd/httpd/trunk/modules/http2/mod_proxy_http2.c
Modified: httpd/httpd/trunk/CHANGES
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/CHANGES?rev=1735230&r1=1735229&r2=1735230&view=diff
==============================================================================
--- httpd/httpd/trunk/CHANGES [utf-8] (original)
+++ httpd/httpd/trunk/CHANGES [utf-8] Wed Mar 16 14:01:53 2016
@@ -1,6 +1,13 @@
-*- coding: utf-8 -*-
Changes with Apache 2.5.0
+ *) mod_proxy_http2: using HTTP/2 flow control for backend streams by
+ observing data actually send out on the frontend h2 connection.
+ [Stefan Eissing]
+
+ *) mod_http2: fixes problem with wrong lifetime of file buckets on main
+ connection. [Stefan Eissing]
+
*) mpm: Generalise the ap_mpm_register_socket functions to accept pipes
or sockets. [Graham Leggett]
Modified: httpd/httpd/trunk/modules/http2/h2_bucket_eoc.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_bucket_eoc.h?rev=1735230&r1=1735229&r2=1735230&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_bucket_eoc.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_bucket_eoc.h Wed Mar 16 14:01:53 2016
@@ -21,6 +21,7 @@ struct h2_session;
/** End Of HTTP/2 SESSION (H2EOC) bucket */
extern const apr_bucket_type_t h2_bucket_type_eoc;
+#define H2_BUCKET_IS_H2EOC(e) (e->type == &h2_bucket_type_eoc)
apr_bucket * h2_bucket_eoc_make(apr_bucket *b,
struct h2_session *session);
Modified: httpd/httpd/trunk/modules/http2/h2_bucket_eos.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_bucket_eos.h?rev=1735230&r1=1735229&r2=1735230&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_bucket_eos.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_bucket_eos.h Wed Mar 16 14:01:53 2016
@@ -21,6 +21,7 @@ struct h2_stream;
/** End Of HTTP/2 STREAM (H2EOS) bucket */
extern const apr_bucket_type_t h2_bucket_type_eos;
+#define H2_BUCKET_IS_H2EOS(e) (e->type == &h2_bucket_type_eos)
apr_bucket *h2_bucket_eos_make(apr_bucket *b, struct h2_stream *stream);
Modified: httpd/httpd/trunk/modules/http2/h2_conn_io.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_conn_io.c?rev=1735230&r1=1735229&r2=1735230&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_conn_io.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_conn_io.c Wed Mar 16 14:01:53 2016
@@ -14,16 +14,18 @@
*/
#include <assert.h>
-
+#include <apr_strings.h>
#include <ap_mpm.h>
#include <httpd.h>
#include <http_core.h>
#include <http_log.h>
#include <http_connection.h>
+#include <http_request.h>
#include "h2_private.h"
#include "h2_bucket_eoc.h"
+#include "h2_bucket_eos.h"
#include "h2_config.h"
#include "h2_conn_io.h"
#include "h2_h2.h"
@@ -46,6 +48,84 @@
#define WRITE_BUFFER_SIZE (5*WRITE_SIZE_MAX)
+static void h2_conn_io_bb_log(conn_rec *c, int stream_id, int level,
+ const char *tag, apr_bucket_brigade *bb)
+{
+ char buffer[16 * 1024];
+ const char *line = "(null)";
+ apr_size_t bmax = sizeof(buffer)/sizeof(buffer[0]);
+ int off = 0;
+ apr_bucket *b;
+
+ if (bb) {
+ memset(buffer, 0, bmax--);
+ for (b = APR_BRIGADE_FIRST(bb);
+ bmax && (b != APR_BRIGADE_SENTINEL(bb));
+ b = APR_BUCKET_NEXT(b)) {
+
+ if (APR_BUCKET_IS_METADATA(b)) {
+ if (APR_BUCKET_IS_EOS(b)) {
+ off += apr_snprintf(buffer+off, bmax-off, "eos ");
+ }
+ else if (APR_BUCKET_IS_FLUSH(b)) {
+ off += apr_snprintf(buffer+off, bmax-off, "flush ");
+ }
+ else if (AP_BUCKET_IS_EOR(b)) {
+ off += apr_snprintf(buffer+off, bmax-off, "eor ");
+ }
+ else if (H2_BUCKET_IS_H2EOC(b)) {
+ off += apr_snprintf(buffer+off, bmax-off, "h2eoc ");
+ }
+ else if (H2_BUCKET_IS_H2EOS(b)) {
+ off += apr_snprintf(buffer+off, bmax-off, "h2eos ");
+ }
+ else {
+ off += apr_snprintf(buffer+off, bmax-off, "meta(unknown) ");
+ }
+ }
+ else {
+ const char *btype = "data";
+ if (APR_BUCKET_IS_FILE(b)) {
+ btype = "file";
+ }
+ else if (APR_BUCKET_IS_PIPE(b)) {
+ btype = "pipe";
+ }
+ else if (APR_BUCKET_IS_SOCKET(b)) {
+ btype = "socket";
+ }
+ else if (APR_BUCKET_IS_HEAP(b)) {
+ btype = "heap";
+ }
+ else if (APR_BUCKET_IS_TRANSIENT(b)) {
+ btype = "transient";
+ }
+ else if (APR_BUCKET_IS_IMMORTAL(b)) {
+ btype = "immortal";
+ }
+#if APR_HAS_MMAP
+ else if (APR_BUCKET_IS_MMAP(b)) {
+ btype = "mmap";
+ }
+#endif
+ else if (APR_BUCKET_IS_POOL(b)) {
+ btype = "pool";
+ }
+
+ off += apr_snprintf(buffer+off, bmax-off, "%s[%ld] ",
+ btype,
+ (long)(b->length == ((apr_size_t)-1)?
+ -1 : b->length));
+ }
+ }
+ line = *buffer? buffer : "(empty)";
+ }
+ /* Intentional no APLOGNO */
+ ap_log_cerror(APLOG_MARK, level, 0, c, "bb_dump(%ld-%d)-%s: %s",
+ c->id, stream_id, tag, line);
+
+}
+
apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c,
const h2_config *cfg,
apr_pool_t *pool)
@@ -112,16 +192,17 @@ static apr_status_t pass_out(apr_bucket_
}
ap_update_child_status_from_conn(c->sbh, SERVER_BUSY_WRITE, c);
- status = apr_brigade_length(bb, 0, &bblen);
- if (status == APR_SUCCESS) {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, APLOGNO(03044)
+ apr_brigade_length(bb, 0, &bblen);
+ h2_conn_io_bb_log(c, 0, APLOG_TRACE2, "master conn pass", bb);
+ status = ap_pass_brigade(c->output_filters, bb);
+ if (status == APR_SUCCESS && pctx->io) {
+ pctx->io->bytes_written += (apr_size_t)bblen;
+ pctx->io->last_write = apr_time_now();
+ }
+ if (status != APR_SUCCESS) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c, APLOGNO(03044)
"h2_conn_io(%ld): pass_out brigade %ld bytes",
c->id, (long)bblen);
- status = ap_pass_brigade(c->output_filters, bb);
- if (status == APR_SUCCESS && pctx->io) {
- pctx->io->bytes_written += (apr_size_t)bblen;
- pctx->io->last_write = apr_time_now();
- }
}
apr_brigade_cleanup(bb);
return status;
@@ -179,9 +260,10 @@ apr_status_t h2_conn_io_writeb(h2_conn_i
return APR_SUCCESS;
}
-static apr_status_t h2_conn_io_flush_int(h2_conn_io *io, int force, int eoc)
+static apr_status_t h2_conn_io_flush_int(h2_conn_io *io, int flush, int eoc)
{
pass_out_ctx ctx;
+ apr_bucket *b;
if (io->buflen == 0 && APR_BRIGADE_EMPTY(io->output)) {
return APR_SUCCESS;
@@ -195,13 +277,12 @@ static apr_status_t h2_conn_io_flush_int
bucketeer_buffer(io);
}
- if (force) {
- apr_bucket *b = apr_bucket_flush_create(io->c->bucket_alloc);
+ if (flush) {
+ b = apr_bucket_flush_create(io->c->bucket_alloc);
APR_BRIGADE_INSERT_TAIL(io->output, b);
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c, "h2_conn_io: flush");
- /* Send it out */
io->buflen = 0;
ctx.c = io->c;
ctx.io = eoc? NULL : io;
@@ -221,11 +302,11 @@ apr_status_t h2_conn_io_consider_pass(h2
apr_off_t len = 0;
if (!APR_BRIGADE_EMPTY(io->output)) {
- apr_brigade_length(io->output, 0, &len);
+ len = h2_brigade_mem_size(io->output);
}
len += io->buflen;
if (len >= WRITE_BUFFER_SIZE) {
- return h2_conn_io_flush_int(io, 0, 0);
+ return h2_conn_io_flush_int(io, 1, 0);
}
return APR_SUCCESS;
}
@@ -256,7 +337,7 @@ apr_status_t h2_conn_io_write(h2_conn_io
while (length > 0 && (status == APR_SUCCESS)) {
apr_size_t avail = io->bufsize - io->buflen;
if (avail <= 0) {
- h2_conn_io_flush_int(io, 0, 0);
+ status = h2_conn_io_flush_int(io, 0, 0);
}
else if (length > avail) {
memcpy(io->buffer + io->buflen, buf, avail);
Modified: httpd/httpd/trunk/modules/http2/h2_io.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_io.c?rev=1735230&r1=1735229&r2=1735230&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_io.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_io.c Wed Mar 16 14:01:53 2016
@@ -397,8 +397,10 @@ apr_status_t h2_io_out_read_to(h2_io *io
if (!is_out_readable(io, plen, peos, &status)) {
return status;
}
- io->eos_out_read = *peos = h2_util_has_eos(io->bbout, *plen);
status = h2_util_move(bb, io->bbout, *plen, NULL, "h2_io_read_to");
+ if (status == APR_SUCCESS && io->eos_out && APR_BRIGADE_EMPTY(io->bbout)) {
+ io->eos_out_read = *peos = 1;
+ }
io->output_consumed += *plen;
return status;
}
@@ -423,21 +425,6 @@ apr_status_t h2_io_out_write(h2_io *io,
return APR_ECONNABORTED;
}
- if (!io->eor) {
- /* Filter the EOR bucket and set it aside. We prefer to tear down
- * the request when the whole h2 stream is done */
- for (b = APR_BRIGADE_FIRST(bb);
- b != APR_BRIGADE_SENTINEL(bb);
- b = APR_BUCKET_NEXT(b))
- {
- if (AP_BUCKET_IS_EOR(b)) {
- APR_BUCKET_REMOVE(b);
- io->eor = b;
- break;
- }
- }
- }
-
if (io->eos_out) {
apr_off_t len = 0;
/* We have already delivered an EOS bucket to a reader, no
@@ -448,6 +435,23 @@ apr_status_t h2_io_out_write(h2_io *io,
return (len > 0)? APR_EOF : APR_SUCCESS;
}
+ /* Filter the EOR bucket and set it aside. We prefer to tear down
+ * the request when the whole h2 stream is done */
+ for (b = APR_BRIGADE_FIRST(bb);
+ b != APR_BRIGADE_SENTINEL(bb);
+ b = APR_BUCKET_NEXT(b))
+ {
+ if (AP_BUCKET_IS_EOR(b)) {
+ APR_BUCKET_REMOVE(b);
+ io->eor = b;
+ break;
+ }
+ else if (APR_BUCKET_IS_EOS(b)) {
+ io->eos_out = 1;
+ break;
+ }
+ }
+
process_trailers(io, trailers);
/* Let's move the buckets from the request processing in here, so
Modified: httpd/httpd/trunk/modules/http2/h2_mplx.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_mplx.c?rev=1735230&r1=1735229&r2=1735230&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_mplx.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_mplx.c Wed Mar 16 14:01:53 2016
@@ -195,7 +195,7 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr
return NULL;
}
- status = apr_thread_cond_create(&m->req_added, m->pool);
+ status = apr_thread_cond_create(&m->task_thawed, m->pool);
if (status != APR_SUCCESS) {
h2_mplx_destroy(m);
return NULL;
@@ -254,7 +254,7 @@ static void workers_register(h2_mplx *m)
h2_workers_register(m->workers, m);
}
-static int io_process_events(h2_mplx *m, h2_io *io)
+static int io_in_consumed_signal(h2_mplx *m, h2_io *io)
{
if (io->input_consumed && m->input_consumed) {
m->input_consumed(m->input_consumed_ctx,
@@ -265,6 +265,17 @@ static int io_process_events(h2_mplx *m,
return 0;
}
+static int io_out_consumed_signal(h2_mplx *m, h2_io *io)
+{
+ if (io->output_consumed && io->task && io->task->assigned) {
+ h2_req_engine_out_consumed(io->task->assigned, io->task->c,
+ io->output_consumed);
+ io->output_consumed = 0;
+ return 1;
+ }
+ return 0;
+}
+
static void io_destroy(h2_mplx *m, h2_io *io, int events)
{
apr_pool_t *pool;
@@ -273,7 +284,7 @@ static void io_destroy(h2_mplx *m, h2_io
h2_io_in_shutdown(io);
if (events) {
/* Process outstanding events before destruction */
- io_process_events(m, io);
+ io_in_consumed_signal(m, io);
}
/* The pool is cleared/destroyed which also closes all
@@ -299,7 +310,7 @@ static void io_destroy(h2_mplx *m, h2_io
pool = io->pool;
io->pool = NULL;
- if (pool) {
+ if (0 && pool) {
apr_pool_clear(pool);
if (m->spare_pool) {
apr_pool_destroy(m->spare_pool);
@@ -377,7 +388,7 @@ apr_status_t h2_mplx_release_and_join(h2
h2_mplx_set_consumed_cb(m, NULL, NULL);
h2_iq_clear(m->q);
- apr_thread_cond_broadcast(m->req_added);
+ apr_thread_cond_broadcast(m->task_thawed);
while (!h2_io_set_iter(m->stream_ios, stream_done_iter, m)) {
/* iterate until all ios have been orphaned or destroyed */
}
@@ -413,7 +424,7 @@ apr_status_t h2_mplx_release_and_join(h2
}
}
h2_mplx_abort(m);
- apr_thread_cond_broadcast(m->req_added);
+ apr_thread_cond_broadcast(m->task_thawed);
}
}
@@ -460,7 +471,7 @@ apr_status_t h2_mplx_stream_done(h2_mplx
* for processing, e.g. when we received all HEADERs. But when
* a stream is cancelled very early, it will not exist. */
if (io) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
"h2_mplx(%ld-%d): marking stream as done.",
m->id, stream_id);
io_stream_done(m, io, rst_error);
@@ -523,7 +534,7 @@ apr_status_t h2_mplx_in_write(h2_mplx *m
status = h2_io_in_write(io, data, len, eos);
H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_write_post");
h2_io_signal(io, H2_IO_READ);
- io_process_events(m, io);
+ io_in_consumed_signal(m, io);
}
else {
status = APR_ECONNABORTED;
@@ -545,7 +556,7 @@ apr_status_t h2_mplx_in_close(h2_mplx *m
status = h2_io_in_close(io);
H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_close");
h2_io_signal(io, H2_IO_READ);
- io_process_events(m, io);
+ io_in_consumed_signal(m, io);
}
else {
status = APR_ECONNABORTED;
@@ -555,6 +566,12 @@ apr_status_t h2_mplx_in_close(h2_mplx *m
return status;
}
+void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx)
+{
+ m->input_consumed = cb;
+ m->input_consumed_ctx = ctx;
+}
+
typedef struct {
h2_mplx * m;
int streams_updated;
@@ -563,18 +580,12 @@ typedef struct {
static int update_window(void *ctx, h2_io *io)
{
update_ctx *uctx = (update_ctx*)ctx;
- if (io_process_events(uctx->m, io)) {
+ if (io_in_consumed_signal(uctx->m, io)) {
++uctx->streams_updated;
}
return 1;
}
-void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx)
-{
- m->input_consumed = cb;
- m->input_consumed_ctx = ctx;
-}
-
apr_status_t h2_mplx_in_update_windows(h2_mplx *m)
{
apr_status_t status;
@@ -702,7 +713,7 @@ h2_stream *h2_mplx_next_submit(h2_mplx *
* shutdown input and send out any events (e.g. window
* updates) asap. */
h2_io_in_shutdown(io);
- io_process_events(m, io);
+ io_in_consumed_signal(m, io);
}
}
@@ -729,8 +740,10 @@ static apr_status_t out_write(h2_mplx *m
&& !APR_BRIGADE_EMPTY(bb)
&& !is_aborted(m, &status)) {
- status = h2_io_out_write(io, bb, m->stream_max_mem, trailers,
- &m->tx_handles_reserved);
+ status = h2_io_out_write(io, bb, blocking? m->stream_max_mem : INT_MAX,
+ trailers, &m->tx_handles_reserved);
+ io_out_consumed_signal(m, io);
+
/* Wait for data to drain until there is room again or
* stream timeout expires */
h2_io_signal_init(io, H2_IO_WRITE, m->stream_timeout, iowait);
@@ -740,6 +753,9 @@ static apr_status_t out_write(h2_mplx *m
&& (m->stream_max_mem <= h2_io_out_length(io))
&& !is_aborted(m, &status)) {
if (!blocking) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
+ "h2_mplx(%ld-%d): incomplete write",
+ m->id, io->id);
return APR_INCOMPLETE;
}
trailers = NULL;
@@ -874,11 +890,8 @@ apr_status_t h2_mplx_out_close(h2_mplx *
trailers? "yes" : "no");
status = h2_io_out_close(io, trailers);
H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_close");
+ io_out_consumed_signal(m, io);
- if (io->eor) {
- apr_bucket_delete(io->eor);
- io->eor = NULL;
- }
have_out_data_for(m, stream_id);
}
else {
@@ -1061,7 +1074,7 @@ static h2_task *pop_task(h2_mplx *m)
}
}
else if (io) {
- conn_rec *slave = h2_slave_create(m->c, io->pool, m->spare_allocator);
+ conn_rec *slave = h2_slave_create(m->c, m->pool, m->spare_allocator);
m->spare_allocator = NULL;
io->task = task = h2_task_create(m->id, io->request, slave, m);
apr_table_setn(slave->notes, H2_TASK_ID_NOTE, task->id);
@@ -1100,7 +1113,7 @@ h2_task *h2_mplx_pop_task(h2_mplx *m, in
return task;
}
-static void task_done(h2_mplx *m, h2_task *task)
+static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
{
if (task) {
if (task->frozen) {
@@ -1112,7 +1125,7 @@ static void task_done(h2_mplx *m, h2_tas
* bodies into the mplx. */
/* FIXME: this implementation is incomplete. */
h2_task_set_io_blocking(task, 0);
- apr_thread_cond_broadcast(m->req_added);
+ apr_thread_cond_broadcast(m->task_thawed);
}
else {
h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id);
@@ -1126,6 +1139,18 @@ static void task_done(h2_mplx *m, h2_tas
* other mplx's. Perhaps leave after n requests? */
h2_mplx_out_close(m, task->stream_id, NULL);
+ if (ngn && io) {
+ apr_off_t bytes = io->output_consumed + h2_io_out_length(io);
+ if (bytes > 0) {
+ /* we need to report consumed and current buffered output
+ * to the engine. The request will be streamed out or cancelled,
+ * no more data is coming from it and the engine should update
+ * its calculations before we destroy this information. */
+ h2_req_engine_out_consumed(ngn, task->c, bytes);
+ io->output_consumed = 0;
+ }
+ }
+
if (task->engine) {
if (!h2_req_engine_is_shutdown(task->engine)) {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
@@ -1194,7 +1219,7 @@ void h2_mplx_task_done(h2_mplx *m, h2_ta
int acquired;
if (enter_mutex(m, &acquired) == APR_SUCCESS) {
- task_done(m, task);
+ task_done(m, task, NULL);
--m->workers_busy;
if (ptask) {
/* caller wants another task */
@@ -1347,8 +1372,37 @@ apr_status_t h2_mplx_idle(h2_mplx *m)
* HTTP/2 request engines
******************************************************************************/
+typedef struct {
+ h2_mplx * m;
+ h2_req_engine *ngn;
+ int streams_updated;
+} ngn_update_ctx;
+
+static int ngn_update_window(void *ctx, h2_io *io)
+{
+ ngn_update_ctx *uctx = ctx;
+ if (io && io->task && io->task->assigned == uctx->ngn
+ && io_out_consumed_signal(uctx->m, io)) {
+ ++uctx->streams_updated;
+ }
+ return 1;
+}
+
+static apr_status_t ngn_out_update_windows(h2_mplx *m, h2_req_engine *ngn)
+{
+ ngn_update_ctx ctx;
+
+ ctx.m = m;
+ ctx.ngn = ngn;
+ ctx.streams_updated = 0;
+ h2_io_set_iter(m->stream_ios, ngn_update_window, &ctx);
+
+ return ctx.streams_updated? APR_SUCCESS : APR_EAGAIN;
+}
+
apr_status_t h2_mplx_req_engine_push(const char *ngn_type,
- request_rec *r, h2_req_engine_init *einit)
+ request_rec *r,
+ http2_req_engine_init *einit)
{
apr_status_t status;
h2_mplx *m;
@@ -1360,6 +1414,7 @@ apr_status_t h2_mplx_req_engine_push(con
return APR_ECONNABORTED;
}
m = task->mplx;
+ task->r = r;
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id);
@@ -1367,8 +1422,7 @@ apr_status_t h2_mplx_req_engine_push(con
status = APR_ECONNABORTED;
}
else {
- status = h2_ngn_shed_push_req(m->ngn_shed, ngn_type,
- task, r, einit);
+ status = h2_ngn_shed_push_task(m->ngn_shed, ngn_type, task, einit);
}
leave_mutex(m, acquired);
}
@@ -1383,30 +1437,37 @@ apr_status_t h2_mplx_req_engine_pull(h2_
h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn);
h2_mplx *m = h2_ngn_shed_get_ctx(shed);
apr_status_t status;
+ h2_task *task = NULL;
int acquired;
- *pr = NULL;
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
int want_shutdown = (block == APR_BLOCK_READ);
+
+ /* Take this opportunity to update output consummation
+ * for this engine */
+ ngn_out_update_windows(m, ngn);
+
if (want_shutdown && !h2_iq_empty(m->q)) {
/* For a blocking read, check first if requests are to be
* had and, if not, wait a short while before doing the
* blocking, and if unsuccessful, terminating read.
*/
- status = h2_ngn_shed_pull_req(shed, ngn, capacity, 1, pr);
+ status = h2_ngn_shed_pull_task(shed, ngn, capacity, 1, &task);
if (APR_STATUS_IS_EAGAIN(status)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%ld): start block engine pull", m->id);
- apr_thread_cond_timedwait(m->req_added, m->lock,
+ apr_thread_cond_timedwait(m->task_thawed, m->lock,
apr_time_from_msec(20));
- status = h2_ngn_shed_pull_req(shed, ngn, capacity, 1, pr);
+ status = h2_ngn_shed_pull_task(shed, ngn, capacity, 1, &task);
}
}
else {
- status = h2_ngn_shed_pull_req(shed, ngn, capacity, want_shutdown, pr);
+ status = h2_ngn_shed_pull_task(shed, ngn, capacity,
+ want_shutdown, &task);
}
leave_mutex(m, acquired);
}
+ *pr = task? task->r : NULL;
return status;
}
@@ -1419,13 +1480,16 @@ void h2_mplx_req_engine_done(h2_req_engi
int acquired;
if (enter_mutex(m, &acquired) == APR_SUCCESS) {
+ ngn_out_update_windows(m, ngn);
h2_ngn_shed_done_task(m->ngn_shed, ngn, task);
if (task->engine) {
/* cannot report that as done until engine returns */
}
else {
- task_done(m, task);
+ task_done(m, task, ngn);
}
+ /* Take this opportunity to update output consummation
+ * for this engine */
leave_mutex(m, acquired);
}
}
Modified: httpd/httpd/trunk/modules/http2/h2_mplx.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_mplx.h?rev=1735230&r1=1735229&r2=1735230&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_mplx.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_mplx.h Wed Mar 16 14:01:53 2016
@@ -90,7 +90,7 @@ struct h2_mplx {
apr_thread_mutex_t *lock;
struct apr_thread_cond_t *added_output;
- struct apr_thread_cond_t *req_added;
+ struct apr_thread_cond_t *task_thawed;
struct apr_thread_cond_t *join_wait;
apr_size_t stream_max_mem;
@@ -405,12 +405,15 @@ apr_status_t h2_mplx_idle(h2_mplx *m);
* h2_req_engine handling
******************************************************************************/
+typedef void h2_output_consumed(void *ctx, conn_rec *c, apr_off_t consumed);
typedef apr_status_t h2_mplx_req_engine_init(struct h2_req_engine *engine,
const char *id,
const char *type,
apr_pool_t *pool,
apr_uint32_t req_buffer_size,
- request_rec *r);
+ request_rec *r,
+ h2_output_consumed **pconsumed,
+ void **pbaton);
apr_status_t h2_mplx_req_engine_push(const char *ngn_type,
request_rec *r,
Modified: httpd/httpd/trunk/modules/http2/h2_ngn_shed.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_ngn_shed.c?rev=1735230&r1=1735229&r2=1735230&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_ngn_shed.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_ngn_shed.c Wed Mar 16 14:01:53 2016
@@ -34,6 +34,7 @@
#include "h2_ctx.h"
#include "h2_h2.h"
#include "h2_int_queue.h"
+#include "h2_mplx.h"
#include "h2_response.h"
#include "h2_request.h"
#include "h2_task.h"
@@ -46,7 +47,6 @@ typedef struct h2_ngn_entry h2_ngn_entry
struct h2_ngn_entry {
APR_RING_ENTRY(h2_ngn_entry) link;
h2_task *task;
- request_rec *r;
};
#define H2_NGN_ENTRY_NEXT(e) APR_RING_NEXT((e), link)
@@ -84,6 +84,9 @@ struct h2_req_engine {
apr_uint32_t no_assigned; /* # of assigned requests */
apr_uint32_t no_live; /* # of live */
apr_uint32_t no_finished; /* # of finished */
+
+ h2_output_consumed *out_consumed;
+ void *out_consumed_ctx;
};
const char *h2_req_engine_get_id(h2_req_engine *engine)
@@ -96,6 +99,14 @@ int h2_req_engine_is_shutdown(h2_req_eng
return engine->shutdown;
}
+void h2_req_engine_out_consumed(h2_req_engine *engine, conn_rec *c,
+ apr_off_t bytes)
+{
+ if (engine->out_consumed) {
+ engine->out_consumed(engine->out_consumed_ctx, c, bytes);
+ }
+}
+
h2_ngn_shed *h2_ngn_shed_create(apr_pool_t *pool, conn_rec *c,
apr_uint32_t default_capacity,
apr_uint32_t req_buffer_size)
@@ -132,26 +143,25 @@ void h2_ngn_shed_abort(h2_ngn_shed *shed
shed->aborted = 1;
}
-static void ngn_add_req(h2_req_engine *ngn, h2_task *task, request_rec *r)
+static void ngn_add_task(h2_req_engine *ngn, h2_task *task)
{
h2_ngn_entry *entry = apr_pcalloc(task->c->pool, sizeof(*entry));
APR_RING_ELEM_INIT(entry, link);
entry->task = task;
- entry->r = r;
H2_REQ_ENTRIES_INSERT_TAIL(&ngn->entries, entry);
}
-apr_status_t h2_ngn_shed_push_req(h2_ngn_shed *shed, const char *ngn_type,
- h2_task *task, request_rec *r,
- h2_req_engine_init *einit){
+apr_status_t h2_ngn_shed_push_task(h2_ngn_shed *shed, const char *ngn_type,
+ h2_task *task, http2_req_engine_init *einit)
+{
h2_req_engine *ngn;
AP_DEBUG_ASSERT(shed);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c,
"h2_ngn_shed(%ld): PUSHing request (task=%s)", shed->c->id,
- apr_table_get(r->connection->notes, H2_TASK_ID_NOTE));
+ task->id);
if (task->ser_headers) {
/* Max compatibility, deny processing of this */
return APR_EOF;
@@ -165,10 +175,10 @@ apr_status_t h2_ngn_shed_push_req(h2_ngn
"h2_ngn_shed(%ld): pushing request %s to %s",
shed->c->id, task->id, ngn->id);
if (!h2_task_is_detached(task)) {
- h2_task_freeze(task, r);
+ h2_task_freeze(task);
}
/* FIXME: sometimes ngn is garbage, probly alread freed */
- ngn_add_req(ngn, task, r);
+ ngn_add_task(ngn, task);
ngn->no_assigned++;
return APR_SUCCESS;
}
@@ -191,7 +201,8 @@ apr_status_t h2_ngn_shed_push_req(h2_ngn
APR_RING_INIT(&newngn->entries, h2_ngn_entry, link);
status = einit(newngn, newngn->id, newngn->type, newngn->pool,
- shed->req_buffer_size, r);
+ shed->req_buffer_size, task->r,
+ &newngn->out_consumed, &newngn->out_consumed_ctx);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, task->c,
"h2_ngn_shed(%ld): create engine %s (%s)",
shed->c->id, newngn->id, newngn->type);
@@ -199,6 +210,7 @@ apr_status_t h2_ngn_shed_push_req(h2_ngn
AP_DEBUG_ASSERT(task->engine == NULL);
newngn->task = task;
task->engine = newngn;
+ task->assigned = newngn;
apr_hash_set(shed->ngns, newngn->type, APR_HASH_KEY_STRING, newngn);
}
return status;
@@ -206,13 +218,17 @@ apr_status_t h2_ngn_shed_push_req(h2_ngn
return APR_EOF;
}
-static h2_ngn_entry *pop_non_frozen(h2_req_engine *ngn)
+static h2_ngn_entry *pop_detached(h2_req_engine *ngn)
{
h2_ngn_entry *entry;
for (entry = H2_REQ_ENTRIES_FIRST(&ngn->entries);
entry != H2_REQ_ENTRIES_SENTINEL(&ngn->entries);
entry = H2_NGN_ENTRY_NEXT(entry)) {
- if (!entry->task->frozen) {
+ if (h2_task_is_detached(entry->task)
+ || (entry->task->engine == ngn)) {
+ /* The task hosting this engine can always be pulled by it.
+ * For other task, they need to become detached, e.g. no longer
+ * assigned to another worker. */
H2_NGN_ENTRY_REMOVE(entry);
return entry;
}
@@ -220,16 +236,19 @@ static h2_ngn_entry *pop_non_frozen(h2_r
return NULL;
}
-apr_status_t h2_ngn_shed_pull_req(h2_ngn_shed *shed,
- h2_req_engine *ngn,
- apr_uint32_t capacity,
- int want_shutdown,
- request_rec **pr)
+apr_status_t h2_ngn_shed_pull_task(h2_ngn_shed *shed,
+ h2_req_engine *ngn,
+ apr_uint32_t capacity,
+ int want_shutdown,
+ h2_task **ptask)
{
h2_ngn_entry *entry;
AP_DEBUG_ASSERT(ngn);
- *pr = NULL;
+ *ptask = NULL;
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, shed->c,
+ "h2_ngn_shed(%ld): pull task for engine %s, shutdown=%d",
+ shed->c->id, ngn->id, want_shutdown);
if (shed->aborted) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, shed->c,
"h2_ngn_shed(%ld): abort while pulling requests %s",
@@ -249,14 +268,22 @@ apr_status_t h2_ngn_shed_pull_req(h2_ngn
return ngn->shutdown? APR_EOF : APR_EAGAIN;
}
- if ((entry = pop_non_frozen(ngn))) {
+ if ((entry = pop_detached(ngn))) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, entry->task->c,
"h2_ngn_shed(%ld): pulled request %s for engine %s",
shed->c->id, entry->task->id, ngn->id);
ngn->no_live++;
- *pr = entry->r;
+ *ptask = entry->task;
+ entry->task->assigned = ngn;
return APR_SUCCESS;
}
+
+ if (1) {
+ h2_ngn_entry *entry = H2_REQ_ENTRIES_FIRST(&ngn->entries);
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, shed->c,
+ "h2_ngn_shed(%ld): pull task, nothing, first task %s",
+ shed->c->id, entry->task->id);
+ }
return APR_EAGAIN;
}
@@ -298,8 +325,7 @@ void h2_ngn_shed_done_ngn(h2_ngn_shed *s
for (entry = H2_REQ_ENTRIES_FIRST(&ngn->entries);
entry != H2_REQ_ENTRIES_SENTINEL(&ngn->entries);
entry = H2_NGN_ENTRY_NEXT(entry)) {
- request_rec *r = entry->r;
- h2_task *task = h2_ctx_rget_task(r);
+ h2_task *task = entry->task;
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, shed->c,
"h2_ngn_shed(%ld): engine %s has queued task %s, "
"frozen=%d, aborting",
Modified: httpd/httpd/trunk/modules/http2/h2_ngn_shed.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_ngn_shed.h?rev=1735230&r1=1735229&r2=1735230&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_ngn_shed.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_ngn_shed.h Wed Mar 16 14:01:53 2016
@@ -35,12 +35,17 @@ struct h2_ngn_shed {
const char *h2_req_engine_get_id(h2_req_engine *engine);
int h2_req_engine_is_shutdown(h2_req_engine *engine);
+void h2_req_engine_out_consumed(h2_req_engine *engine, conn_rec *c,
+ apr_off_t bytes);
+
typedef apr_status_t h2_shed_ngn_init(h2_req_engine *engine,
const char *id,
const char *type,
apr_pool_t *pool,
apr_uint32_t req_buffer_size,
- request_rec *r);
+ request_rec *r,
+ h2_output_consumed **pconsumed,
+ void **pbaton);
h2_ngn_shed *h2_ngn_shed_create(apr_pool_t *pool, conn_rec *c,
apr_uint32_t default_capactiy,
@@ -53,13 +58,13 @@ h2_ngn_shed *h2_ngn_shed_get_shed(struct
void h2_ngn_shed_abort(h2_ngn_shed *shed);
-apr_status_t h2_ngn_shed_push_req(h2_ngn_shed *shed, const char *ngn_type,
- struct h2_task *task, request_rec *r,
+apr_status_t h2_ngn_shed_push_task(h2_ngn_shed *shed, const char *ngn_type,
+ struct h2_task *task,
h2_shed_ngn_init *init_cb);
-apr_status_t h2_ngn_shed_pull_req(h2_ngn_shed *shed, h2_req_engine *pub_ngn,
- apr_uint32_t capacity,
- int want_shutdown, request_rec **pr);
+apr_status_t h2_ngn_shed_pull_task(h2_ngn_shed *shed, h2_req_engine *pub_ngn,
+ apr_uint32_t capacity,
+ int want_shutdown, struct h2_task **ptask);
apr_status_t h2_ngn_shed_done_task(h2_ngn_shed *shed,
struct h2_req_engine *ngn,
Modified: httpd/httpd/trunk/modules/http2/h2_proxy_session.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_proxy_session.c?rev=1735230&r1=1735229&r2=1735230&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_proxy_session.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_proxy_session.c Wed Mar 16 14:01:53 2016
@@ -372,7 +372,6 @@ static int on_data_chunk_recv(nghttp2_se
stream_id, NGHTTP2_STREAM_CLOSED);
return NGHTTP2_ERR_STREAM_CLOSING;
}
- nghttp2_session_consume(ngh2, stream_id, len);
return 0;
}
@@ -1042,6 +1041,7 @@ static void ev_stream_done(h2_proxy_sess
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
"h2_proxy_sesssion(%s): stream(%d) closed",
session->id, stream_id);
+
if (!stream->data_received) {
apr_bucket *b;
/* if the response had no body, this is the time to flush
@@ -1286,7 +1286,8 @@ static int done_iter(void *udata, void *
{
cleanup_iter_ctx *ctx = udata;
h2_proxy_stream *stream = val;
- int touched = (stream->id <= ctx->session->last_stream_id);
+ int touched = (!ctx->session->last_stream_id ||
+ stream->id <= ctx->session->last_stream_id);
ctx->done(ctx->session, stream->r, 0, touched);
return 1;
}
@@ -1306,3 +1307,49 @@ void h2_proxy_session_cleanup(h2_proxy_s
}
}
+typedef struct {
+ h2_proxy_session *session;
+ conn_rec *c;
+ apr_off_t bytes;
+ int updated;
+} win_update_ctx;
+
+static int win_update_iter(void *udata, void *val)
+{
+ win_update_ctx *ctx = udata;
+ h2_proxy_stream *stream = val;
+
+ if (stream->r && stream->r->connection == ctx->c) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, ctx->session->c,
+ "h2_proxy_session(%s-%d): win_update %ld bytes",
+ ctx->session->id, (int)stream->id, (long)ctx->bytes);
+ nghttp2_session_consume(ctx->session->ngh2, stream->id, ctx->bytes);
+ ctx->updated = 1;
+ return 0;
+ }
+ return 1;
+}
+
+
+void h2_proxy_session_update_window(h2_proxy_session *session,
+ conn_rec *c, apr_off_t bytes)
+{
+ if (session->streams && !h2_ihash_is_empty(session->streams)) {
+ win_update_ctx ctx;
+ ctx.session = session;
+ ctx.c = c;
+ ctx.bytes = bytes;
+ ctx.updated = 0;
+ h2_ihash_iter(session->streams, win_update_iter, &ctx);
+
+ if (!ctx.updated) {
+ /* could not find the stream any more, possibly closed, update
+ * the connection window at least */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+ "h2_proxy_session(%s): win_update conn %ld bytes",
+ session->id, (long)bytes);
+ nghttp2_session_consume_connection(session->ngh2, (size_t)bytes);
+ }
+ }
+}
+
Modified: httpd/httpd/trunk/modules/http2/h2_proxy_session.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_proxy_session.h?rev=1735230&r1=1735229&r2=1735230&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_proxy_session.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_proxy_session.h Wed Mar 16 14:01:53 2016
@@ -103,6 +103,9 @@ apr_status_t h2_proxy_session_process(h2
void h2_proxy_session_cleanup(h2_proxy_session *s, h2_proxy_request_done *done);
+void h2_proxy_session_update_window(h2_proxy_session *s,
+ conn_rec *c, apr_off_t bytes);
+
#define H2_PROXY_REQ_URL_NOTE "h2-proxy-req-url"
#endif /* h2_proxy_session_h */
Modified: httpd/httpd/trunk/modules/http2/h2_session.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_session.c?rev=1735230&r1=1735229&r2=1735230&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_session.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_session.c Wed Mar 16 14:01:53 2016
@@ -686,7 +686,9 @@ static apr_status_t h2_session_shutdown(
h2_mplx_get_max_stream_started(session->mplx),
reason, (uint8_t*)err, err? strlen(err):0);
status = nghttp2_session_send(session->ngh2);
- h2_conn_io_flush(&session->io);
+ if (status == APR_SUCCESS) {
+ status = h2_conn_io_flush(&session->io);
+ }
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03069)
"session(%ld): sent GOAWAY, err=%d, msg=%s",
session->id, reason, err? err : "");
@@ -1432,6 +1434,9 @@ apr_status_t h2_session_stream_destroy(h
{
apr_pool_t *pool = h2_stream_detach_pool(stream);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+ "h2_stream(%ld-%d): cleanup by EOS bucket destroy",
+ session->id, stream->id);
/* this may be called while the session has already freed
* some internal structures or even when the mplx is locked. */
if (session->mplx) {
@@ -1704,6 +1709,7 @@ static void h2_session_ev_init(h2_sessio
static void h2_session_ev_local_goaway(h2_session *session, int arg, const char *msg)
{
+ session->local_shutdown = 1;
switch (session->state) {
case H2_SESSION_ST_LOCAL_SHUTDOWN:
/* already did that? */
@@ -2195,7 +2201,8 @@ apr_status_t h2_session_process(h2_sessi
}
else if (status == APR_TIMEUP) {
/* go back to checking all inputs again */
- transit(session, "wait cycle", H2_SESSION_ST_BUSY);
+ transit(session, "wait cycle", session->local_shutdown?
+ H2_SESSION_ST_LOCAL_SHUTDOWN : H2_SESSION_ST_BUSY);
}
else {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, c,
@@ -2219,7 +2226,10 @@ apr_status_t h2_session_process(h2_sessi
break;
}
- h2_conn_io_flush(&session->io);
+ status = h2_conn_io_flush(&session->io);
+ if (status != APR_SUCCESS) {
+ dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
+ }
if (!nghttp2_session_want_read(session->ngh2)
&& !nghttp2_session_want_write(session->ngh2)) {
dispatch_event(session, H2_SESSION_EV_NGH2_DONE, 0, NULL);
Modified: httpd/httpd/trunk/modules/http2/h2_session.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_session.h?rev=1735230&r1=1735229&r2=1735230&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_session.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_session.h Wed Mar 16 14:01:53 2016
@@ -85,6 +85,7 @@ typedef struct h2_session {
unsigned int reprioritize : 1; /* scheduled streams priority changed */
unsigned int eoc_written : 1; /* h2 eoc bucket written */
unsigned int flush : 1; /* flushing output necessary */
+ unsigned int local_shutdown: 1; /* GOAWAY has been sent by us */
apr_interval_time_t wait_us; /* timout during BUSY_WAIT state, micro secs */
int unsent_submits; /* number of submitted, but not yet written responses. */
Modified: httpd/httpd/trunk/modules/http2/h2_task.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_task.c?rev=1735230&r1=1735229&r2=1735230&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_task.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_task.c Wed Mar 16 14:01:53 2016
@@ -291,7 +291,7 @@ static int h2_task_process_conn(conn_rec
return DECLINED;
}
-apr_status_t h2_task_freeze(h2_task *task, request_rec *r)
+apr_status_t h2_task_freeze(h2_task *task)
{
if (!task->frozen) {
task->frozen = 1;
Modified: httpd/httpd/trunk/modules/http2/h2_task.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_task.h?rev=1735230&r1=1735229&r2=1735230&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_task.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_task.h Wed Mar 16 14:01:53 2016
@@ -66,7 +66,9 @@ struct h2_task {
struct h2_task_output *output;
struct apr_thread_cond_t *io; /* used to wait for events on */
- struct h2_req_engine *engine;
+ struct h2_req_engine *engine; /* engine hosted by this task */
+ struct h2_req_engine *assigned; /* engine that task has been assigned to */
+ request_rec *r; /* request being processed in this task */
};
h2_task *h2_task_create(long session_id, const struct h2_request *req,
@@ -83,7 +85,7 @@ apr_status_t h2_task_init(apr_pool_t *po
extern APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_in) *h2_task_logio_add_bytes_in;
extern APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_out) *h2_task_logio_add_bytes_out;
-apr_status_t h2_task_freeze(h2_task *task, request_rec *r);
+apr_status_t h2_task_freeze(h2_task *task);
apr_status_t h2_task_thaw(h2_task *task);
int h2_task_is_detached(h2_task *task);
Modified: httpd/httpd/trunk/modules/http2/h2_task_output.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_task_output.c?rev=1735230&r1=1735229&r2=1735230&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_task_output.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_task_output.c Wed Mar 16 14:01:53 2016
@@ -158,8 +158,7 @@ apr_status_t h2_task_output_write(h2_tas
}
/* Attempt to write saved brigade first */
- if (status == APR_SUCCESS && output->bb
- && !APR_BRIGADE_EMPTY(output->bb)) {
+ if (status == APR_SUCCESS && output->bb && !APR_BRIGADE_EMPTY(output->bb)) {
status = write_brigade_raw(output, f, output->bb);
}
Modified: httpd/httpd/trunk/modules/http2/h2_util.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_util.c?rev=1735230&r1=1735229&r2=1735230&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_util.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_util.c Wed Mar 16 14:01:53 2016
@@ -14,7 +14,6 @@
*/
#include <assert.h>
-
#include <apr_strings.h>
#include <httpd.h>
@@ -636,20 +635,6 @@ apr_status_t h2_util_copy(apr_bucket_bri
return status;
}
-int h2_util_has_flush_or_eos(apr_bucket_brigade *bb)
-{
- apr_bucket *b;
- for (b = APR_BRIGADE_FIRST(bb);
- b != APR_BRIGADE_SENTINEL(bb);
- b = APR_BUCKET_NEXT(b))
- {
- if (APR_BUCKET_IS_EOS(b) || APR_BUCKET_IS_FLUSH(b)) {
- return 1;
- }
- }
- return 0;
-}
-
int h2_util_has_eos(apr_bucket_brigade *bb, apr_off_t len)
{
apr_bucket *b, *end;
@@ -950,6 +935,27 @@ apr_status_t h2_transfer_brigade(apr_buc
return APR_SUCCESS;
}
+apr_off_t h2_brigade_mem_size(apr_bucket_brigade *bb)
+{
+ apr_bucket *b;
+ apr_off_t total = 0;
+
+ for (b = APR_BRIGADE_FIRST(bb);
+ b != APR_BRIGADE_SENTINEL(bb);
+ b = APR_BUCKET_NEXT(b))
+ {
+ total += sizeof(*b);
+ if (b->length > 0) {
+ if (APR_BUCKET_IS_HEAP(b)
+ || APR_BUCKET_IS_POOL(b)) {
+ total += b->length;
+ }
+ }
+ }
+ return total;
+}
+
+
/*******************************************************************************
* h2_ngheader
******************************************************************************/
Modified: httpd/httpd/trunk/modules/http2/h2_util.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_util.h?rev=1735230&r1=1735229&r2=1735230&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_util.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_util.h Wed Mar 16 14:01:53 2016
@@ -196,7 +196,6 @@ apr_status_t h2_util_copy(apr_bucket_bri
* @param bb the brigade to check on
* @return != 0 iff brigade holds FLUSH or EOS bucket (or both)
*/
-int h2_util_has_flush_or_eos(apr_bucket_brigade *bb);
int h2_util_has_eos(apr_bucket_brigade *bb, apr_off_t len);
int h2_util_bb_has_data(apr_bucket_brigade *bb);
int h2_util_bb_has_data_or_eos(apr_bucket_brigade *bb);
@@ -257,4 +256,13 @@ apr_status_t h2_transfer_brigade(apr_buc
apr_off_t *plen,
int *peos);
+/**
+ * Get an approximnation of the memory footprint of the given
+ * brigade. This varies from apr_brigade_length as
+ * - no buckets are ever read
+ * - only buckets known to allocate memory (HEAP+POOL) are counted
+ * - the bucket struct itself is counted
+ */
+apr_off_t h2_brigade_mem_size(apr_bucket_brigade *bb);
+
#endif /* defined(__mod_h2__h2_util__) */
Modified: httpd/httpd/trunk/modules/http2/mod_http2.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/mod_http2.c?rev=1735230&r1=1735229&r2=1735230&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/mod_http2.c (original)
+++ httpd/httpd/trunk/modules/http2/mod_http2.c Wed Mar 16 14:01:53 2016
@@ -130,7 +130,7 @@ static int http2_is_h2(conn_rec *);
static apr_status_t http2_req_engine_push(const char *ngn_type,
request_rec *r,
- h2_req_engine_init *einit)
+ http2_req_engine_init *einit)
{
return h2_mplx_req_engine_push(ngn_type, r, einit);
}
Modified: httpd/httpd/trunk/modules/http2/mod_http2.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/mod_http2.h?rev=1735230&r1=1735229&r2=1735230&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/mod_http2.h (original)
+++ httpd/httpd/trunk/modules/http2/mod_http2.h Wed Mar 16 14:01:53 2016
@@ -36,6 +36,8 @@ struct apr_thread_cond_t;
typedef struct h2_req_engine h2_req_engine;
+typedef void http2_output_consumed(void *ctx, conn_rec *c, apr_off_t consumed);
+
/**
* Initialize a h2_req_engine. The structure will be passed in but
* only the name and master are set. The function should initialize
@@ -43,12 +45,14 @@ typedef struct h2_req_engine h2_req_engi
* @param engine the allocated, partially filled structure
* @param r the first request to process, or NULL
*/
-typedef apr_status_t h2_req_engine_init(h2_req_engine *engine,
- const char *id,
- const char *type,
- apr_pool_t *pool,
- apr_uint32_t req_buffer_size,
- request_rec *r);
+typedef apr_status_t http2_req_engine_init(h2_req_engine *engine,
+ const char *id,
+ const char *type,
+ apr_pool_t *pool,
+ apr_uint32_t req_buffer_size,
+ request_rec *r,
+ http2_output_consumed **pconsumed,
+ void **pbaton);
/**
* Push a request to an engine with the specified name for further processing.
@@ -66,7 +70,7 @@ typedef apr_status_t h2_req_engine_init(
APR_DECLARE_OPTIONAL_FN(apr_status_t,
http2_req_engine_push, (const char *engine_type,
request_rec *r,
- h2_req_engine_init *einit));
+ http2_req_engine_init *einit));
/**
* Get a new request for processing in this engine.
Modified: httpd/httpd/trunk/modules/http2/mod_proxy_http2.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/mod_proxy_http2.c?rev=1735230&r1=1735229&r2=1735230&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/mod_proxy_http2.c (original)
+++ httpd/httpd/trunk/modules/http2/mod_proxy_http2.c Wed Mar 16 14:01:53 2016
@@ -42,7 +42,7 @@ AP_DECLARE_MODULE(proxy_http2) = {
/* Optional functions from mod_http2 */
static int (*is_h2)(conn_rec *c);
static apr_status_t (*req_engine_push)(const char *name, request_rec *r,
- h2_req_engine_init *einit);
+ http2_req_engine_init *einit);
static apr_status_t (*req_engine_pull)(h2_req_engine *engine,
apr_read_type_e block,
apr_uint32_t capacity,
@@ -71,7 +71,8 @@ typedef struct h2_proxy_ctx {
unsigned is_ssl : 1;
unsigned flushall : 1;
- apr_status_t r_status; /* status of our first request work */
+ apr_status_t r_status; /* status of our first request work */
+ h2_proxy_session *session; /* current http2 session against backend */
} h2_proxy_ctx;
static int h2_proxy_post_config(apr_pool_t *p, apr_pool_t *plog,
@@ -196,12 +197,23 @@ static int proxy_http2_canon(request_rec
return OK;
}
+static void out_consumed(void *baton, conn_rec *c, apr_off_t bytes)
+{
+ h2_proxy_ctx *ctx = baton;
+
+ if (ctx->session) {
+ h2_proxy_session_update_window(ctx->session, c, bytes);
+ }
+}
+
static apr_status_t proxy_engine_init(h2_req_engine *engine,
const char *id,
const char *type,
apr_pool_t *pool,
apr_uint32_t req_buffer_size,
- request_rec *r)
+ request_rec *r,
+ http2_output_consumed **pconsumed,
+ void **pctx)
{
h2_proxy_ctx *ctx = ap_get_module_config(r->connection->conn_config,
&proxy_http2_module);
@@ -212,6 +224,8 @@ static apr_status_t proxy_engine_init(h2
ctx->engine_pool = pool;
ctx->req_buffer_size = req_buffer_size;
ctx->capacity = 100;
+ *pconsumed = out_consumed;
+ *pctx = ctx;
return APR_SUCCESS;
}
ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, r,
@@ -250,7 +264,7 @@ static void request_done(h2_proxy_sessio
if (req_engine_push && is_h2 && is_h2(ctx->owner)) {
if (req_engine_push(ctx->engine_type, r, NULL) == APR_SUCCESS) {
/* push to engine */
- ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, r->connection,
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, r->connection,
"h2_proxy_session(%s): rescheduled request %s",
ctx->engine_id, task_id);
return;
@@ -287,8 +301,8 @@ static apr_status_t next_request(h2_prox
}
else if (req_engine_pull && ctx->engine) {
apr_status_t status;
- status = req_engine_pull(ctx->engine,
- before_leave? APR_BLOCK_READ: APR_NONBLOCK_READ,
+ status = req_engine_pull(ctx->engine, before_leave?
+ APR_BLOCK_READ: APR_NONBLOCK_READ,
ctx->capacity, &ctx->next);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, ctx->owner,
"h2_proxy_engine(%s): pulled request %s",
@@ -301,40 +315,39 @@ static apr_status_t next_request(h2_prox
static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) {
apr_status_t status = OK;
- h2_proxy_session *session;
/* Step Four: Send the Request in a new HTTP/2 stream and
* loop until we got the response or encounter errors.
*/
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, ctx->owner,
"eng(%s): setup session", ctx->engine_id);
- session = h2_proxy_session_setup(ctx->engine_id, ctx->p_conn, ctx->conf,
- 30, h2_log2(ctx->req_buffer_size),
- request_done);
- if (!session) {
+ ctx->session = h2_proxy_session_setup(ctx->engine_id, ctx->p_conn, ctx->conf,
+ 30, h2_log2(ctx->req_buffer_size),
+ request_done);
+ if (!ctx->session) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->owner,
"session unavailable");
return HTTP_SERVICE_UNAVAILABLE;
}
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->owner,
- "eng(%s): run session %s", ctx->engine_id, session->id);
- session->user_data = ctx;
+ "eng(%s): run session %s", ctx->engine_id, ctx->session->id);
+ ctx->session->user_data = ctx;
while (1) {
if (ctx->next) {
- add_request(session, ctx->next);
+ add_request(ctx->session, ctx->next);
ctx->next = NULL;
}
- status = h2_proxy_session_process(session);
+ status = h2_proxy_session_process(ctx->session);
if (status == APR_SUCCESS) {
apr_status_t s2;
/* ongoing processing, call again */
- if (session->remote_max_concurrent > 0
- && session->remote_max_concurrent != ctx->capacity) {
- ctx->capacity = session->remote_max_concurrent;
+ if (ctx->session->remote_max_concurrent > 0
+ && ctx->session->remote_max_concurrent != ctx->capacity) {
+ ctx->capacity = ctx->session->remote_max_concurrent;
}
s2 = next_request(ctx, 0);
if (s2 == APR_ECONNABORTED) {
@@ -344,7 +357,7 @@ static apr_status_t proxy_engine_run(h2_
status = s2;
break;
}
- if (!ctx->next && h2_ihash_is_empty(session->streams)) {
+ if (!ctx->next && h2_ihash_is_empty(ctx->session->streams)) {
break;
}
}
@@ -357,12 +370,13 @@ static apr_status_t proxy_engine_run(h2_
* a) be reopened on the new session iff safe to do so
* b) reported as done (failed) otherwise
*/
- h2_proxy_session_cleanup(session, request_done);
+ h2_proxy_session_cleanup(ctx->session, request_done);
break;
}
}
- session->user_data = NULL;
+ ctx->session->user_data = NULL;
+ ctx->session = NULL;
return status;
}
@@ -556,6 +570,8 @@ run_session:
/* session and connection still ok */
if (next_request(ctx, 1) == APR_SUCCESS) {
/* more requests, run again */
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->owner,
+ "run_session, again");
goto run_session;
}
/* done */