You are viewing a plain text version of this content. The canonical link for it is here.
Posted to cvs@httpd.apache.org by ic...@apache.org on 2016/02/15 18:10:55 UTC
svn commit: r1730572 - /httpd/httpd/trunk/modules/http2/
Author: icing
Date: Mon Feb 15 17:10:54 2016
New Revision: 1730572
URL: http://svn.apache.org/viewvc?rev=1730572&view=rev
Log:
first working h2 request engine implementation that does serial processing of proxy requests
Modified:
httpd/httpd/trunk/modules/http2/h2_from_h1.c
httpd/httpd/trunk/modules/http2/h2_from_h1.h
httpd/httpd/trunk/modules/http2/h2_io.h
httpd/httpd/trunk/modules/http2/h2_mplx.c
httpd/httpd/trunk/modules/http2/h2_mplx.h
httpd/httpd/trunk/modules/http2/h2_proxy_session.c
httpd/httpd/trunk/modules/http2/h2_proxy_session.h
httpd/httpd/trunk/modules/http2/h2_task.c
httpd/httpd/trunk/modules/http2/h2_task.h
httpd/httpd/trunk/modules/http2/h2_task_input.c
httpd/httpd/trunk/modules/http2/h2_task_input.h
httpd/httpd/trunk/modules/http2/h2_task_output.c
httpd/httpd/trunk/modules/http2/h2_task_output.h
httpd/httpd/trunk/modules/http2/h2_worker.c
httpd/httpd/trunk/modules/http2/mod_http2.c
httpd/httpd/trunk/modules/http2/mod_http2.h
httpd/httpd/trunk/modules/http2/mod_proxy_http2.c
Modified: httpd/httpd/trunk/modules/http2/h2_from_h1.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_from_h1.c?rev=1730572&r1=1730571&r2=1730572&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_from_h1.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_from_h1.c Mon Feb 15 17:10:54 2016
@@ -49,12 +49,6 @@ h2_from_h1 *h2_from_h1_create(int stream
return from_h1;
}
-apr_status_t h2_from_h1_destroy(h2_from_h1 *from_h1)
-{
- from_h1->bb = NULL;
- return APR_SUCCESS;
-}
-
static void set_state(h2_from_h1 *from_h1, h2_from_h1_state_t state)
{
if (from_h1->state != state) {
Modified: httpd/httpd/trunk/modules/http2/h2_from_h1.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_from_h1.h?rev=1730572&r1=1730571&r2=1730572&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_from_h1.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_from_h1.h Mon Feb 15 17:10:54 2016
@@ -60,8 +60,6 @@ struct h2_from_h1 {
h2_from_h1 *h2_from_h1_create(int stream_id, apr_pool_t *pool);
-apr_status_t h2_from_h1_destroy(h2_from_h1 *response);
-
apr_status_t h2_from_h1_read_response(h2_from_h1 *from_h1,
ap_filter_t* f, apr_bucket_brigade* bb);
Modified: httpd/httpd/trunk/modules/http2/h2_io.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_io.h?rev=1730572&r1=1730571&r2=1730572&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_io.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_io.h Mon Feb 15 17:10:54 2016
@@ -20,6 +20,7 @@ struct h2_response;
struct apr_thread_cond_t;
struct h2_mplx;
struct h2_request;
+struct h2_task;
typedef apr_status_t h2_io_data_cb(void *ctx, const char *data, apr_off_t len);
@@ -62,6 +63,9 @@ struct h2_io {
apr_size_t input_consumed; /* how many bytes have been read */
int files_handles_owned;
+
+ struct h2_task *task; /* parked task */
+ request_rec *r; /* parked request */
};
/*******************************************************************************
Modified: httpd/httpd/trunk/modules/http2/h2_mplx.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_mplx.c?rev=1730572&r1=1730571&r2=1730572&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_mplx.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_mplx.c Mon Feb 15 17:10:54 2016
@@ -17,7 +17,7 @@
#include <stddef.h>
#include <stdlib.h>
-#include <apr_atomic.h>
+#include <apr_queue.h>
#include <apr_thread_mutex.h>
#include <apr_thread_cond.h>
#include <apr_strings.h>
@@ -27,9 +27,12 @@
#include <http_core.h>
#include <http_log.h>
+#include "mod_http2.h"
+
#include "h2_private.h"
#include "h2_config.h"
#include "h2_conn.h"
+#include "h2_ctx.h"
#include "h2_h2.h"
#include "h2_io.h"
#include "h2_io_set.h"
@@ -390,6 +393,9 @@ apr_status_t h2_mplx_stream_done(h2_mplx
* for processing, e.g. when we received all HEADERs. But when
* a stream is cancelled very early, it will not exist. */
if (io) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld-%d): marking stream as done.",
+ m->id, stream_id);
io_stream_done(m, io, rst_error);
}
@@ -415,25 +421,56 @@ static const h2_request *pop_request(h2_
return req;
}
+static apr_status_t h2_mplx_engine_schedule(h2_mplx *m, request_rec *r)
+{
+ if (!m->engine_queue) {
+ apr_queue_create(&m->engine_queue, 200, m->pool);
+ }
+ return apr_queue_trypush(m->engine_queue, r);
+}
+
void h2_mplx_request_done(h2_mplx **pm, int stream_id, const h2_request **preq)
{
h2_mplx *m = *pm;
int acquired;
if (enter_mutex(m, &acquired) == APR_SUCCESS) {
- h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_mplx(%ld): request(%d) done", m->id, stream_id);
- if (io) {
- io->processing_done = 1;
- if (io->orphaned) {
- io_destroy(m, io, 0);
- if (m->join_wait) {
- apr_thread_cond_signal(m->join_wait);
+ if (stream_id) {
+ h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%ld): request(%d) done", m->id, stream_id);
+ if (io) {
+ request_rec *r = io->r;
+
+ if (io->orphaned) {
+ io->processing_done = 1;
+ }
+ else if (r) {
+ /* A parked request which is being transferred from
+ * one worker thread to another. This request_done call
+ * was from the initial thread and now it is safe to
+ * schedule it for further processing. */
+ h2_task_thaw(io->task);
+ io->task = NULL;
+ io->r = NULL;
+ h2_mplx_engine_schedule(*pm, r);
+ }
+ else {
+ io->processing_done = 1;
+ }
+
+ if (io->processing_done) {
+ h2_io_out_close(io, NULL);
+ if (io->orphaned) {
+ io_destroy(m, io, 0);
+ if (m->join_wait) {
+ apr_thread_cond_signal(m->join_wait);
+ }
+ }
+ else {
+ /* hang around until the stream deregisteres */
+ }
}
- }
- else {
- /* hang around until the stream deregisteres */
}
}
@@ -800,7 +837,7 @@ apr_status_t h2_mplx_out_write(h2_mplx *
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
if (io && !io->orphaned) {
status = out_write(m, io, f, bb, trailers, iowait);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
"h2_mplx(%ld-%d): write with trailers=%s",
m->id, io->id, trailers? "yes" : "no");
H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_write");
@@ -1049,3 +1086,107 @@ const h2_request *h2_mplx_pop_request(h2
return req;
}
+apr_status_t h2_mplx_engine_push(h2_mplx *m, h2_task *task,
+ const char *engine_type,
+ request_rec *r, h2_mplx_engine_init *einit)
+{
+ apr_status_t status;
+ int acquired;
+
+ AP_DEBUG_ASSERT(m);
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+ h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id);
+ if (!io || io->orphaned) {
+ status = APR_ECONNABORTED;
+ }
+ else {
+ h2_req_engine *engine;
+
+ apr_table_set(r->connection->notes, H2_TASK_ID_NOTE, task->id);
+ status = APR_EOF;
+ engine = m->engine; /* just a single one for now */
+ if (task->ser_headers) {
+ /* Max compatibility, deny processing of this */
+ }
+ else if (!engine && einit) {
+ engine = apr_pcalloc(r->connection->pool, sizeof(*engine));
+ engine->id = 1;
+ engine->c = r->connection;
+ engine->pool = r->connection->pool;
+ engine->type = apr_pstrdup(engine->pool, engine_type);
+
+ status = einit(engine, r);
+ if (status == APR_SUCCESS) {
+ m->engine = engine;
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,
+ "h2_mplx(%ld): init engine %d (%s)",
+ m->c->id, engine->id, engine->type);
+ }
+ }
+ else if (engine && !strcmp(engine->type, engine_type)) {
+ if (status == APR_SUCCESS) {
+ /* this task will be processed in another thread,
+ * freeze any I/O for the time being. */
+ h2_task_freeze(task, r);
+ io->task = task;
+ io->r = r;
+ }
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, r,
+ "h2_mplx(%ld): push request %s",
+ m->c->id, r->the_request);
+ }
+ }
+
+ leave_mutex(m, acquired);
+ }
+ return status;
+}
+
+apr_status_t h2_mplx_engine_pull(h2_mplx *m, h2_task *task,
+ struct h2_req_engine *engine,
+ apr_time_t timeout, request_rec **pr)
+{
+ apr_status_t status;
+ int acquired;
+
+ AP_DEBUG_ASSERT(m);
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+ status = APR_ECONNABORTED;
+ if (m->engine == engine && m->engine_queue) {
+ void *elem;
+ status = apr_queue_trypop(m->engine_queue, &elem);
+ if (status == APR_SUCCESS) {
+ *pr = elem;
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, *pr,
+ "h2_mplx(%ld): request %s pulled by engine %d",
+ m->c->id, (*pr)->the_request, engine->id);
+ }
+ }
+ leave_mutex(m, acquired);
+ }
+ return status;
+}
+
+void h2_mplx_engine_done(h2_mplx *m, struct h2_task *task, conn_rec *r_conn)
+{
+ int stream_id = task->stream_id;
+ h2_task_output_close(task->output);
+ h2_mplx_request_done(&m, stream_id, NULL);
+ apr_pool_destroy(r_conn->pool);
+}
+
+void h2_mplx_engine_exit(h2_mplx *m, struct h2_task *task,
+ struct h2_req_engine *engine)
+{
+ int acquired;
+
+ AP_DEBUG_ASSERT(m);
+ if (enter_mutex(m, &acquired) == APR_SUCCESS) {
+ /* TODO: shutdown of engine->c */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): exit engine %d (%s)",
+ m->c->id, engine->id, engine->type);
+ m->engine = NULL;
+ leave_mutex(m, acquired);
+ }
+}
Modified: httpd/httpd/trunk/modules/http2/h2_mplx.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_mplx.h?rev=1730572&r1=1730571&r2=1730572&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_mplx.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_mplx.h Mon Feb 15 17:10:54 2016
@@ -47,7 +47,9 @@ struct apr_thread_cond_t;
struct h2_workers;
struct h2_stream_set;
struct h2_task_queue;
+struct h2_req_engine;
+#include <apr_queue.h>
#include "h2_io.h"
typedef struct h2_mplx h2_mplx;
@@ -87,6 +89,9 @@ struct h2_mplx {
h2_mplx_consumed_cb *input_consumed;
void *input_consumed_ctx;
+
+ struct h2_req_engine *engine;
+ apr_queue_t *engine_queue;
};
@@ -373,4 +378,24 @@ APR_RING_INSERT_TAIL((b), ap__b, h2_mplx
#define H2_MPLX_REMOVE(e) APR_RING_REMOVE((e), link)
+/*******************************************************************************
+ * h2_mplx h2_req_engine handling.
+ ******************************************************************************/
+
+typedef apr_status_t h2_mplx_engine_init(struct h2_req_engine *engine,
+ request_rec *r);
+
+apr_status_t h2_mplx_engine_push(h2_mplx *m, struct h2_task *task,
+ const char *engine_type,
+ request_rec *r, h2_mplx_engine_init *einit);
+
+apr_status_t h2_mplx_engine_pull(h2_mplx *m, struct h2_task *task,
+ struct h2_req_engine *engine,
+ apr_time_t timeout, request_rec **pr);
+
+void h2_mplx_engine_done(h2_mplx *m, struct h2_task *task, conn_rec *r_conn);
+
+void h2_mplx_engine_exit(h2_mplx *m, struct h2_task *task,
+ struct h2_req_engine *engine);
+
#endif /* defined(__mod_h2__h2_mplx__) */
Modified: httpd/httpd/trunk/modules/http2/h2_proxy_session.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_proxy_session.c?rev=1730572&r1=1730571&r2=1730572&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_proxy_session.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_proxy_session.c Mon Feb 15 17:10:54 2016
@@ -18,6 +18,7 @@
#include <httpd.h>
#include <mod_proxy.h>
+#include <mod_http2.h>
#include "h2.h"
#include "h2_request.h"
@@ -60,6 +61,45 @@ static apr_status_t proxy_session_shutdo
return APR_SUCCESS;
}
+static int proxy_pass_brigade(apr_bucket_alloc_t *bucket_alloc,
+ proxy_conn_rec *p_conn,
+ conn_rec *origin, apr_bucket_brigade *bb,
+ int flush)
+{
+ apr_status_t status;
+ apr_off_t transferred;
+
+ if (flush) {
+ apr_bucket *e = apr_bucket_flush_create(bucket_alloc);
+ APR_BRIGADE_INSERT_TAIL(bb, e);
+ }
+ apr_brigade_length(bb, 0, &transferred);
+ if (transferred != -1)
+ p_conn->worker->s->transferred += transferred;
+ status = ap_pass_brigade(origin->output_filters, bb);
+ /* Cleanup the brigade now to avoid buckets lifetime
+ * issues in case of error returned below. */
+ apr_brigade_cleanup(bb);
+ if (status != APR_SUCCESS) {
+ ap_log_cerror(APLOG_MARK, APLOG_ERR, status, origin, APLOGNO(01084)
+ "pass request body failed to %pI (%s)",
+ p_conn->addr, p_conn->hostname);
+ if (origin->aborted) {
+ const char *ssl_note;
+
+ if (((ssl_note = apr_table_get(origin->notes, "SSL_connect_rv"))
+ != NULL) && (strcmp(ssl_note, "err") == 0)) {
+ return HTTP_INTERNAL_SERVER_ERROR;
+ }
+ return HTTP_GATEWAY_TIME_OUT;
+ }
+ else {
+ return HTTP_BAD_REQUEST;
+ }
+ }
+ return OK;
+}
+
static ssize_t raw_send(nghttp2_session *ngh2, const uint8_t *data,
size_t length, int flags, void *user_data)
{
@@ -75,9 +115,9 @@ static ssize_t raw_send(nghttp2_session
session->c->bucket_alloc);
APR_BRIGADE_INSERT_TAIL(session->output, b);
- status = ap_proxy_pass_brigade(session->c->bucket_alloc, session->r,
- session->p_conn, session->c,
- session->output, flush);
+ status = proxy_pass_brigade(session->c->bucket_alloc,
+ session->p_conn, session->c,
+ session->output, flush);
if (status != APR_SUCCESS) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
"h2_proxy_sesssion(%ld): sending", session->c->id);
@@ -203,6 +243,16 @@ static apr_status_t h2_proxy_stream_add_
return APR_SUCCESS;
}
+static int log_header(void *ctx, const char *key, const char *value)
+{
+ h2_proxy_stream *stream = ctx;
+
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, stream->r,
+ "h2_proxy_stream(%ld-%d), header_out %s: %s",
+ stream->session->c->id, stream->id, key, value);
+ return 1;
+}
+
static void h2_proxy_stream_end_headers_out(h2_proxy_stream *stream)
{
h2_proxy_session *session = stream->session;
@@ -254,6 +304,13 @@ static void h2_proxy_stream_end_headers_
server_name, portstr)
);
}
+
+ if (APLOGrtrace2(stream->r)) {
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, stream->r,
+ "h2_proxy_stream(%ld-%d), header_out after merging",
+ stream->session->c->id, stream->id);
+ apr_table_do(log_header, stream, stream->r->headers_out, NULL);
+ }
}
static int on_data_chunk_recv(nghttp2_session *ngh2, uint8_t flags,
@@ -278,7 +335,8 @@ static int on_data_chunk_recv(nghttp2_se
stream->data_received = 1;
}
- b = apr_bucket_transient_create((const char*)data, len, session->c->bucket_alloc);
+ b = apr_bucket_transient_create((const char*)data, len,
+ stream->r->connection->bucket_alloc);
APR_BRIGADE_INSERT_TAIL(stream->output, b);
status = ap_pass_brigade(stream->r->output_filters, stream->output);
if (status != APR_SUCCESS) {
@@ -344,7 +402,6 @@ static ssize_t stream_data_read(nghttp2_
uint32_t *data_flags,
nghttp2_data_source *source, void *user_data)
{
- h2_proxy_session *session = user_data;
h2_proxy_stream *stream;
apr_status_t status = APR_SUCCESS;
@@ -358,9 +415,9 @@ static ssize_t stream_data_read(nghttp2_
status = ap_get_brigade(stream->r->input_filters, stream->input,
AP_MODE_READBYTES, APR_BLOCK_READ,
H2MIN(APR_BUCKET_BUFF_SIZE, length));
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c,
- "h2_proxy_stream(%ld-%d): request body read",
- session->c->id, stream->id);
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, stream->r,
+ "h2_proxy_stream(%d): request body read",
+ stream->id);
}
if (status == APR_SUCCESS) {
@@ -396,9 +453,9 @@ static ssize_t stream_data_read(nghttp2_
apr_bucket_delete(b);
}
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c,
- "h2_proxy_stream(%ld-%d): request body read %ld bytes, flags=%d",
- session->c->id, stream->id, (long)readlen, (int)*data_flags);
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, stream->r,
+ "h2_proxy_stream(%d): request body read %ld bytes, flags=%d",
+ stream->id, (long)readlen, (int)*data_flags);
return readlen;
}
else if (APR_STATUS_IS_EAGAIN(status)) {
@@ -424,7 +481,6 @@ h2_proxy_session *h2_proxy_session_setup
session->c = p_conn->connection;
session->p_conn = p_conn;
session->conf = conf;
- session->r = r;
session->pool = p_conn->scpool;
session->window_bits_default = 30;
session->window_bits_connection = 30;
@@ -471,7 +527,7 @@ apr_status_t h2_proxy_session_open_strea
h2_proxy_stream *stream;
apr_uri_t puri;
const char *authority, *scheme, *path;
-
+
stream = apr_pcalloc(r->pool, sizeof(*stream));
stream->pool = r->pool;
@@ -625,9 +681,14 @@ apr_status_t h2_proxy_stream_process(h2_
rv = nghttp2_submit_request(session->ngh2, NULL,
hd->nv, hd->nvlen, pp, stream);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
- "h2_session(%ld): submit request -> %d",
- session->c->id, rv);
+ if (APLOGcdebug(session->c)) {
+ const char *task_id = apr_table_get(stream->r->connection->notes,
+ H2_TASK_ID_NOTE);
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+ "h2_session(%ld): submit %s%s -> %d (task %s)",
+ session->c->id, stream->req->authority, stream->req->path,
+ rv, task_id);
+ }
if (rv > 0) {
stream->id = rv;
stream->state = H2_STREAM_ST_OPEN;
Modified: httpd/httpd/trunk/modules/http2/h2_proxy_session.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_proxy_session.h?rev=1730572&r1=1730571&r2=1730572&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_proxy_session.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_proxy_session.h Mon Feb 15 17:10:54 2016
@@ -24,7 +24,6 @@ typedef struct h2_proxy_session {
conn_rec *c;
proxy_conn_rec *p_conn;
proxy_server_conf *conf;
- request_rec *r;
apr_pool_t *pool;
nghttp2_session *ngh2; /* the nghttp2 session itself */
@@ -66,4 +65,6 @@ apr_status_t h2_proxy_session_open_strea
request_rec *r, h2_proxy_stream **pstream);
apr_status_t h2_proxy_stream_process(h2_proxy_stream *stream);
+#define H2_PROXY_REQ_URL_NOTE "h2-proxy-req-url"
+
#endif /* h2_proxy_session_h */
Modified: httpd/httpd/trunk/modules/http2/h2_task.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_task.c?rev=1730572&r1=1730571&r2=1730572&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_task.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_task.c Mon Feb 15 17:10:54 2016
@@ -85,6 +85,28 @@ static apr_status_t h2_filter_read_respo
return h2_from_h1_read_response(task->output->from_h1, f, bb);
}
+static apr_status_t h2_response_freeze_filter(ap_filter_t* f,
+ apr_bucket_brigade* bb)
+{
+ h2_task *task = f->ctx;
+ AP_DEBUG_ASSERT(task);
+
+ if (task->frozen) {
+ ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, f->r,
+ "h2_response_freeze_filter, saving");
+ APR_BRIGADE_CONCAT(task->frozen_out, bb);
+ return APR_SUCCESS;
+ }
+
+ if (APR_BRIGADE_EMPTY(bb)) {
+ return APR_SUCCESS;
+ }
+
+ ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, f->r,
+ "h2_response_freeze_filter, passing");
+ return ap_pass_brigade(f->next, bb);
+}
+
/*******************************************************************************
* Register various hooks
*/
@@ -119,6 +141,8 @@ void h2_task_register_hooks(void)
NULL, AP_FTYPE_PROTOCOL);
ap_register_output_filter("H2_TRAILERS", h2_response_trailers_filter,
NULL, AP_FTYPE_PROTOCOL);
+ ap_register_output_filter("H2_RESPONSE_FREEZE", h2_response_freeze_filter,
+ NULL, AP_FTYPE_RESOURCE);
}
/* post config init */
@@ -168,6 +192,7 @@ h2_task *h2_task_create(long session_id,
task->id = apr_psprintf(pool, "%ld-%d", session_id, req->id);
task->stream_id = req->id;
+ task->pool = pool;
task->mplx = mplx;
task->request = req;
task->input_eos = !req->body;
@@ -179,6 +204,8 @@ h2_task *h2_task_create(long session_id,
apr_status_t h2_task_do(h2_task *task, conn_rec *c, apr_thread_cond_t *cond,
apr_socket_t *socket)
{
+ apr_status_t status;
+
AP_DEBUG_ASSERT(task);
task->io = cond;
task->input = h2_task_input_create(task, c);
@@ -186,21 +213,27 @@ apr_status_t h2_task_do(h2_task *task, c
ap_process_connection(c, socket);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
- "h2_task(%s): processing done", task->id);
-
- h2_task_input_destroy(task->input);
- h2_task_output_close(task->output);
- h2_task_output_destroy(task->output);
- task->io = NULL;
+ if (task->frozen) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
+ "h2_task(%s): process_conn returned frozen task",
+ task->id);
+ /* cleanup delayed */
+ status = APR_EAGAIN;
+ }
+ else {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
+ "h2_task(%s): processing done", task->id);
+ status = APR_SUCCESS;
+ }
- return APR_SUCCESS;
+ return status;
}
-static apr_status_t h2_task_process_request(const h2_request *req, conn_rec *c)
+static apr_status_t h2_task_process_request(h2_task *task, conn_rec *c)
{
- request_rec *r;
+ const h2_request *req = task->request;
conn_state_t *cs = c->cs;
+ request_rec *r;
r = h2_request_create_rec(req, c);
if (r && (r->status == HTTP_OK)) {
@@ -210,10 +243,15 @@ static apr_status_t h2_task_process_requ
cs->state = CONN_STATE_HANDLER;
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
- "h2_task(%ld-%d): start process_request", c->id, req->id);
+ "h2_task(%s): start process_request", task->id);
ap_process_request(r);
+ if (task->frozen) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
+ "h2_task(%s): process_request frozen", task->id);
+ }
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
- "h2_task(%ld-%d): process_request done", c->id, req->id);
+ "h2_task(%s): process_request done", task->id);
+
/* After the call to ap_process_request, the
* request pool will have been deleted. We set
* r=NULL here to ensure that any dereference
@@ -221,11 +259,10 @@ static apr_status_t h2_task_process_requ
* will result in a segfault immediately instead
* of nondeterministic failures later.
*/
- if (cs)
+ if (cs)
cs->state = CONN_STATE_WRITE_COMPLETION;
r = NULL;
}
- ap_update_child_status_from_conn(c->sbh, SERVER_BUSY_WRITE, c);
c->sbh = NULL;
return APR_SUCCESS;
@@ -244,7 +281,7 @@ static int h2_task_process_conn(conn_rec
if (!ctx->task->ser_headers) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c,
"h2_h2, processing request directly");
- h2_task_process_request(ctx->task->request, c);
+ h2_task_process_request(ctx->task, c);
return DONE;
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c,
@@ -252,3 +289,24 @@ static int h2_task_process_conn(conn_rec
}
return DECLINED;
}
+
+apr_status_t h2_task_freeze(h2_task *task, request_rec *r)
+{
+ if (!task->frozen) {
+ conn_rec *c = task->output->c;
+
+ task->frozen = 1;
+ task->frozen_out = apr_brigade_create(c->pool, c->bucket_alloc);
+ ap_add_output_filter("H2_RESPONSE_FREEZE", task, r, r->connection);
+ }
+ return APR_SUCCESS;
+}
+
+apr_status_t h2_task_thaw(h2_task *task)
+{
+ if (task->frozen) {
+ task->frozen = 0;
+ }
+ return APR_SUCCESS;
+}
+
Modified: httpd/httpd/trunk/modules/http2/h2_task.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_task.h?rev=1730572&r1=1730571&r2=1730572&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_task.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_task.h Mon Feb 15 17:10:54 2016
@@ -50,16 +50,20 @@ typedef struct h2_task h2_task;
struct h2_task {
const char *id;
int stream_id;
+ apr_pool_t *pool;
struct h2_mplx *mplx;
const struct h2_request *request;
unsigned int filters_set : 1;
unsigned int input_eos : 1;
unsigned int ser_headers : 1;
+ unsigned int frozen : 1;
struct h2_task_input *input;
struct h2_task_output *output;
struct apr_thread_cond_t *io; /* used to wait for events on */
+
+ apr_bucket_brigade *frozen_out;
};
h2_task *h2_task_create(long session_id, const struct h2_request *req,
@@ -77,4 +81,7 @@ apr_status_t h2_task_init(apr_pool_t *po
extern APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_in) *h2_task_logio_add_bytes_in;
extern APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_out) *h2_task_logio_add_bytes_out;
+apr_status_t h2_task_freeze(h2_task *task, request_rec *r);
+apr_status_t h2_task_thaw(h2_task *task);
+
#endif /* defined(__mod_h2__h2_task__) */
Modified: httpd/httpd/trunk/modules/http2/h2_task_input.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_task_input.c?rev=1730572&r1=1730571&r2=1730572&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_task_input.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_task_input.c Mon Feb 15 17:10:54 2016
@@ -75,11 +75,6 @@ h2_task_input *h2_task_input_create(h2_t
return input;
}
-void h2_task_input_destroy(h2_task_input *input)
-{
- input->bb = NULL;
-}
-
apr_status_t h2_task_input_read(h2_task_input *input,
ap_filter_t* f,
apr_bucket_brigade* bb,
Modified: httpd/httpd/trunk/modules/http2/h2_task_input.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_task_input.h?rev=1730572&r1=1730571&r2=1730572&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_task_input.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_task_input.h Mon Feb 15 17:10:54 2016
@@ -34,8 +34,6 @@ struct h2_task_input {
h2_task_input *h2_task_input_create(struct h2_task *task, conn_rec *c);
-void h2_task_input_destroy(h2_task_input *input);
-
apr_status_t h2_task_input_read(h2_task_input *input,
ap_filter_t* filter,
apr_bucket_brigade* brigade,
Modified: httpd/httpd/trunk/modules/http2/h2_task_output.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_task_output.c?rev=1730572&r1=1730571&r2=1730572&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_task_output.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_task_output.c Mon Feb 15 17:10:54 2016
@@ -49,14 +49,6 @@ h2_task_output *h2_task_output_create(h2
return output;
}
-void h2_task_output_destroy(h2_task_output *output)
-{
- if (output->from_h1) {
- h2_from_h1_destroy(output->from_h1);
- output->from_h1 = NULL;
- }
-}
-
static apr_table_t *get_trailers(h2_task_output *output)
{
if (!output->trailers_passed) {
@@ -75,7 +67,7 @@ static apr_table_t *get_trailers(h2_task
}
static apr_status_t open_if_needed(h2_task_output *output, ap_filter_t *f,
- apr_bucket_brigade *bb)
+ apr_bucket_brigade *bb, const char *caller)
{
if (output->state == H2_TASK_OUT_INIT) {
h2_response *response;
@@ -86,9 +78,10 @@ static apr_status_t open_if_needed(h2_ta
/* This happens currently when ap_die(status, r) is invoked
* by a read request filter. */
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c, APLOGNO(03204)
- "h2_task_output(%s): write without response "
+ "h2_task_output(%s): write without response by %s "
"for %s %s %s",
- output->task->id, output->task->request->method,
+ output->task->id, caller,
+ output->task->request->method,
output->task->request->authority,
output->task->request->path);
f->c->aborted = 1;
@@ -108,6 +101,11 @@ static apr_status_t open_if_needed(h2_ta
h2_task_logio_add_bytes_out(f->c, bytes_written);
}
get_trailers(output);
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c, APLOGNO(03204)
+ "h2_task_output(%s): open as needed %s %s %s",
+ output->task->id, output->task->request->method,
+ output->task->request->authority,
+ output->task->request->path);
return h2_mplx_out_open(output->task->mplx, output->task->stream_id,
response, f, bb, output->task->io);
}
@@ -116,19 +114,19 @@ static apr_status_t open_if_needed(h2_ta
void h2_task_output_close(h2_task_output *output)
{
- open_if_needed(output, NULL, NULL);
+ open_if_needed(output, NULL, NULL, "close");
if (output->state != H2_TASK_OUT_DONE) {
+ if (output->task->frozen_out
+ && !APR_BRIGADE_EMPTY(output->task->frozen_out)) {
+ h2_mplx_out_write(output->task->mplx, output->task->stream_id,
+ NULL, output->task->frozen_out, NULL, NULL);
+ }
h2_mplx_out_close(output->task->mplx, output->task->stream_id,
get_trailers(output));
output->state = H2_TASK_OUT_DONE;
}
}
-int h2_task_output_has_started(h2_task_output *output)
-{
- return output->state >= H2_TASK_OUT_STARTED;
-}
-
/* Bring the data from the brigade (which represents the result of the
* request_rec out filter chain) into the h2_mplx for further sending
* on the master connection.
@@ -144,7 +142,14 @@ apr_status_t h2_task_output_write(h2_tas
return APR_SUCCESS;
}
- status = open_if_needed(output, f, bb);
+ if (output->task->frozen) {
+ h2_util_bb_log(output->c, output->task->stream_id, APLOG_TRACE2,
+ "frozen task output write", bb);
+ APR_BRIGADE_CONCAT(output->task->frozen_out, bb);
+ return APR_SUCCESS;
+ }
+
+ status = open_if_needed(output, f, bb, "write");
if (status != APR_EOF) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
"h2_task_output(%s): opened and passed brigade",
Modified: httpd/httpd/trunk/modules/http2/h2_task_output.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_task_output.h?rev=1730572&r1=1730571&r2=1730572&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_task_output.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_task_output.h Mon Feb 15 17:10:54 2016
@@ -44,14 +44,13 @@ struct h2_task_output {
h2_task_output *h2_task_output_create(struct h2_task *task, conn_rec *c);
-void h2_task_output_destroy(h2_task_output *output);
-
apr_status_t h2_task_output_write(h2_task_output *output,
ap_filter_t* filter,
apr_bucket_brigade* brigade);
void h2_task_output_close(h2_task_output *output);
-int h2_task_output_has_started(h2_task_output *output);
+apr_status_t h2_task_output_freeze(h2_task_output *output);
+apr_status_t h2_task_output_thaw(h2_task_output *output);
#endif /* defined(__mod_h2__h2_task_output__) */
Modified: httpd/httpd/trunk/modules/http2/h2_worker.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_worker.c?rev=1730572&r1=1730571&r2=1730572&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_worker.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_worker.c Mon Feb 15 17:10:54 2016
@@ -35,24 +35,9 @@ static void* APR_THREAD_FUNC execute(apr
{
h2_worker *worker = (h2_worker *)wctx;
apr_status_t status;
- apr_allocator_t *task_allocator = NULL;
- apr_pool_t *task_pool;
+ apr_pool_t *task_pool = NULL;
(void)thread;
-
- /* We create a root pool with its own allocator to be used for
- * processing a request. This is the only way to have the processing
- * independant of the worker pool as the h2_mplx pool as well as
- * not sensitive to which thread it is in.
- * In that sense, memory allocation and lifetime is similar to a master
- * connection.
- * The main goal in this is that slave connections and requests will
- * - one day - be suspended and resumed in different threads.
- */
- apr_allocator_create(&task_allocator);
- apr_pool_create_ex(&task_pool, NULL, NULL, task_allocator);
- apr_allocator_owner_set(task_allocator, task_pool);
-
/* Other code might want to see a socket for this connection this
* worker processes. Allocate one without further function...
*/
@@ -78,6 +63,22 @@ static void* APR_THREAD_FUNC execute(apr
conn_rec *c, *master = m->c;
int stream_id = req->id;
+ if (!task_pool) {
+ apr_allocator_t *task_allocator = NULL;
+ /* We create a root pool with its own allocator to be used for
+ * processing a request. This is the only way to have the processing
+ * independant of the worker pool as the h2_mplx pool as well as
+ * not sensitive to which thread it is in.
+ * In that sense, memory allocation and lifetime is similar to a master
+ * connection.
+ * The main goal in this is that slave connections and requests will
+ * - one day - be suspended and resumed in different threads.
+ */
+ apr_allocator_create(&task_allocator);
+ apr_pool_create_ex(&task_pool, NULL, NULL, task_allocator);
+ apr_allocator_owner_set(task_allocator, task_pool);
+ }
+
c = h2_slave_create(master, task_pool,
worker->thread, worker->socket);
if (!c) {
@@ -92,8 +93,13 @@ static void* APR_THREAD_FUNC execute(apr
task = h2_task_create(m->id, req, task_pool, m);
h2_ctx_create_for(c, task);
h2_task_do(task, c, worker->io, worker->socket);
- task = NULL;
+ if (task->frozen) {
+ /* this task was handed over to someone else for
+ * processing */
+ task_pool = NULL;
+ }
+ task = NULL;
apr_thread_cond_signal(worker->io);
}
@@ -103,7 +109,9 @@ static void* APR_THREAD_FUNC execute(apr
* long as it has requests to handle. Might no be fair to
* other mplx's. Perhaps leave after n requests? */
req = NULL;
- apr_pool_clear(task_pool);
+ if (task_pool) {
+ apr_pool_clear(task_pool);
+ }
h2_mplx_request_done(&m, stream_id, worker->aborted? NULL : &req);
}
}
Modified: httpd/httpd/trunk/modules/http2/mod_http2.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/mod_http2.c?rev=1730572&r1=1730571&r2=1730572&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/mod_http2.c (original)
+++ httpd/httpd/trunk/modules/http2/mod_http2.c Mon Feb 15 17:10:54 2016
@@ -35,6 +35,7 @@
#include "h2_config.h"
#include "h2_ctx.h"
#include "h2_h2.h"
+#include "h2_mplx.h"
#include "h2_push.h"
#include "h2_request.h"
#include "h2_switch.h"
@@ -121,6 +122,57 @@ static char *http2_var_lookup(apr_pool_t
conn_rec *, request_rec *, char *name);
static int http2_is_h2(conn_rec *);
+static apr_status_t http2_req_engine_push(const char *engine_type,
+ request_rec *r,
+ h2_req_engine_init *einit)
+{
+ h2_ctx *ctx = h2_ctx_rget(r);
+ if (ctx) {
+ h2_task *task = h2_ctx_get_task(ctx);
+ if (task) {
+ return h2_mplx_engine_push(task->mplx, task, engine_type, r, einit);
+ }
+ }
+ return APR_EINVAL;
+}
+
+static apr_status_t http2_req_engine_pull(h2_req_engine *engine,
+ apr_time_t timeout, request_rec **pr)
+{
+ h2_ctx *ctx = h2_ctx_get(engine->c, 0);
+ if (ctx) {
+ h2_task *task = h2_ctx_get_task(ctx);
+ if (task) {
+ return h2_mplx_engine_pull(task->mplx, task, engine, timeout, pr);
+ }
+ }
+ return APR_ECONNABORTED;
+}
+
+static void http2_req_engine_done(h2_req_engine *engine, conn_rec *r_conn)
+{
+ h2_ctx *ctx = h2_ctx_get(r_conn, 0);
+ if (ctx) {
+ h2_task *task = h2_ctx_get_task(ctx);
+ if (task) {
+ h2_mplx_engine_done(task->mplx, task, r_conn);
+ /* task is destroyed */
+ }
+ }
+}
+
+static void http2_req_engine_exit(h2_req_engine *engine)
+{
+ h2_ctx *ctx = h2_ctx_get(engine->c, 0);
+ if (ctx) {
+ h2_task *task = h2_ctx_get_task(ctx);
+ if (task) {
+ h2_mplx_engine_exit(task->mplx, task, engine);
+ }
+ }
+}
+
+
/* Runs once per created child process. Perform any process
* related initionalization here.
*/
@@ -143,6 +195,10 @@ static void h2_hooks(apr_pool_t *pool)
APR_REGISTER_OPTIONAL_FN(http2_is_h2);
APR_REGISTER_OPTIONAL_FN(http2_var_lookup);
+ APR_REGISTER_OPTIONAL_FN(http2_req_engine_push);
+ APR_REGISTER_OPTIONAL_FN(http2_req_engine_pull);
+ APR_REGISTER_OPTIONAL_FN(http2_req_engine_done);
+ APR_REGISTER_OPTIONAL_FN(http2_req_engine_exit);
ap_log_perror(APLOG_MARK, APLOG_TRACE1, 0, pool, "installing hooks");
Modified: httpd/httpd/trunk/modules/http2/mod_http2.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/mod_http2.h?rev=1730572&r1=1730571&r2=1730572&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/mod_http2.h (original)
+++ httpd/httpd/trunk/modules/http2/mod_http2.h Mon Feb 15 17:10:54 2016
@@ -18,13 +18,97 @@
/** The http2_var_lookup() optional function retrieves HTTP2 environment
* variables. */
-APR_DECLARE_OPTIONAL_FN(char *, http2_var_lookup,
- (apr_pool_t *, server_rec *,
- conn_rec *, request_rec *,
- char *));
+APR_DECLARE_OPTIONAL_FN(char *,
+ http2_var_lookup, (apr_pool_t *, server_rec *,
+ conn_rec *, request_rec *, char *));
/** An optional function which returns non-zero if the given connection
* or its master connection is using HTTP/2. */
-APR_DECLARE_OPTIONAL_FN(int, http2_is_h2, (conn_rec *));
+APR_DECLARE_OPTIONAL_FN(int,
+ http2_is_h2, (conn_rec *));
+
+
+/*******************************************************************************
+ * HTTP/2 slave engines
+ ******************************************************************************/
+
+typedef struct h2_req_engine h2_req_engine;
+
+/**
+ * Initialize a h2_req_engine. The structure will be passed in but
+ * only the name and master are set. The function should initialize
+ * all fields.
+ * @param engine the allocated, partially filled structure
+ * @param r the first request to process, or NULL
+ */
+typedef apr_status_t h2_req_engine_init(h2_req_engine *engine, request_rec *r);
+
+/**
+ * The public structure of a h2_req_engine. It gets allocated by the http2
+ * infrastructure, assigned id, type, pool and connection and passed to the
+ * h2_req_engine_init() callback to complete initialization.
+ * This happens whenever a new request gets "push"ed for an engine type and
+ * no instance, or no free instance, for the type is available.
+ */
+struct h2_req_engine {
+ int id; /* identifier, unique for a master connection */
+ const char *type; /* name of the engine type */
+ apr_pool_t *pool; /* pool for engine specific allocations */
+ conn_rec *c; /* connection this engine is assigned to */
+ apr_size_t r_capacity; /* request capacity engine is willing to handle,
+ may change between invocations. If the engine
+ sets this to 0, it signals that it no longer
+ wants more requests. New requests, already
+ scheduled for this engine might still arrive for
+ a time. */
+ apr_size_t r_count; /* number of request currently assigned, it is the
+ responsibility of the engine to update this. */
+ void *data; /* engine specific data */
+};
+
+/**
+ * Push a request to an engine with the specified name for further processing.
+ * If no such engine is available, einit is not NULL, einit is called
+ * with a new engine record and the caller is responsible for running the
+ * new engine instance.
+ * @param engine_type the type of the engine to add the request to
+ * @param r the request to push to an engine for processing
+ * @param einit an optional initialization callback for a new engine
+ * of the requested type, should no instance be available.
+ * By passing a non-NULL callback, the caller is willing
+ * to init and run a new engine itself.
+ * @return APR_SUCCESS iff slave was successfully added to an engine
+ */
+APR_DECLARE_OPTIONAL_FN(apr_status_t,
+ http2_req_engine_push, (const char *engine_type,
+ request_rec *r,
+ h2_req_engine_init *einit));
+
+/**
+ * Get a new request for processing in this engine.
+ * @param engine the engine which is done processing the slave
+ * @param timeout wait a maximum amount of time for a new slave, 0 will not wait
+ * @param pslave the slave connection that needs processing or NULL
+ * @return APR_SUCCESS if new request was assigned
+ * APR_EAGAIN/APR_TIMEUP if no new request is available
+ * APR_ECONNABORTED if the engine needs to shut down
+ */
+APR_DECLARE_OPTIONAL_FN(apr_status_t,
+ http2_req_engine_pull, (h2_req_engine *engine,
+ apr_time_t timeout,
+ request_rec **pr));
+APR_DECLARE_OPTIONAL_FN(void,
+ http2_req_engine_done, (h2_req_engine *engine,
+ conn_rec *rconn));
+/**
+ * The given request engine is done processing and needs to be excluded
+ * from further handling.
+ * @param engine the engine to exit
+ */
+APR_DECLARE_OPTIONAL_FN(void,
+ http2_req_engine_exit, (h2_req_engine *engine));
+
+
+#define H2_TASK_ID_NOTE "http2-task-id"
#endif
Modified: httpd/httpd/trunk/modules/http2/mod_proxy_http2.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/mod_proxy_http2.c?rev=1730572&r1=1730571&r2=1730572&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/mod_proxy_http2.c (original)
+++ httpd/httpd/trunk/modules/http2/mod_proxy_http2.c Mon Feb 15 17:10:54 2016
@@ -17,6 +17,7 @@
#include <httpd.h>
#include <mod_proxy.h>
+#include <mod_http2.h>
#include "mod_proxy_http2.h"
@@ -37,6 +38,30 @@ AP_DECLARE_MODULE(proxy_http2) = {
register_hook /* register hooks */
};
+/* Optional functions from mod_http2 */
+static int (*is_h2)(conn_rec *c);
+static apr_status_t (*req_engine_push)(const char *name, request_rec *r,
+ h2_req_engine_init *einit);
+static apr_status_t (*req_engine_pull)(h2_req_engine *engine,
+ apr_time_t timeout, request_rec **pr);
+static void (*req_engine_done)(h2_req_engine *engine, conn_rec *r_conn);
+static void (*req_engine_exit)(h2_req_engine *engine);
+
+typedef struct h2_proxy_ctx {
+ conn_rec *owner;
+ server_rec *server;
+ const char *proxy_func;
+ char server_portstr[32];
+ proxy_conn_rec *p_conn;
+ proxy_worker *worker;
+ proxy_server_conf *conf;
+
+ h2_req_engine *engine;
+ unsigned standalone : 1;
+ unsigned is_ssl : 1;
+ unsigned flushall : 1;
+} h2_proxy_ctx;
+
static int h2_proxy_post_config(apr_pool_t *p, apr_pool_t *plog,
apr_pool_t *ptemp, server_rec *s)
{
@@ -58,6 +83,21 @@ static int h2_proxy_post_config(apr_pool
"mod_proxy_http2 (v%s, nghttp2 %s), initializing...",
MOD_HTTP2_VERSION, ngh2? ngh2->version_str : "unknown");
+ is_h2 = APR_RETRIEVE_OPTIONAL_FN(http2_is_h2);
+ req_engine_push = APR_RETRIEVE_OPTIONAL_FN(http2_req_engine_push);
+ req_engine_pull = APR_RETRIEVE_OPTIONAL_FN(http2_req_engine_pull);
+ req_engine_done = APR_RETRIEVE_OPTIONAL_FN(http2_req_engine_done);
+ req_engine_exit = APR_RETRIEVE_OPTIONAL_FN(http2_req_engine_exit);
+
+ /* we need all of them */
+ if (!req_engine_push || !req_engine_pull
+ || !req_engine_done || !req_engine_exit) {
+ req_engine_push = NULL;
+ req_engine_pull = NULL;
+ req_engine_done = NULL;
+ req_engine_exit = NULL;
+ }
+
return status;
}
@@ -147,59 +187,111 @@ static int proxy_http2_canon(request_rec
return OK;
}
-static apr_status_t proxy_http2_cleanup(const char *scheme, request_rec *r,
- proxy_conn_rec *backend)
+static apr_status_t proxy_engine_init(h2_req_engine *engine, request_rec *r)
{
- ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, r, "cleanup, releasing connection");
- ap_proxy_release_connection(scheme, backend, r->server);
- return OK;
+ h2_proxy_ctx *ctx = ap_get_module_config(engine->c->conn_config,
+ &proxy_http2_module);
+ if (ctx) {
+ ctx->engine = engine;
+ return APR_SUCCESS;
+ }
+ return APR_ENOTIMPL;
}
-static
-int proxy_http2_process_stream(apr_pool_t *p, const char *url, request_rec *r,
- proxy_conn_rec **pp_conn, proxy_worker *worker,
- proxy_server_conf *conf, char *server_portstr,
- int flushall)
-{
- int rv = APR_ENOTIMPL;
- proxy_conn_rec *p_conn = *pp_conn;
+static int proxy_engine_run(h2_proxy_ctx *ctx, request_rec *r) {
+ int status = OK;
h2_proxy_session *session;
h2_proxy_stream *stream;
- session = h2_proxy_session_setup(r, *pp_conn, conf);
+ /* Step Two: Make the Connection (or check that an already existing
+ * socket is still usable). On success, we have a socket connected to
+ * backend->hostname. */
+ if (ap_proxy_connect_backend(ctx->proxy_func, ctx->p_conn, ctx->worker,
+ ctx->server)) {
+ ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, ctx->owner, APLOGNO()
+ "H2: failed to make connection to backend: %s",
+ ctx->p_conn->hostname);
+ return HTTP_SERVICE_UNAVAILABLE;
+ }
+
+ /* Step Three: Create conn_rec for the socket we have open now. */
+ if (!ctx->p_conn->connection) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, ctx->owner, APLOGNO()
+ "setup new connection: is_ssl=%d %s %s %s",
+ ctx->p_conn->is_ssl, ctx->p_conn->ssl_hostname,
+ r->hostname, ctx->p_conn->hostname);
+ if ((status = ap_proxy_connection_create(ctx->proxy_func, ctx->p_conn,
+ ctx->owner,
+ ctx->server)) != OK) {
+ return status;
+ }
+
+ /*
+ * On SSL connections set a note on the connection what CN is
+ * requested, such that mod_ssl can check if it is requested to do
+ * so.
+ */
+ if (ctx->p_conn->ssl_hostname) {
+ apr_table_setn(ctx->p_conn->connection->notes,
+ "proxy-request-hostname", ctx->p_conn->ssl_hostname);
+ }
+
+ if (ctx->is_ssl) {
+ apr_table_setn(ctx->p_conn->connection->notes,
+ "proxy-request-alpn-protos", "h2");
+ }
+ }
+
+ /* Step Four: Send the Request in a new HTTP/2 stream and
+ * loop until we got the response or encounter errors.
+ */
+ status = APR_ENOTIMPL;
+ session = h2_proxy_session_setup(r, ctx->p_conn, ctx->conf);
if (!session) {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, p_conn->connection,
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->p_conn->connection,
"session unavailable");
return HTTP_SERVICE_UNAVAILABLE;
}
- /* TODO
- * - enter http2 client processing loop:
- * - send any input in datasource callback from r->input_filters
- * - await response HEADERs
- * - send any DATA to r->output_filters
- * - on stream close, check for missing response
- * - on certain errors, mark connection for close
- */
- rv = h2_proxy_session_open_stream(session, url, r, &stream);
- if (rv == OK) {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
- "process stream(%d): %s %s%s, original: %s",
- stream->id, stream->req->method,
- stream->req->authority, stream->req->path,
- r->the_request);
- rv = h2_proxy_stream_process(stream);
- }
-
- if (rv != OK) {
- conn_rec *c = r->connection;
- ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO()
- "pass request body failed to %pI (%s) from %s (%s)",
- p_conn->addr, p_conn->hostname ? p_conn->hostname: "",
- c->client_ip, c->remote_host ? c->remote_host: "");
+ while (r) {
+ conn_rec *r_conn = r->connection;
+ const char *url;
+
+ url = apr_table_get(r->notes, H2_PROXY_REQ_URL_NOTE);
+ status = h2_proxy_session_open_stream(session, url, r, &stream);
+ if (status == OK) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, r_conn,
+ "process stream(%d): %s %s%s, original: %s",
+ stream->id, stream->req->method,
+ stream->req->authority, stream->req->path,
+ r->the_request);
+ status = h2_proxy_stream_process(stream);
+ }
+ r = NULL;
+
+ if (status != OK) {
+ ap_log_cerror(APLOG_MARK, APLOG_ERR, status, r_conn, APLOGNO()
+ "pass request body failed to %pI (%s) from %s (%s)",
+ ctx->p_conn->addr, ctx->p_conn->hostname ?
+ ctx->p_conn->hostname: "", session->c->client_ip,
+ session->c->remote_host ? session->c->remote_host: "");
+ }
+
+ if (!ctx->standalone && req_engine_done && r_conn != ctx->owner) {
+ req_engine_done(ctx->engine, r_conn);
+ }
+ r_conn = NULL;
+
+ if (!ctx->standalone && req_engine_pull) {
+ status = req_engine_pull(ctx->engine, ctx->server->timeout, &r);
+ if (status != APR_SUCCESS) {
+ status = APR_SUCCESS;
+ break;
+ }
+ }
}
-
- return rv;
+
+ return status;
}
static int proxy_http2_handler(request_rec *r,
@@ -209,18 +301,17 @@ static int proxy_http2_handler(request_r
const char *proxyname,
apr_port_t proxyport)
{
- const char *proxy_function;
- proxy_conn_rec *backend;
+ const char *proxy_func;
char *locurl = url, *u;
apr_size_t slen;
int is_ssl = 0;
- int flushall = 0;
- int status;
- char server_portstr[32];
+ apr_status_t status;
conn_rec *c = r->connection;
- apr_pool_t *p = r->pool;
+ server_rec *s = r->server;
+ apr_pool_t *p = c->pool;
apr_uri_t *uri = apr_palloc(p, sizeof(*uri));
- conn_rec *backconn;
+ h2_proxy_ctx *ctx;
+ const char *engine_type, *hostname;
/* find the scheme */
if ((url[0] != 'h' && url[0] != 'H') || url[1] != '2') {
@@ -233,22 +324,30 @@ static int proxy_http2_handler(request_r
slen = (u - url);
switch(slen) {
case 2:
- proxy_function = "H2";
+ proxy_func = "H2";
is_ssl = 1;
break;
case 3:
if (url[2] != 'c' && url[2] != 'C') {
return DECLINED;
}
- proxy_function = "H2C";
+ proxy_func = "H2C";
break;
default:
return DECLINED;
}
- if (apr_table_get(r->subprocess_env, "proxy-flushall")) {
- flushall = 1;
- }
+ ctx = apr_pcalloc(p, sizeof(*ctx));
+ ctx->owner = c;
+ ctx->server = s;
+ ctx->proxy_func = proxy_func;
+ ctx->is_ssl = is_ssl;
+ ctx->worker = worker;
+ ctx->conf = conf;
+ ctx->flushall = apr_table_get(r->subprocess_env, "proxy-flushall")? 1 : 0;
+
+ ap_set_module_config(c->conn_config, &proxy_http2_module, ctx);
+ apr_table_setn(r->notes, H2_PROXY_REQ_URL_NOTE, url);
/* scheme says, this is for us. */
ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, r, "H2: serving URL %s", url);
@@ -256,89 +355,82 @@ static int proxy_http2_handler(request_r
/* Get a proxy_conn_rec from the worker, might be a new one, might
* be one still open from another request, or it might fail if the
* worker is stopped or in error. */
- if ((status = ap_proxy_acquire_connection(proxy_function, &backend,
- worker, r->server)) != OK) {
+ if ((status = ap_proxy_acquire_connection(ctx->proxy_func, &ctx->p_conn,
+ ctx->worker, s)) != OK) {
goto cleanup;
}
- backend->is_ssl = is_ssl;
- if (is_ssl) {
+ ctx->p_conn->is_ssl = ctx->is_ssl;
+ if (ctx->is_ssl) {
/* If there is still some data on an existing ssl connection, now
* would be a good timne to get rid of it. */
- ap_proxy_ssl_connection_cleanup(backend, r);
+ ap_proxy_ssl_connection_cleanup(ctx->p_conn, r);
}
/* Step One: Determine the URL to connect to (might be a proxy),
* initialize the backend accordingly and determine the server
* port string we can expect in responses. */
- if ((status = ap_proxy_determine_connection(p, r, conf, worker, backend,
+ if ((status = ap_proxy_determine_connection(p, r, conf, worker, ctx->p_conn,
uri, &locurl, proxyname,
- proxyport, server_portstr,
- sizeof(server_portstr))) != OK) {
+ proxyport, ctx->server_portstr,
+ sizeof(ctx->server_portstr))) != OK) {
goto cleanup;
}
- /* Step Two: Make the Connection (or check that an already existing
- * socket is still usable). On success, we have a socket connected to
- * backend->hostname. */
- if (ap_proxy_connect_backend(proxy_function, backend, worker, r->server)) {
- ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO()
- "H2: failed to make connection to backend: %s",
- backend->hostname);
- status = HTTP_SERVICE_UNAVAILABLE;
- goto cleanup;
- }
+ hostname = (ctx->p_conn->ssl_hostname?
+ ctx->p_conn->ssl_hostname : ctx->p_conn->hostname);
+ engine_type = apr_psprintf(p, "proxy_http2 %s%s", hostname, ctx->server_portstr);
- /* Step Three: Create conn_rec for the socket we have open now. */
- backconn = backend->connection;
- if (!backconn) {
- ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r, APLOGNO()
- "setup new connection: is_ssl=%d %s %s %s",
- backend->is_ssl,
- backend->ssl_hostname, r->hostname, backend->hostname);
- if ((status = ap_proxy_connection_create(proxy_function, backend,
- c, r->server)) != OK) {
- goto cleanup;
- }
- backconn = backend->connection;
-
- /*
- * On SSL connections set a note on the connection what CN is
- * requested, such that mod_ssl can check if it is requested to do
- * so.
+ if (c->master && req_engine_push && is_h2 && is_h2(ctx->owner)) {
+ /* If we are have req_engine capabilities, push the handling of this
+ * request (e.g. slave connection) to a proxy_http2 engine which uses
+ * the same backend. We may be called to create an engine ourself.
*/
- if (backend->ssl_hostname) {
- apr_table_setn(backend->connection->notes,
- "proxy-request-hostname", backend->ssl_hostname);
- }
-
- if (backend->is_ssl) {
- apr_table_setn(backend->connection->notes,
- "proxy-request-alpn-protos", "h2");
+ status = req_engine_push(engine_type, r, proxy_engine_init);
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,
+ "H2: pushing request %s to engine type %s",
+ url, engine_type);
+ if (status == APR_SUCCESS && ctx->engine == NULL) {
+ /* Another engine instance has taken over processing of this
+ * request. */
+ goto cleanup;
}
}
-
- /* Step Four: Send the Request in a new HTTP/2 stream and
- * loop until we got the response or encounter errors.
- */
- if ((status = proxy_http2_process_stream(p, url, r, &backend, worker,
- conf, server_portstr,
- flushall)) != OK) {
- ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r, APLOGNO()
- "H2: failed to process request: %s",
- r->the_request);
+
+ if (!ctx->engine) {
+ /* No engine was available or has been initialized, handle this
+ * request just by ourself. */
+ h2_req_engine *engine = apr_pcalloc(p, sizeof(*engine));
+ engine->id = 0;
+ engine->type = engine_type;
+ engine->pool = p;
+ engine->c = c;
+ ctx->engine = engine;
+ ctx->standalone = 1;
}
+
+ status = proxy_engine_run(ctx, r);
- /* clean up before return */
cleanup:
- if (backend) {
- if (status != OK) {
- backend->close = 1;
+ if (ctx->engine && !ctx->standalone && req_engine_exit) {
+ req_engine_exit(ctx->engine);
+ }
+ ctx->engine = NULL;
+
+ if (ctx) {
+ if (ctx->p_conn) {
+ if (status != OK) {
+ ctx->p_conn->close = 1;
+ }
+ proxy_run_detach_backend(r, ctx->p_conn);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, "cleanup, releasing connection");
+ ap_proxy_release_connection(ctx->proxy_func, ctx->p_conn, ctx->server);
}
- proxy_run_detach_backend(r, backend);
- proxy_http2_cleanup(proxy_function, r, backend);
+ ctx->worker = NULL;
+ ctx->conf = NULL;
+ ctx->p_conn = NULL;
}
- ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r, "leaving handler");
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c, "leaving handler");
return status;
}