You are viewing a plain text version of this content. The canonical link for it is here.
Posted to cvs@httpd.apache.org by ic...@apache.org on 2017/03/31 19:41:01 UTC
svn commit: r1789739 [3/4] - in /httpd/httpd/branches/2.4.x: ./ docs/manual/
docs/manual/mod/ modules/http2/
Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_session.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_session.c?rev=1789739&r1=1789738&r2=1789739&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_session.c (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_session.c Fri Mar 31 19:41:01 2017
@@ -56,6 +56,7 @@ static void transit(h2_session *session,
static void on_stream_state_enter(void *ctx, h2_stream *stream);
static void on_stream_state_event(void *ctx, h2_stream *stream, h2_stream_event_t ev);
+static void on_stream_event(void *ctx, h2_stream *stream, h2_stream_event_t ev);
static int h2_session_status_from_apr_status(apr_status_t rv)
{
@@ -71,26 +72,20 @@ static int h2_session_status_from_apr_st
return NGHTTP2_ERR_PROTO;
}
-static void update_window(void *ctx, int stream_id, apr_off_t bytes_read)
+static h2_stream *get_stream(h2_session *session, int stream_id)
{
- h2_session *session = (h2_session*)ctx;
- while (bytes_read > 0) {
- int len = (bytes_read > INT_MAX)? INT_MAX : bytes_read;
- nghttp2_session_consume(session->ngh2, stream_id, (int)bytes_read);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
- "h2_stream(%ld-%d): consumed %d bytes",
- session->id, stream_id, len);
- bytes_read -= len;
- }
+ return nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
}
-static apr_status_t h2_session_receive(void *ctx,
- const char *data, apr_size_t len,
- apr_size_t *readlen);
-
static void dispatch_event(h2_session *session, h2_session_event_t ev,
int err, const char *msg);
+void h2_session_event(h2_session *session, h2_session_event_t ev,
+ int err, const char *msg)
+{
+ dispatch_event(session, ev, err, msg);
+}
+
static int rst_unprocessed_stream(h2_stream *stream, void *ctx)
{
int unprocessed = (!h2_stream_was_closed(stream)
@@ -227,11 +222,6 @@ static int on_invalid_frame_recv_cb(nght
return 0;
}
-static h2_stream *get_stream(h2_session *session, int stream_id)
-{
- return nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
-}
-
static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags,
int32_t stream_id,
const uint8_t *data, size_t len, void *userp)
@@ -703,7 +693,6 @@ static apr_status_t session_cleanup(h2_s
}
transit(session, trigger, H2_SESSION_ST_CLEANUP);
- h2_mplx_set_consumed_cb(session->mplx, NULL, NULL);
h2_mplx_release_and_join(session->mplx, session->iowait);
session->mplx = NULL;
@@ -803,23 +792,33 @@ static apr_status_t h2_session_create_in
return status;
}
+ session->in_pending = h2_iq_create(session->pool, session->max_stream_count);
+ if (session->in_pending == NULL) {
+ apr_pool_destroy(pool);
+ return APR_ENOMEM;
+ }
+
+ session->in_process = h2_iq_create(session->pool, session->max_stream_count);
+ if (session->in_process == NULL) {
+ apr_pool_destroy(pool);
+ return APR_ENOMEM;
+ }
+
session->monitor = apr_pcalloc(pool, sizeof(h2_stream_monitor));
if (session->monitor == NULL) {
apr_pool_destroy(pool);
- return status;
+ return APR_ENOMEM;
}
session->monitor->ctx = session;
session->monitor->on_state_enter = on_stream_state_enter;
session->monitor->on_state_event = on_stream_state_event;
+ session->monitor->on_event = on_stream_event;
session->mplx = h2_mplx_create(c, session->pool, session->config,
workers);
- h2_mplx_set_consumed_cb(session->mplx, update_window, session);
-
- /* Install the connection input filter that feeds the session */
- session->cin = h2_filter_cin_create(session->pool,
- h2_session_receive, session);
+ /* connection input filter that feeds the session */
+ session->cin = h2_filter_cin_create(session);
ap_add_input_filter("H2_IN", session->cin, r, c);
h2_conn_io_init(&session->io, c, session->config);
@@ -871,8 +870,8 @@ static apr_status_t h2_session_create_in
"push_diary(type=%d,N=%d)"),
(int)session->max_stream_count,
(int)session->max_stream_mem,
- session->mplx->workers_limit,
- session->mplx->workers_max,
+ session->mplx->limit_active,
+ session->mplx->max_active,
session->push_diary->dtype,
(int)session->push_diary->N);
}
@@ -1431,7 +1430,8 @@ send_headers:
if (!stream->has_response) {
/* but no response */
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
- H2_STRM_LOG(APLOGNO(03466), stream, "no response, RST_STREAM"));
+ H2_STRM_LOG(APLOGNO(03466), stream,
+ "no response, RST_STREAM"));
h2_stream_rst(stream, H2_ERR_PROTOCOL_ERROR);
return APR_SUCCESS;
}
@@ -1444,32 +1444,32 @@ send_headers:
return status;
}
-static apr_status_t h2_session_receive(void *ctx, const char *data,
- apr_size_t len, apr_size_t *readlen)
+static void h2_session_in_flush(h2_session *session)
{
- h2_session *session = ctx;
- ssize_t n;
+ int id;
- if (len > 0) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
- H2_SSSN_MSG(session, "feeding %ld bytes to nghttp2"),
- (long)len);
- n = nghttp2_session_mem_recv(session->ngh2, (const uint8_t *)data, len);
- if (n < 0) {
- if (nghttp2_is_fatal((int)n)) {
- dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, (int)n, nghttp2_strerror((int)n));
- return APR_EGENERAL;
+ while ((id = h2_iq_shift(session->in_process)) > 0) {
+ h2_stream *stream = get_stream(session, id);
+ if (stream) {
+ ap_assert(!stream->scheduled);
+ if (h2_stream_prep_processing(stream) == APR_SUCCESS) {
+ h2_mplx_process(session->mplx, stream, stream_pri_cmp, session);
+ }
+ else {
+ h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR);
}
}
- else {
- *readlen = n;
- session->io.bytes_read += n;
+ }
+
+ while ((id = h2_iq_shift(session->in_pending)) > 0) {
+ h2_stream *stream = get_stream(session, id);
+ if (stream) {
+ h2_stream_flush_input(stream);
}
}
- return APR_SUCCESS;
}
-static apr_status_t h2_session_read(h2_session *session, int block)
+static apr_status_t session_read(h2_session *session, apr_size_t readlen, int block)
{
apr_status_t status, rstatus = APR_EAGAIN;
conn_rec *c = session->c;
@@ -1481,7 +1481,7 @@ static apr_status_t h2_session_read(h2_s
status = ap_get_brigade(c->input_filters,
session->bbtmp, AP_MODE_READBYTES,
block? APR_BLOCK_READ : APR_NONBLOCK_READ,
- APR_BUCKET_BUFF_SIZE);
+ H2MAX(APR_BUCKET_BUFF_SIZE, readlen));
/* get rid of any possible data we do not expect to get */
apr_brigade_cleanup(session->bbtmp);
@@ -1523,16 +1523,25 @@ static apr_status_t h2_session_read(h2_s
* status. */
return rstatus;
}
- if ((session->io.bytes_read - read_start) > (64*1024)) {
+ if ((session->io.bytes_read - read_start) > readlen) {
/* read enough in one go, give write a chance */
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, c,
- H2_SSSN_MSG(session, "read 64k, returning"));
+ H2_SSSN_MSG(session, "read enough, returning"));
break;
}
}
return rstatus;
}
+static apr_status_t h2_session_read(h2_session *session, int block)
+{
+ apr_status_t status = session_read(session, session->max_stream_mem
+ * H2MAX(2, session->open_streams),
+ block);
+ h2_session_in_flush(session);
+ return status;
+}
+
static const char *StateNames[] = {
"INIT", /* H2_SESSION_ST_INIT */
"DONE", /* H2_SESSION_ST_DONE */
@@ -1769,24 +1778,17 @@ static void h2_session_ev_pre_close(h2_s
static void ev_stream_open(h2_session *session, h2_stream *stream)
{
+ h2_iq_append(session->in_process, stream->id);
switch (session->state) {
case H2_SESSION_ST_IDLE:
if (session->open_streams == 1) {
- /* enter tiomeout, since we have a stream again */
+ /* enter timeout, since we have a stream again */
session->idle_until = (session->s->timeout + apr_time_now());
}
break;
default:
break;
}
-
- ap_assert(!stream->scheduled);
- if (h2_stream_prep_processing(stream) == APR_SUCCESS) {
- h2_mplx_process(session->mplx, stream, stream_pri_cmp, session);
- }
- else {
- h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR);
- }
}
static void ev_stream_closed(h2_session *session, h2_stream *stream)
@@ -1862,6 +1864,20 @@ static void on_stream_state_enter(void *
break;
}
}
+
+static void on_stream_event(void *ctx, h2_stream *stream,
+ h2_stream_event_t ev)
+{
+ h2_session *session = ctx;
+ switch (ev) {
+ case H2_SEV_IN_DATA_PENDING:
+ h2_iq_append(session->in_pending, stream->id);
+ break;
+ default:
+ /* NOP */
+ break;
+ }
+}
static void on_stream_state_event(void *ctx, h2_stream *stream,
h2_stream_event_t ev)
Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_session.h
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_session.h?rev=1789739&r1=1789738&r2=1789739&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_session.h (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_session.h Fri Mar 31 19:41:01 2017
@@ -125,6 +125,10 @@ typedef struct h2_session {
char status[64]; /* status message for scoreboard */
int last_status_code; /* the one already reported */
const char *last_status_msg; /* the one already reported */
+
+ struct h2_iqueue *in_pending; /* all streams with input pending */
+ struct h2_iqueue *in_process; /* all streams ready for processing on slave */
+
} h2_session;
const char *h2_session_state_str(h2_session_state state);
@@ -155,6 +159,9 @@ apr_status_t h2_session_rcreate(h2_sessi
request_rec *r, struct h2_ctx *ctx,
struct h2_workers *workers);
+void h2_session_event(h2_session *session, h2_session_event_t ev,
+ int err, const char *msg);
+
/**
* Process the given HTTP/2 session until it is ended or a fatal
* error occurred.
Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_stream.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_stream.c?rev=1789739&r1=1789738&r2=1789739&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_stream.c (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_stream.c Fri Mar 31 19:41:01 2017
@@ -43,18 +43,6 @@
#include "h2_util.h"
-#define S_XXX (-2)
-#define S_ERR (-1)
-#define S_NOP (0)
-#define S_IDL (H2_SS_IDL + 1)
-#define S_RS_L (H2_SS_RSVD_L + 1)
-#define S_RS_R (H2_SS_RSVD_R + 1)
-#define S_OPEN (H2_SS_OPEN + 1)
-#define S_CL_L (H2_SS_CLOSED_L + 1)
-#define S_CL_R (H2_SS_CLOSED_R + 1)
-#define S_CLS (H2_SS_CLOSED + 1)
-#define S_CLN (H2_SS_CLEANUP + 1)
-
static const char *h2_ss_str(h2_stream_state_t state)
{
switch (state) {
@@ -84,37 +72,54 @@ const char *h2_stream_state_str(h2_strea
return h2_ss_str(stream->state);
}
+/* Abbreviations for stream transit tables */
+#define S_XXX (-2) /* Programming Error */
+#define S_ERR (-1) /* Protocol Error */
+#define S_NOP (0) /* No Change */
+#define S_IDL (H2_SS_IDL + 1)
+#define S_RS_L (H2_SS_RSVD_L + 1)
+#define S_RS_R (H2_SS_RSVD_R + 1)
+#define S_OPEN (H2_SS_OPEN + 1)
+#define S_CL_L (H2_SS_CLOSED_L + 1)
+#define S_CL_R (H2_SS_CLOSED_R + 1)
+#define S_CLS (H2_SS_CLOSED + 1)
+#define S_CLN (H2_SS_CLEANUP + 1)
+
+/* state transisitions when certain frame types are sent */
static int trans_on_send[][H2_SS_MAX] = {
-/* S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS, S_CLN, */
-/* DATA, */ { S_ERR, S_ERR, S_ERR, S_NOP, S_NOP, S_ERR, S_NOP, S_NOP, },
-/* HEADERS, */ { S_ERR, S_ERR, S_CL_R, S_NOP, S_NOP, S_ERR, S_NOP, S_NOP, },
-/* PRIORITY, */ { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },
-/* RST_STREAM, */ { S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, },
-/* SETTINGS, */ { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },
-/* PUSH_PROMISE, */ { S_RS_L,S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },
-/* PING, */ { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },
-/* GOAWAY, */ { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },
-/* WINDOW_UPDATE,*/ { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },
-/* CONT */ { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },
+/*S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS, S_CLN, */
+{ S_ERR, S_ERR, S_ERR, S_NOP, S_NOP, S_ERR, S_NOP, S_NOP, },/* DATA */
+{ S_ERR, S_ERR, S_CL_R, S_NOP, S_NOP, S_ERR, S_NOP, S_NOP, },/* HEADERS */
+{ S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* PRIORITY */
+{ S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, },/* RST_STREAM */
+{ S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* SETTINGS */
+{ S_RS_L,S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* PUSH_PROMISE */
+{ S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* PING */
+{ S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* GOAWAY */
+{ S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* WINDOW_UPDATE */
+{ S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* CONT */
};
+/* state transisitions when certain frame types are received */
static int trans_on_recv[][H2_SS_MAX] = {
-/* S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS, S_CLN, */
-/* DATA, */ { S_ERR, S_ERR, S_ERR, S_NOP, S_ERR, S_NOP, S_NOP, S_NOP, },
-/* HEADERS, */ { S_OPEN,S_CL_L, S_ERR, S_NOP, S_ERR, S_NOP, S_NOP, S_NOP, },
-/* PRIORITY, */ { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },
-/* RST_STREAM, */ { S_ERR, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, },
-/* SETTINGS, */ { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },
-/* PUSH_PROMISE, */ { S_RS_R,S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },
-/* PING, */ { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },
-/* GOAWAY, */ { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },
-/* WINDOW_UPDATE,*/ { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },
-/* CONT */ { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },
+/*S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS, S_CLN, */
+{ S_ERR, S_ERR, S_ERR, S_NOP, S_ERR, S_NOP, S_NOP, S_NOP, },/* DATA */
+{ S_OPEN,S_CL_L, S_ERR, S_NOP, S_ERR, S_NOP, S_NOP, S_NOP, },/* HEADERS */
+{ S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* PRIORITY */
+{ S_ERR, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, },/* RST_STREAM */
+{ S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* SETTINGS */
+{ S_RS_R,S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* PUSH_PROMISE */
+{ S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* PING */
+{ S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },/* GOAWAY */
+{ S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* WINDOW_UPDATE */
+{ S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },/* CONT */
};
+/* state transisitions when certain events happen */
static int trans_on_event[][H2_SS_MAX] = {
-/* H2_SEV_CLOSED_L*/{ S_XXX, S_ERR, S_ERR, S_CL_L, S_CLS, S_XXX, S_XXX, S_XXX, },
-/* H2_SEV_CLOSED_R*/{ S_ERR, S_ERR, S_ERR, S_CL_R, S_ERR, S_CLS, S_NOP, S_NOP, },
-/* H2_SEV_CANCELLED*/{S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, },
-/* H2_SEV_EOS_SENT*/{ S_NOP, S_XXX, S_XXX, S_XXX, S_XXX, S_CLS, S_CLN, S_XXX, },
+/*S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS, S_CLN, */
+{ S_XXX, S_ERR, S_ERR, S_CL_L, S_CLS, S_XXX, S_XXX, S_XXX, },/* EV_CLOSED_L*/
+{ S_ERR, S_ERR, S_ERR, S_CL_R, S_ERR, S_CLS, S_NOP, S_NOP, },/* EV_CLOSED_R*/
+{ S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, },/* EV_CANCELLED*/
+{ S_NOP, S_XXX, S_XXX, S_XXX, S_XXX, S_CLS, S_CLN, S_XXX, },/* EV_EOS_SENT*/
};
static int on_map(h2_stream_state_t state, int map[H2_SS_MAX])
@@ -137,7 +142,7 @@ static int on_frame(h2_stream_state_t st
ap_assert(frame_type >= 0);
ap_assert(state >= 0);
if (frame_type >= maxlen) {
- return state; /* NOP */
+ return state; /* NOP, ignore unknown frame types */
}
return on_map(state, frame_map[frame_type]);
}
@@ -152,9 +157,15 @@ static int on_frame_recv(h2_stream_state
return on_frame(state, frame_type, trans_on_recv, H2_ALEN(trans_on_recv));
}
-static int on_event(h2_stream_state_t state, h2_stream_event_t ev)
+static int on_event(h2_stream* stream, h2_stream_event_t ev)
{
- return on_map(state, trans_on_event[ev]);
+ if (stream->monitor && stream->monitor->on_event) {
+ stream->monitor->on_event(stream->monitor->ctx, stream, ev);
+ }
+ if (ev < H2_ALEN(trans_on_event)) {
+ return on_map(stream->state, trans_on_event[ev]);
+ }
+ return stream->state;
}
static void H2_STREAM_OUT_LOG(int lvl, h2_stream *s, const char *tag)
@@ -171,11 +182,16 @@ static void H2_STREAM_OUT_LOG(int lvl, h
}
static apr_status_t setup_input(h2_stream *stream) {
- if (stream->input == NULL && !stream->input_eof) {
- h2_beam_create(&stream->input, stream->pool, stream->id,
- "input", H2_BEAM_OWNER_SEND, 0,
- stream->session->s->timeout);
- h2_beam_send_from(stream->input, stream->pool);
+ if (stream->input == NULL) {
+ int empty = (stream->input_eof
+ && (!stream->in_buffer
+ || APR_BRIGADE_EMPTY(stream->in_buffer)));
+ if (!empty) {
+ h2_beam_create(&stream->input, stream->pool, stream->id,
+ "input", H2_BEAM_OWNER_SEND, 0,
+ stream->session->s->timeout);
+ h2_beam_send_from(stream->input, stream->pool);
+ }
}
return APR_SUCCESS;
}
@@ -197,27 +213,27 @@ static apr_status_t close_input(h2_strea
}
if (stream->trailers && !apr_is_empty_table(stream->trailers)) {
- apr_bucket_brigade *tmp;
apr_bucket *b;
h2_headers *r;
- tmp = apr_brigade_create(stream->pool, c->bucket_alloc);
+ if (!stream->in_buffer) {
+ stream->in_buffer = apr_brigade_create(stream->pool, c->bucket_alloc);
+ }
r = h2_headers_create(HTTP_OK, stream->trailers, NULL, stream->pool);
stream->trailers = NULL;
b = h2_bucket_headers_create(c->bucket_alloc, r);
- APR_BRIGADE_INSERT_TAIL(tmp, b);
+ APR_BRIGADE_INSERT_TAIL(stream->in_buffer, b);
b = apr_bucket_eos_create(c->bucket_alloc);
- APR_BRIGADE_INSERT_TAIL(tmp, b);
+ APR_BRIGADE_INSERT_TAIL(stream->in_buffer, b);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
H2_STRM_MSG(stream, "added trailers"));
- setup_input(stream);
- status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ);
- apr_brigade_destroy(tmp);
+ h2_stream_dispatch(stream, H2_SEV_IN_DATA_PENDING);
}
if (stream->input) {
+ h2_stream_flush_input(stream);
return h2_beam_close(stream->input);
}
return status;
@@ -225,7 +241,7 @@ static apr_status_t close_input(h2_strea
static apr_status_t close_output(h2_stream *stream)
{
- if (h2_beam_is_closed(stream->output)) {
+ if (!stream->output || h2_beam_is_closed(stream->output)) {
return APR_SUCCESS;
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
@@ -324,7 +340,7 @@ void h2_stream_dispatch(h2_stream *strea
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
H2_STRM_MSG(stream, "dispatch event %d"), ev);
- new_state = on_event(stream->state, ev);
+ new_state = on_event(stream, ev);
if (new_state < 0) {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, stream->session->c,
H2_STRM_LOG(APLOGNO(10002), stream, "invalid event %d"), ev);
@@ -335,7 +351,7 @@ void h2_stream_dispatch(h2_stream *strea
else if (new_state == stream->state) {
/* nop */
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
- H2_STRM_MSG(stream, "ignored event %d"), ev);
+ H2_STRM_MSG(stream, "non-state event %d"), ev);
return;
}
else {
@@ -394,7 +410,7 @@ apr_status_t h2_stream_send_frame(h2_str
H2_STRM_MSG(stream, "send frame %d, eos=%d"), ftype, eos);
status = transit(stream, new_state);
if (status == APR_SUCCESS && eos) {
- status = transit(stream, on_event(stream->state, H2_SEV_CLOSED_L));
+ status = transit(stream, on_event(stream, H2_SEV_CLOSED_L));
}
return status;
}
@@ -444,7 +460,23 @@ apr_status_t h2_stream_recv_frame(h2_str
}
status = transit(stream, new_state);
if (status == APR_SUCCESS && eos) {
- status = transit(stream, on_event(stream->state, H2_SEV_CLOSED_R));
+ status = transit(stream, on_event(stream, H2_SEV_CLOSED_R));
+ }
+ return status;
+}
+
+apr_status_t h2_stream_flush_input(h2_stream *stream)
+{
+ apr_status_t status = APR_SUCCESS;
+
+ if (stream->in_buffer && !APR_BRIGADE_EMPTY(stream->in_buffer)) {
+ setup_input(stream);
+ status = h2_beam_send(stream->input, stream->in_buffer, APR_BLOCK_READ);
+ stream->in_last_write = apr_time_now();
+ }
+ if (stream->input_eof
+ && stream->input && !h2_beam_is_closed(stream->input)) {
+ status = h2_beam_close(stream->input);
}
return status;
}
@@ -454,21 +486,27 @@ apr_status_t h2_stream_recv_DATA(h2_stre
{
h2_session *session = stream->session;
apr_status_t status = APR_SUCCESS;
- apr_bucket_brigade *tmp;
- ap_assert(stream);
+ stream->in_data_frames++;
if (len > 0) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c,
- H2_STRM_MSG(stream, "recv DATA, len=%d"), (int)len);
-
- tmp = apr_brigade_create(stream->pool, session->c->bucket_alloc);
- apr_brigade_write(tmp, NULL, NULL, (const char *)data, len);
- setup_input(stream);
- status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ);
- apr_brigade_destroy(tmp);
+ if (APLOGctrace3(session->c)) {
+ const char *load = apr_pstrndup(stream->pool, (const char *)data, len);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, session->c,
+ H2_STRM_MSG(stream, "recv DATA, len=%d: -->%s<--"),
+ (int)len, load);
+ }
+ else {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c,
+ H2_STRM_MSG(stream, "recv DATA, len=%d"), (int)len);
+ }
+ stream->in_data_octets += len;
+ if (!stream->in_buffer) {
+ stream->in_buffer = apr_brigade_create(stream->pool,
+ session->c->bucket_alloc);
+ }
+ apr_brigade_write(stream->in_buffer, NULL, NULL, (const char *)data, len);
+ h2_stream_dispatch(stream, H2_SEV_IN_DATA_PENDING);
}
- stream->in_data_frames++;
- stream->in_data_octets += len;
return status;
}
@@ -493,9 +531,12 @@ h2_stream *h2_stream_create(int id, apr_
stream->monitor = monitor;
stream->max_mem = session->max_stream_mem;
- h2_beam_create(&stream->output, pool, id, "output", H2_BEAM_OWNER_RECV, 0,
- session->s->timeout);
-
+#ifdef H2_NG2_LOCAL_WIN_SIZE
+ stream->in_window_size =
+ nghttp2_session_get_stream_local_window_size(
+ stream->session->ngh2, stream->id);
+#endif
+
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
H2_STRM_LOG(APLOGNO(03082), stream, "created"));
on_state_enter(stream);
@@ -563,7 +604,9 @@ void h2_stream_rst(h2_stream *stream, in
if (stream->input) {
h2_beam_abort(stream->input);
}
- h2_beam_leave(stream->output);
+ if (stream->output) {
+ h2_beam_leave(stream->output);
+ }
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
H2_STRM_MSG(stream, "reset, error=%d"), error_code);
h2_stream_dispatch(stream, H2_SEV_CANCELLED);
@@ -733,6 +776,8 @@ apr_status_t h2_stream_out_prepare(h2_st
*presponse = NULL;
}
+ ap_assert(stream);
+
if (stream->rst_error) {
*plen = 0;
*peos = 1;
@@ -741,7 +786,7 @@ apr_status_t h2_stream_out_prepare(h2_st
c = stream->session->c;
prep_output(stream);
-
+
/* determine how much we'd like to send. We cannot send more than
* is requested. But we can reduce the size in case the master
* connection operates in smaller chunks. (TSL warmup) */
@@ -753,8 +798,15 @@ apr_status_t h2_stream_out_prepare(h2_st
h2_util_bb_avail(stream->out_buffer, plen, peos);
if (!*peos && *plen < requested && *plen < stream->max_mem) {
H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "pre");
- status = h2_beam_receive(stream->output, stream->out_buffer,
- APR_NONBLOCK_READ, stream->max_mem - *plen);
+ if (stream->output) {
+ status = h2_beam_receive(stream->output, stream->out_buffer,
+ APR_NONBLOCK_READ,
+ stream->max_mem - *plen);
+ }
+ else {
+ status = APR_EOF;
+ }
+
if (APR_STATUS_IS_EOF(status)) {
apr_bucket *eos = apr_bucket_eos_create(c->bucket_alloc);
APR_BRIGADE_INSERT_TAIL(stream->out_buffer, eos);
@@ -925,4 +977,64 @@ int h2_stream_was_closed(const h2_stream
}
}
+apr_status_t h2_stream_in_consumed(h2_stream *stream, apr_off_t amount)
+{
+ h2_session *session = stream->session;
+
+ if (amount > 0) {
+ apr_off_t consumed = amount;
+
+ while (consumed > 0) {
+ int len = (consumed > INT_MAX)? INT_MAX : consumed;
+ nghttp2_session_consume(session->ngh2, stream->id, len);
+ consumed -= len;
+ }
+
+#ifdef H2_NG2_LOCAL_WIN_SIZE
+ if (1) {
+ int cur_size = nghttp2_session_get_stream_local_window_size(
+ session->ngh2, stream->id);
+ int win = stream->in_window_size;
+ int thigh = win * 8/10;
+ int tlow = win * 2/10;
+ const int win_max = 2*1024*1024;
+ const int win_min = 32*1024;
+
+ /* Work in progress, probably should add directives for these
+ * values once this stabilizes somewhat. The general idea is
+ * to adapt stream window sizes if the input window changes
+ * a) very quickly (< good RTT) from full to empty
+ * b) only a little bit (> bad RTT)
+ * where in a) it grows and in b) it shrinks again.
+ */
+ if (cur_size > thigh && amount > thigh && win < win_max) {
+ /* almost empty again with one reported consumption, how
+ * long did this take? */
+ long ms = apr_time_msec(apr_time_now() - stream->in_last_write);
+ if (ms < 40) {
+ win = H2MIN(win_max, win + (64*1024));
+ }
+ }
+ else if (cur_size < tlow && amount < tlow && win > win_min) {
+ /* staying full, for how long already? */
+ long ms = apr_time_msec(apr_time_now() - stream->in_last_write);
+ if (ms > 700) {
+ win = H2MAX(win_min, win - (32*1024));
+ }
+ }
+
+ if (win != stream->in_window_size) {
+ stream->in_window_size = win;
+ nghttp2_session_set_local_window_size(session->ngh2,
+ NGHTTP2_FLAG_NONE, stream->id, win);
+ }
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+ "h2_stream(%ld-%d): consumed %ld bytes, window now %d/%d",
+ session->id, stream->id, (long)amount,
+ cur_size, stream->in_window_size);
+ }
+#endif
+ }
+ return APR_SUCCESS;
+}
Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_stream.h
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_stream.h?rev=1789739&r1=1789738&r2=1789739&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_stream.h (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_stream.h Fri Mar 31 19:41:01 2017
@@ -25,10 +25,12 @@
* connection to the client. The h2_session writes to the h2_stream,
* adding HEADERS and DATA and finally an EOS. When headers are done,
* h2_stream is scheduled for handling, which is expected to produce
- * a h2_headers.
+ * a response h2_headers at least.
*
- * The h2_headers gives the HEADER frames to sent to the client, followed
- * by DATA frames read from the h2_stream until EOS is reached.
+ * The h2_headers may be followed by more h2_headers (interim responses) and
+ * by DATA frames read from the h2_stream until EOS is reached. Trailers
+ * are send when a last h2_headers is received. This always closes the stream
+ * output.
*/
struct h2_mplx;
@@ -45,6 +47,9 @@ typedef void h2_stream_state_cb(void *ct
typedef void h2_stream_event_cb(void *ctx, h2_stream *stream,
h2_stream_event_t ev);
+/**
+ * Callback structure for events and stream state transisitions
+ */
typedef struct h2_stream_monitor {
void *ctx;
h2_stream_state_cb *on_state_enter; /* called when a state is entered */
@@ -52,6 +57,8 @@ typedef struct h2_stream_monitor {
was detected */
h2_stream_event_cb *on_state_event; /* called right before the given event
result in a new stream state */
+ h2_stream_event_cb *on_event; /* called for events that do not
+ trigger a state change */
} h2_stream_monitor;
struct h2_stream {
@@ -69,9 +76,13 @@ struct h2_stream {
int request_headers_added; /* number of request headers added */
struct h2_bucket_beam *input;
+ apr_bucket_brigade *in_buffer;
+ int in_window_size;
+ apr_time_t in_last_write;
+
struct h2_bucket_beam *output;
- apr_size_t max_mem; /* maximum amount of data buffered */
apr_bucket_brigade *out_buffer;
+ apr_size_t max_mem; /* maximum amount of data buffered */
int rst_error; /* stream error for RST_STREAM */
unsigned int aborted : 1; /* was aborted */
@@ -99,6 +110,10 @@ struct h2_stream {
* @param id the stream identifier
* @param pool the memory pool to use for this stream
* @param session the session this stream belongs to
+ * @param monitor an optional monitor to be called for events and
+ * state transisitions
+ * @param initiated_on the id of the stream this one was initiated on (PUSH)
+ *
* @return the newly opened stream
*/
h2_stream *h2_stream_create(int id, apr_pool_t *pool,
@@ -111,6 +126,13 @@ h2_stream *h2_stream_create(int id, apr_
*/
void h2_stream_destroy(h2_stream *stream);
+/**
+ * Prepare the stream so that processing may start.
+ *
+ * This is the time to allocated resources not needed before.
+ *
+ * @param stream the stream to prep
+ */
apr_status_t h2_stream_prep_processing(h2_stream *stream);
/*
@@ -143,6 +165,12 @@ void h2_stream_cleanup(h2_stream *stream
apr_pool_t *h2_stream_detach_pool(h2_stream *stream);
/**
+ * Notify the stream that amount bytes have been consumed of its input
+ * since the last invocation of this method (delta amount).
+ */
+apr_status_t h2_stream_in_consumed(h2_stream *stream, apr_off_t amount);
+
+/**
* Set complete stream headers from given h2_request.
*
* @param stream stream to write request to
@@ -189,6 +217,8 @@ apr_status_t h2_stream_recv_frame(h2_str
apr_status_t h2_stream_recv_DATA(h2_stream *stream, uint8_t flags,
const uint8_t *data, size_t len);
+apr_status_t h2_stream_flush_input(h2_stream *stream);
+
/**
* Reset the stream. Stream write/reads will return errors afterwards.
*
@@ -275,7 +305,6 @@ const char *h2_stream_state_str(h2_strea
*/
int h2_stream_is_ready(h2_stream *stream);
-
#define H2_STRM_MSG(s, msg) \
"h2_stream(%ld-%d,%s): "msg, s->session->id, s->id, h2_stream_state_str(s)
Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_task.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_task.c?rev=1789739&r1=1789738&r2=1789739&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_task.c (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_task.c Fri Mar 31 19:41:01 2017
@@ -45,7 +45,6 @@
#include "h2_session.h"
#include "h2_stream.h"
#include "h2_task.h"
-#include "h2_worker.h"
#include "h2_util.h"
static void H2_TASK_OUT_LOG(int lvl, h2_task *task, apr_bucket_brigade *bb,
@@ -217,14 +216,18 @@ static apr_status_t h2_filter_slave_in(a
apr_status_t status = APR_SUCCESS;
apr_bucket *b, *next;
apr_off_t bblen;
- apr_size_t rmax;
+ const int trace1 = APLOGctrace1(f->c);
+ apr_size_t rmax = ((readbytes <= APR_SIZE_MAX)?
+ (apr_size_t)readbytes : APR_SIZE_MAX);
task = h2_ctx_cget_task(f->c);
ap_assert(task);
- rmax = ((readbytes <= APR_SIZE_MAX)? (apr_size_t)readbytes : APR_SIZE_MAX);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
- "h2_slave_in(%s): read, mode=%d, block=%d, readbytes=%ld",
- task->id, mode, block, (long)readbytes);
+
+ if (trace1) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
+ "h2_slave_in(%s): read, mode=%d, block=%d, readbytes=%ld",
+ task->id, mode, block, (long)readbytes);
+ }
if (mode == AP_MODE_INIT) {
return ap_get_brigade(f->c->input_filters, bb, mode, block, readbytes);
@@ -250,19 +253,23 @@ static apr_status_t h2_filter_slave_in(a
while (APR_BRIGADE_EMPTY(task->input.bb)) {
/* Get more input data for our request. */
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
- "h2_slave_in(%s): get more data from mplx, block=%d, "
- "readbytes=%ld", task->id, block, (long)readbytes);
+ if (trace1) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
+ "h2_slave_in(%s): get more data from mplx, block=%d, "
+ "readbytes=%ld", task->id, block, (long)readbytes);
+ }
if (task->input.beam) {
status = h2_beam_receive(task->input.beam, task->input.bb, block,
- H2MIN(readbytes, 32*1024));
+ 128*1024);
}
else {
status = APR_EOF;
}
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, f->c,
- "h2_slave_in(%s): read returned", task->id);
+ if (trace1) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, f->c,
+ "h2_slave_in(%s): read returned", task->id);
+ }
if (APR_STATUS_IS_EAGAIN(status)
&& (mode == AP_MODE_GETLINE || block == APR_BLOCK_READ)) {
/* chunked input handling does not seem to like it if we
@@ -276,9 +283,11 @@ static apr_status_t h2_filter_slave_in(a
else if (status != APR_SUCCESS) {
return status;
}
-
- h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2,
- "input.beam recv raw", task->input.bb);
+
+ if (trace1) {
+ h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2,
+ "input.beam recv raw", task->input.bb);
+ }
if (h2_task_logio_add_bytes_in) {
apr_brigade_length(bb, 0, &bblen);
h2_task_logio_add_bytes_in(f->c, bblen);
@@ -292,12 +301,16 @@ static apr_status_t h2_filter_slave_in(a
return (mode == AP_MODE_SPECULATIVE)? APR_EAGAIN : APR_EOF;
}
- h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2,
- "task_input.bb", task->input.bb);
+ if (trace1) {
+ h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2,
+ "task_input.bb", task->input.bb);
+ }
if (APR_BRIGADE_EMPTY(task->input.bb)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
- "h2_slave_in(%s): no data", task->id);
+ if (trace1) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
+ "h2_slave_in(%s): no data", task->id);
+ }
return (block == APR_NONBLOCK_READ)? APR_EAGAIN : APR_EOF;
}
@@ -322,9 +335,11 @@ static apr_status_t h2_filter_slave_in(a
apr_size_t len = sizeof(buffer)-1;
apr_brigade_flatten(bb, buffer, &len);
buffer[len] = 0;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
- "h2_slave_in(%s): getline: %s",
- task->id, buffer);
+ if (trace1) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
+ "h2_slave_in(%s): getline: %s",
+ task->id, buffer);
+ }
}
}
else {
@@ -337,7 +352,7 @@ static apr_status_t h2_filter_slave_in(a
status = APR_ENOTIMPL;
}
- if (APLOGctrace1(f->c)) {
+ if (trace1) {
apr_brigade_length(bb, 0, &bblen);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
"h2_slave_in(%s): %ld data bytes", task->id, (long)bblen);
@@ -481,42 +496,44 @@ static int h2_task_pre_conn(conn_rec* c,
return OK;
}
-h2_task *h2_task_create(h2_stream *stream, conn_rec *slave)
+h2_task *h2_task_create(conn_rec *slave, int stream_id,
+ const h2_request *req, h2_mplx *m,
+ h2_bucket_beam *input,
+ apr_interval_time_t timeout,
+ apr_size_t output_max_mem)
{
apr_pool_t *pool;
h2_task *task;
ap_assert(slave);
- ap_assert(stream);
- ap_assert(stream->request);
+ ap_assert(req);
apr_pool_create(&pool, slave->pool);
task = apr_pcalloc(pool, sizeof(h2_task));
if (task == NULL) {
- ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOMEM, slave,
- H2_STRM_LOG(APLOGNO(02941), stream, "create task"));
return NULL;
}
- task->id = apr_psprintf(pool, "%ld-%d",
- stream->session->id, stream->id);
- task->stream_id = stream->id;
+ task->id = "000";
+ task->stream_id = stream_id;
task->c = slave;
- task->mplx = stream->session->mplx;
- task->c->keepalives = slave->master->keepalives;
+ task->mplx = m;
task->pool = pool;
- task->request = stream->request;
- task->input.beam = stream->input;
- task->output.beam = stream->output;
- task->timeout = stream->session->s->timeout;
-
- h2_beam_send_from(stream->output, task->pool);
- h2_ctx_create_for(slave, task);
-
+ task->request = req;
+ task->timeout = timeout;
+ task->input.beam = input;
+ task->output.max_buffer = output_max_mem;
+
return task;
}
void h2_task_destroy(h2_task *task)
{
+ if (task->output.beam) {
+ h2_beam_log(task->output.beam, task->c, APLOG_TRACE2, "task_destroy");
+ h2_beam_destroy(task->output.beam);
+ task->output.beam = NULL;
+ }
+
if (task->eor) {
apr_bucket_destroy(task->eor);
}
@@ -527,9 +544,14 @@ void h2_task_destroy(h2_task *task)
apr_status_t h2_task_do(h2_task *task, apr_thread_t *thread, int worker_id)
{
+ conn_rec *c;
+
ap_assert(task);
-
- if (task->c->master) {
+ c = task->c;
+ task->worker_started = 1;
+ task->started_at = apr_time_now();
+
+ if (c->master) {
/* Each conn_rec->id is supposed to be unique at a point in time. Since
* some modules (and maybe external code) uses this id as an identifier
* for the request_rec they handle, it needs to be unique for slave
@@ -547,6 +569,8 @@ apr_status_t h2_task_do(h2_task *task, a
*/
int slave_id, free_bits;
+ task->id = apr_psprintf(task->pool, "%ld-%d", c->master->id,
+ task->stream_id);
if (sizeof(unsigned long) >= 8) {
free_bits = 32;
slave_id = task->stream_id;
@@ -558,12 +582,31 @@ apr_status_t h2_task_do(h2_task *task, a
free_bits = 8;
slave_id = worker_id;
}
- task->c->id = (task->c->master->id << free_bits)^slave_id;
+ task->c->id = (c->master->id << free_bits)^slave_id;
+ c->keepalive = AP_CONN_KEEPALIVE;
+ }
+
+ h2_beam_create(&task->output.beam, c->pool, task->stream_id, "output",
+ H2_BEAM_OWNER_SEND, 0, task->timeout);
+ if (!task->output.beam) {
+ return APR_ENOMEM;
+ }
+
+ h2_beam_buffer_size_set(task->output.beam, task->output.max_buffer);
+ h2_beam_send_from(task->output.beam, task->pool);
+
+ h2_ctx_create_for(c, task);
+ apr_table_setn(c->notes, H2_TASK_ID_NOTE, task->id);
+
+ if (task->input.beam) {
+ h2_beam_mutex_enable(task->input.beam);
}
- task->input.bb = apr_brigade_create(task->pool, task->c->bucket_alloc);
+ h2_slave_run_pre_connection(c, ap_get_conn_socket(c));
+
+ task->input.bb = apr_brigade_create(task->pool, c->bucket_alloc);
if (task->request->serialize) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_task(%s): serialize request %s %s",
task->id, task->request->method, task->request->path);
apr_brigade_printf(task->input.bb, NULL,
@@ -573,20 +616,21 @@ apr_status_t h2_task_do(h2_task *task, a
apr_brigade_puts(task->input.bb, NULL, NULL, "\r\n");
}
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_task(%s): process connection", task->id);
+
task->c->current_thread = thread;
- ap_run_process_connection(task->c);
+ ap_run_process_connection(c);
if (task->frozen) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_task(%s): process_conn returned frozen task",
task->id);
/* cleanup delayed */
return APR_EAGAIN;
}
else {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_task(%s): processing done", task->id);
return output_finish(task);
}
Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_task.h
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_task.h?rev=1789739&r1=1789738&r2=1789739&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_task.h (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_task.h Fri Mar 31 19:41:01 2017
@@ -73,6 +73,7 @@ struct h2_task {
unsigned int copy_files : 1;
struct h2_response_parser *rparser;
apr_bucket_brigade *bb;
+ apr_size_t max_buffer;
} output;
struct h2_mplx *mplx;
@@ -91,7 +92,11 @@ struct h2_task {
struct h2_req_engine *assigned; /* engine that task has been assigned to */
};
-h2_task *h2_task_create(struct h2_stream *stream, conn_rec *slave);
+h2_task *h2_task_create(conn_rec *slave, int stream_id,
+ const h2_request *req, struct h2_mplx *m,
+ struct h2_bucket_beam *input,
+ apr_interval_time_t timeout,
+ apr_size_t output_max_mem);
void h2_task_destroy(h2_task *task);
Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_util.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_util.c?rev=1789739&r1=1789738&r2=1789739&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_util.c (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_util.c Fri Mar 31 19:41:01 2017
@@ -15,6 +15,8 @@
#include <assert.h>
#include <apr_strings.h>
+#include <apr_thread_mutex.h>
+#include <apr_thread_cond.h>
#include <httpd.h>
#include <http_core.h>
@@ -604,6 +606,294 @@ int h2_iq_contains(h2_iqueue *q, int sid
}
/*******************************************************************************
+ * FIFO queue
+ ******************************************************************************/
+
+struct h2_fifo {
+ void **elems;
+ int nelems;
+ int head;
+ int count;
+ int aborted;
+ apr_thread_mutex_t *lock;
+ apr_thread_cond_t *not_empty;
+ apr_thread_cond_t *not_full;
+};
+
+static int nth_index(h2_fifo *fifo, int n)
+{
+ return (fifo->head + n) % fifo->nelems;
+}
+
+static apr_status_t fifo_destroy(void *data)
+{
+ h2_fifo *fifo = data;
+
+ apr_thread_cond_destroy(fifo->not_empty);
+ apr_thread_cond_destroy(fifo->not_full);
+ apr_thread_mutex_destroy(fifo->lock);
+
+ return APR_SUCCESS;
+}
+
+apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity)
+{
+ apr_status_t rv;
+ h2_fifo *fifo;
+
+ fifo = apr_pcalloc(pool, sizeof(*fifo));
+ if (fifo == NULL) {
+ return APR_ENOMEM;
+ }
+
+ rv = apr_thread_mutex_create(&fifo->lock,
+ APR_THREAD_MUTEX_UNNESTED, pool);
+ if (rv != APR_SUCCESS) {
+ return rv;
+ }
+
+ rv = apr_thread_cond_create(&fifo->not_empty, pool);
+ if (rv != APR_SUCCESS) {
+ return rv;
+ }
+
+ rv = apr_thread_cond_create(&fifo->not_full, pool);
+ if (rv != APR_SUCCESS) {
+ return rv;
+ }
+
+ fifo->elems = apr_pcalloc(pool, capacity * sizeof(void*));
+ if (fifo->elems == NULL) {
+ return APR_ENOMEM;
+ }
+ fifo->nelems = capacity;
+
+ *pfifo = fifo;
+ apr_pool_cleanup_register(pool, fifo, fifo_destroy, apr_pool_cleanup_null);
+
+ return APR_SUCCESS;
+}
+
+apr_status_t h2_fifo_term(h2_fifo *fifo)
+{
+ apr_status_t rv;
+ if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
+ fifo->aborted = 1;
+ apr_thread_mutex_unlock(fifo->lock);
+ }
+ return rv;
+}
+
+apr_status_t h2_fifo_interrupt(h2_fifo *fifo)
+{
+ apr_status_t rv;
+ if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
+ apr_thread_cond_broadcast(fifo->not_empty);
+ apr_thread_cond_broadcast(fifo->not_full);
+ apr_thread_mutex_unlock(fifo->lock);
+ }
+ return rv;
+}
+
+int h2_fifo_count(h2_fifo *fifo)
+{
+ return fifo->count;
+}
+
+static apr_status_t check_not_empty(h2_fifo *fifo, int block)
+{
+ if (fifo->count == 0) {
+ if (!block) {
+ return APR_EAGAIN;
+ }
+ while (fifo->count == 0) {
+ if (fifo->aborted) {
+ return APR_EOF;
+ }
+ apr_thread_cond_wait(fifo->not_empty, fifo->lock);
+ }
+ }
+ return APR_SUCCESS;
+}
+
+static apr_status_t fifo_push(h2_fifo *fifo, void *elem, int block)
+{
+ apr_status_t rv;
+
+ if (fifo->aborted) {
+ return APR_EOF;
+ }
+
+ if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
+ if (fifo->count == fifo->nelems) {
+ if (block) {
+ while (fifo->count == fifo->nelems) {
+ if (fifo->aborted) {
+ apr_thread_mutex_unlock(fifo->lock);
+ return APR_EOF;
+ }
+ apr_thread_cond_wait(fifo->not_full, fifo->lock);
+ }
+ }
+ else {
+ apr_thread_mutex_unlock(fifo->lock);
+ return APR_EAGAIN;
+ }
+ }
+
+ ap_assert(fifo->count < fifo->nelems);
+ fifo->elems[nth_index(fifo, fifo->count)] = elem;
+ ++fifo->count;
+ if (fifo->count == 1) {
+ apr_thread_cond_broadcast(fifo->not_empty);
+ }
+ apr_thread_mutex_unlock(fifo->lock);
+ }
+ return rv;
+}
+
+apr_status_t h2_fifo_push(h2_fifo *fifo, void *elem)
+{
+ return fifo_push(fifo, elem, 1);
+}
+
+apr_status_t h2_fifo_try_push(h2_fifo *fifo, void *elem)
+{
+ return fifo_push(fifo, elem, 0);
+}
+
+static apr_status_t fifo_pull(h2_fifo *fifo, void **pelem, int block)
+{
+ apr_status_t rv;
+
+ if (fifo->aborted) {
+ return APR_EOF;
+ }
+
+ if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
+ if ((rv = check_not_empty(fifo, block)) != APR_SUCCESS) {
+ apr_thread_mutex_unlock(fifo->lock);
+ *pelem = NULL;
+ return rv;
+ }
+
+ ap_assert(fifo->count > 0);
+ *pelem = fifo->elems[fifo->head];
+ --fifo->count;
+ if (fifo->count > 0) {
+ fifo->head = nth_index(fifo, 1);
+ if (fifo->count+1 == fifo->nelems) {
+ apr_thread_cond_broadcast(fifo->not_full);
+ }
+ }
+ apr_thread_mutex_unlock(fifo->lock);
+ }
+ return rv;
+}
+
+apr_status_t h2_fifo_pull(h2_fifo *fifo, void **pelem)
+{
+ return fifo_pull(fifo, pelem, 1);
+}
+
+apr_status_t h2_fifo_try_pull(h2_fifo *fifo, void **pelem)
+{
+ return fifo_pull(fifo, pelem, 0);
+}
+
+static apr_status_t fifo_peek(h2_fifo *fifo, h2_fifo_peek_fn *fn, void *ctx, int block)
+{
+ apr_status_t rv;
+ void *elem;
+
+ if (fifo->aborted) {
+ return APR_EOF;
+ }
+
+ if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
+ if ((rv = check_not_empty(fifo, block)) != APR_SUCCESS) {
+ apr_thread_mutex_unlock(fifo->lock);
+ return rv;
+ }
+
+ ap_assert(fifo->count > 0);
+ elem = fifo->elems[fifo->head];
+
+ switch (fn(elem, ctx)) {
+ case H2_FIFO_OP_PULL:
+ --fifo->count;
+ if (fifo->count > 0) {
+ fifo->head = nth_index(fifo, 1);
+ if (fifo->count+1 == fifo->nelems) {
+ apr_thread_cond_broadcast(fifo->not_full);
+ }
+ }
+ break;
+ case H2_FIFO_OP_REPUSH:
+ if (fifo->count > 1) {
+ fifo->head = nth_index(fifo, 1);
+ if (fifo->count < fifo->nelems) {
+ fifo->elems[nth_index(fifo, fifo->count-1)] = elem;
+ }
+ }
+ break;
+ }
+
+ apr_thread_mutex_unlock(fifo->lock);
+ }
+ return rv;
+}
+
+apr_status_t h2_fifo_peek(h2_fifo *fifo, h2_fifo_peek_fn *fn, void *ctx)
+{
+ return fifo_peek(fifo, fn, ctx, 1);
+}
+
+apr_status_t h2_fifo_try_peek(h2_fifo *fifo, h2_fifo_peek_fn *fn, void *ctx)
+{
+ return fifo_peek(fifo, fn, ctx, 0);
+}
+
+apr_status_t h2_fifo_remove(h2_fifo *fifo, void *elem)
+{
+ apr_status_t rv;
+
+ if (fifo->aborted) {
+ return APR_EOF;
+ }
+
+ if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
+ int i, rc;
+ void *e;
+
+ rc = 0;
+ for (i = 0; i < fifo->count; ++i) {
+ e = fifo->elems[nth_index(fifo, i)];
+ if (e == elem) {
+ ++rc;
+ }
+ else if (rc) {
+ fifo->elems[nth_index(fifo, i-rc)] = e;
+ }
+ }
+ if (rc) {
+ fifo->count -= rc;
+ if (fifo->count + rc == fifo->nelems) {
+ apr_thread_cond_broadcast(fifo->not_full);
+ }
+ rv = APR_SUCCESS;
+ }
+ else {
+ rv = APR_EAGAIN;
+ }
+
+ apr_thread_mutex_unlock(fifo->lock);
+ }
+ return rv;
+}
+
+
+/*******************************************************************************
* h2_util for apt_table_t
******************************************************************************/
@@ -701,17 +991,16 @@ apr_status_t h2_brigade_concat_length(ap
apr_bucket_brigade *src,
apr_off_t length)
{
- apr_bucket *b, *next;
+ apr_bucket *b;
apr_off_t remain = length;
apr_status_t status = APR_SUCCESS;
- for (b = APR_BRIGADE_FIRST(src);
- b != APR_BRIGADE_SENTINEL(src);
- b = next) {
- next = APR_BUCKET_NEXT(b);
+ while (!APR_BRIGADE_EMPTY(src)) {
+ b = APR_BRIGADE_FIRST(src);
if (APR_BUCKET_IS_METADATA(b)) {
- /* fall through */
+ APR_BUCKET_REMOVE(b);
+ APR_BRIGADE_INSERT_TAIL(dest, b);
}
else {
if (remain == b->length) {
@@ -734,10 +1023,10 @@ apr_status_t h2_brigade_concat_length(ap
apr_bucket_split(b, remain);
}
}
+ APR_BUCKET_REMOVE(b);
+ APR_BRIGADE_INSERT_TAIL(dest, b);
+ remain -= b->length;
}
- APR_BUCKET_REMOVE(b);
- APR_BRIGADE_INSERT_TAIL(dest, b);
- remain -= b->length;
}
return status;
}
@@ -925,55 +1214,14 @@ apr_size_t h2_util_bucket_print(char *bu
if (bmax <= off) {
return off;
}
- if (APR_BUCKET_IS_METADATA(b)) {
- if (APR_BUCKET_IS_EOS(b)) {
- off += apr_snprintf(buffer+off, bmax-off, "eos");
- }
- else if (APR_BUCKET_IS_FLUSH(b)) {
- off += apr_snprintf(buffer+off, bmax-off, "flush");
- }
- else if (AP_BUCKET_IS_EOR(b)) {
- off += apr_snprintf(buffer+off, bmax-off, "eor");
- }
- else {
- off += apr_snprintf(buffer+off, bmax-off, "%s", b->type->name);
- }
+ else if (APR_BUCKET_IS_METADATA(b)) {
+ off += apr_snprintf(buffer+off, bmax-off, "%s", b->type->name);
}
- else {
- const char *btype = b->type->name;
- if (APR_BUCKET_IS_FILE(b)) {
- btype = "file";
- }
- else if (APR_BUCKET_IS_PIPE(b)) {
- btype = "pipe";
- }
- else if (APR_BUCKET_IS_SOCKET(b)) {
- btype = "socket";
- }
- else if (APR_BUCKET_IS_HEAP(b)) {
- btype = "heap";
- }
- else if (APR_BUCKET_IS_TRANSIENT(b)) {
- btype = "transient";
- }
- else if (APR_BUCKET_IS_IMMORTAL(b)) {
- btype = "immortal";
- }
-#if APR_HAS_MMAP
- else if (APR_BUCKET_IS_MMAP(b)) {
- btype = "mmap";
- }
-#endif
- else if (APR_BUCKET_IS_POOL(b)) {
- btype = "pool";
- }
-
- if (bmax > off) {
- off += apr_snprintf(buffer+off, bmax-off, "%s[%ld]",
- btype,
- (long)(b->length == ((apr_size_t)-1)?
- -1 : b->length));
- }
+ else if (bmax > off) {
+ off += apr_snprintf(buffer+off, bmax-off, "%s[%ld]",
+ b->type->name,
+ (long)(b->length == ((apr_size_t)-1)?
+ -1 : b->length));
}
return off;
}
Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_util.h
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_util.h?rev=1789739&r1=1789738&r2=1789739&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_util.h (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_util.h Fri Mar 31 19:41:01 2017
@@ -184,6 +184,57 @@ size_t h2_iq_mshift(h2_iqueue *q, int *p
int h2_iq_contains(h2_iqueue *q, int sid);
/*******************************************************************************
+ * FIFO queue
+ ******************************************************************************/
+
+/**
+ * A thread-safe FIFO queue with some extra bells and whistles, if you
+ * do not need anything special, better use 'apr_queue'.
+ */
+typedef struct h2_fifo h2_fifo;
+
+apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity);
+apr_status_t h2_fifo_term(h2_fifo *fifo);
+apr_status_t h2_fifo_interrupt(h2_fifo *fifo);
+
+int h2_fifo_count(h2_fifo *fifo);
+
+apr_status_t h2_fifo_push(h2_fifo *fifo, void *elem);
+apr_status_t h2_fifo_try_push(h2_fifo *fifo, void *elem);
+
+apr_status_t h2_fifo_pull(h2_fifo *fifo, void **pelem);
+apr_status_t h2_fifo_try_pull(h2_fifo *fifo, void **pelem);
+
+typedef enum {
+ H2_FIFO_OP_PULL, /* pull the element from the queue, ie discard it */
+ H2_FIFO_OP_REPUSH, /* pull and immediatley re-push it */
+} h2_fifo_op_t;
+
+typedef h2_fifo_op_t h2_fifo_peek_fn(void *head, void *ctx);
+
+/**
+ * Call given function on the head of the queue, once it exists, and
+ * perform the returned operation on it. The queue will hold its lock during
+ * this time, so no other operations on the queue are possible.
+ * @param fifo the queue to peek at
+ * @param fn the function to call on the head, once available
+ * @param ctx context to pass in call to function
+ */
+apr_status_t h2_fifo_peek(h2_fifo *fifo, h2_fifo_peek_fn *fn, void *ctx);
+
+/**
+ * Non-blocking version of h2_fifo_peek.
+ */
+apr_status_t h2_fifo_try_peek(h2_fifo *fifo, h2_fifo_peek_fn *fn, void *ctx);
+
+/**
+ * Remove the elem from the queue, will remove multiple appearances.
+ * @param elem the element to remove
+ * @return APR_SUCCESS iff > 0 elems were removed, APR_EAGAIN otherwise.
+ */
+apr_status_t h2_fifo_remove(h2_fifo *fifo, void *elem);
+
+/*******************************************************************************
* common helpers
******************************************************************************/
/* h2_log2(n) iff n is a power of 2 */
@@ -379,8 +430,8 @@ do { \
const char *line = "(null)"; \
apr_size_t len, bmax = sizeof(buffer)/sizeof(buffer[0]); \
len = h2_util_bb_print(buffer, bmax, (tag), "", (bb)); \
- ap_log_cerror(APLOG_MARK, level, 0, (c), "bb_dump(%s): %s", \
- (c)->log_id, (len? buffer : line)); \
+ ap_log_cerror(APLOG_MARK, level, 0, (c), "bb_dump(%ld): %s", \
+ ((c)->master? (c)->master->id : (c)->id), (len? buffer : line)); \
} while(0)
Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_version.h
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_version.h?rev=1789739&r1=1789738&r2=1789739&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_version.h (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_version.h Fri Mar 31 19:41:01 2017
@@ -26,7 +26,7 @@
* @macro
* Version number of the http2 module as c string
*/
-#define MOD_HTTP2_VERSION "1.9.3"
+#define MOD_HTTP2_VERSION "1.10.0"
/**
* @macro
@@ -34,7 +34,7 @@
* release. This is a 24 bit number with 8 bits for major number, 8 bits
* for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203.
*/
-#define MOD_HTTP2_VERSION_NUM 0x010903
+#define MOD_HTTP2_VERSION_NUM 0x010a00
#endif /* mod_h2_h2_version_h */
Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_workers.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_workers.c?rev=1789739&r1=1789738&r2=1789739&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_workers.c (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_workers.c Fri Mar 31 19:41:01 2017
@@ -27,221 +27,248 @@
#include "h2_private.h"
#include "h2_mplx.h"
#include "h2_task.h"
-#include "h2_worker.h"
#include "h2_workers.h"
+#include "h2_util.h"
+typedef struct h2_slot h2_slot;
+struct h2_slot {
+ int id;
+ h2_slot *next;
+ h2_workers *workers;
+ int aborted;
+ int sticks;
+ h2_task *task;
+ apr_thread_t *thread;
+ apr_thread_cond_t *not_idle;
+};
-static int in_list(h2_workers *workers, h2_mplx *m)
+static h2_slot *pop_slot(h2_slot **phead)
{
- h2_mplx *e;
- for (e = H2_MPLX_LIST_FIRST(&workers->mplxs);
- e != H2_MPLX_LIST_SENTINEL(&workers->mplxs);
- e = H2_MPLX_NEXT(e)) {
- if (e == m) {
- return 1;
+ /* Atomically pop a slot from the list */
+ for (;;) {
+ h2_slot *first = *phead;
+ if (first == NULL) {
+ return NULL;
+ }
+ if (apr_atomic_casptr((void*)phead, first->next, first) == first) {
+ first->next = NULL;
+ return first;
}
}
- return 0;
}
-static void cleanup_zombies(h2_workers *workers, int lock)
+static void push_slot(h2_slot **phead, h2_slot *slot)
{
- if (lock) {
- apr_thread_mutex_lock(workers->lock);
+ /* Atomically push a slot to the list */
+ ap_assert(!slot->next);
+ for (;;) {
+ h2_slot *next = slot->next = *phead;
+ if (apr_atomic_casptr((void*)phead, slot, next) == next) {
+ return;
+ }
+ }
+}
+
+static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx);
+
+static apr_status_t activate_slot(h2_workers *workers, h2_slot *slot)
+{
+ apr_status_t status;
+
+ slot->workers = workers;
+ slot->aborted = 0;
+ slot->task = NULL;
+ if (!slot->not_idle) {
+ status = apr_thread_cond_create(&slot->not_idle, workers->pool);
+ if (status != APR_SUCCESS) {
+ push_slot(&workers->free, slot);
+ return status;
+ }
}
- while (!H2_WORKER_LIST_EMPTY(&workers->zombies)) {
- h2_worker *zombie = H2_WORKER_LIST_FIRST(&workers->zombies);
- H2_WORKER_REMOVE(zombie);
- ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
- "h2_workers: cleanup zombie %d", zombie->id);
- h2_worker_destroy(zombie);
+
+ /* thread will either immediately start work or add itself
+ * to the idle queue */
+ apr_thread_create(&slot->thread, workers->thread_attr, slot_run, slot,
+ workers->pool);
+ if (!slot->thread) {
+ push_slot(&workers->free, slot);
+ return APR_ENOMEM;
}
- if (lock) {
+
+ ++workers->worker_count;
+ return APR_SUCCESS;
+}
+
+static apr_status_t add_worker(h2_workers *workers)
+{
+ h2_slot *slot = pop_slot(&workers->free);
+ if (slot) {
+ return activate_slot(workers, slot);
+ }
+ return APR_EAGAIN;
+}
+
+static void wake_idle_worker(h2_workers *workers)
+{
+ h2_slot *slot = pop_slot(&workers->idle);
+ if (slot) {
+ apr_thread_mutex_lock(workers->lock);
+ apr_thread_cond_signal(slot->not_idle);
apr_thread_mutex_unlock(workers->lock);
}
+ else if (workers->dynamic) {
+ add_worker(workers);
+ }
}
-static h2_task *next_task(h2_workers *workers)
+static void cleanup_zombies(h2_workers *workers)
{
- h2_task *task = NULL;
- h2_mplx *last = NULL;
- int has_more;
-
- /* Get the next h2_mplx to process that has a task to hand out.
- * If it does, place it at the end of the queu and return the
- * task to the worker.
- * If it (currently) has no tasks, remove it so that it needs
- * to register again for scheduling.
- * If we run out of h2_mplx in the queue, we need to wait for
- * new mplx to arrive. Depending on how many workers do exist,
- * we do a timed wait or block indefinitely.
- */
- while (!task && !H2_MPLX_LIST_EMPTY(&workers->mplxs)) {
- h2_mplx *m = H2_MPLX_LIST_FIRST(&workers->mplxs);
-
- if (last == m) {
- break;
- }
- H2_MPLX_REMOVE(m);
- --workers->mplx_count;
-
- task = h2_mplx_pop_task(m, &has_more);
- if (has_more) {
- H2_MPLX_LIST_INSERT_TAIL(&workers->mplxs, m);
- ++workers->mplx_count;
- if (!last) {
- last = m;
- }
+ h2_slot *slot;
+ while ((slot = pop_slot(&workers->zombies))) {
+ if (slot->thread) {
+ apr_status_t status;
+ apr_thread_join(&status, slot->thread);
+ slot->thread = NULL;
}
+ --workers->worker_count;
+ push_slot(&workers->free, slot);
}
- return task;
+}
+
+static apr_status_t slot_pull_task(h2_slot *slot, h2_mplx *m)
+{
+ int has_more;
+ slot->task = h2_mplx_pop_task(m, &has_more);
+ if (slot->task) {
+ /* Ok, we got something to give back to the worker for execution.
+ * If we still have idle workers, we let the worker be sticky,
+ * e.g. making it poll the task's h2_mplx instance for more work
+ * before asking back here. */
+ slot->sticks = slot->workers->max_workers;
+ return has_more? APR_EAGAIN : APR_SUCCESS;
+ }
+ slot->sticks = 0;
+ return APR_EOF;
+}
+
+static h2_fifo_op_t mplx_peek(void *head, void *ctx)
+{
+ h2_mplx *m = head;
+ h2_slot *slot = ctx;
+
+ if (slot_pull_task(slot, m) == APR_EAGAIN) {
+ wake_idle_worker(slot->workers);
+ return H2_FIFO_OP_REPUSH;
+ }
+ return H2_FIFO_OP_PULL;
}
/**
* Get the next task for the given worker. Will block until a task arrives
* or the max_wait timer expires and more than min workers exist.
*/
-static apr_status_t get_mplx_next(h2_worker *worker, void *ctx,
- h2_task **ptask, int *psticky)
+static apr_status_t get_next(h2_slot *slot)
{
+ h2_workers *workers = slot->workers;
apr_status_t status;
- apr_time_t wait_until = 0, now;
- h2_workers *workers = ctx;
- h2_task *task = NULL;
-
- *ptask = NULL;
- *psticky = 0;
- status = apr_thread_mutex_lock(workers->lock);
- if (status == APR_SUCCESS) {
- ++workers->idle_workers;
- ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
- "h2_worker(%d): looking for work", worker->id);
-
- while (!h2_worker_is_aborted(worker) && !workers->aborted
- && !(task = next_task(workers))) {
-
- /* Need to wait for a new tasks to arrive. If we are above
- * minimum workers, we do a timed wait. When timeout occurs
- * and we have still more workers, we shut down one after
- * the other. */
- cleanup_zombies(workers, 0);
- if (workers->worker_count > workers->min_workers) {
- now = apr_time_now();
- if (now >= wait_until) {
- wait_until = now + apr_time_from_sec(workers->max_idle_secs);
- }
-
- ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
- "h2_worker(%d): waiting signal, "
- "workers=%d, idle=%d", worker->id,
- (int)workers->worker_count,
- workers->idle_workers);
- status = apr_thread_cond_timedwait(workers->mplx_added,
- workers->lock,
- wait_until - now);
- if (status == APR_TIMEUP
- && workers->worker_count > workers->min_workers) {
- /* waited long enough without getting a task and
- * we are above min workers, abort this one. */
- ap_log_error(APLOG_MARK, APLOG_TRACE3, 0,
- workers->s,
- "h2_workers: aborting idle worker");
- h2_worker_abort(worker);
- break;
- }
- }
- else {
- ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
- "h2_worker(%d): waiting signal (eternal), "
- "worker_count=%d, idle=%d", worker->id,
- (int)workers->worker_count,
- workers->idle_workers);
- apr_thread_cond_wait(workers->mplx_added, workers->lock);
+ slot->task = NULL;
+ while (!slot->aborted) {
+ if (!slot->task) {
+ status = h2_fifo_try_peek(workers->mplxs, mplx_peek, slot);
+ if (status == APR_EOF) {
+ return status;
}
}
- /* Here, we either have gotten task or decided to shut down
- * the calling worker.
- */
- if (task) {
- /* Ok, we got something to give back to the worker for execution.
- * If we have more idle workers than h2_mplx in our queue, then
- * we let the worker be sticky, e.g. making it poll the task's
- * h2_mplx instance for more work before asking back here.
- * This avoids entering our global lock as long as enough idle
- * workers remain. Stickiness of a worker ends when the connection
- * has no new tasks to process, so the worker will get back here
- * eventually.
- */
- *ptask = task;
- *psticky = (workers->max_workers >= workers->mplx_count);
-
- if (workers->mplx_count && workers->idle_workers > 1) {
- apr_thread_cond_signal(workers->mplx_added);
- }
+ if (slot->task) {
+ return APR_SUCCESS;
}
+ apr_thread_mutex_lock(workers->lock);
+ cleanup_zombies(workers);
+
+ ++workers->idle_workers;
+ push_slot(&workers->idle, slot);
+ apr_thread_cond_wait(slot->not_idle, workers->lock);
--workers->idle_workers;
+
apr_thread_mutex_unlock(workers->lock);
}
-
- return *ptask? APR_SUCCESS : APR_EOF;
+ return APR_EOF;
}
-static void worker_done(h2_worker *worker, void *ctx)
+static void slot_done(h2_slot *slot)
{
- h2_workers *workers = ctx;
- apr_status_t status = apr_thread_mutex_lock(workers->lock);
- if (status == APR_SUCCESS) {
- ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
- "h2_worker(%d): done", worker->id);
- H2_WORKER_REMOVE(worker);
- --workers->worker_count;
- H2_WORKER_LIST_INSERT_TAIL(&workers->zombies, worker);
-
- apr_thread_mutex_unlock(workers->lock);
- }
+ push_slot(&(slot->workers->zombies), slot);
}
-static apr_status_t add_worker(h2_workers *workers)
+
+static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx)
{
- h2_worker *w = h2_worker_create(workers->next_worker_id++,
- workers->pool, workers->thread_attr,
- get_mplx_next, worker_done, workers);
- if (!w) {
- return APR_ENOMEM;
+ h2_slot *slot = wctx;
+
+ while (!slot->aborted) {
+
+ /* Get a h2_task from the mplxs queue. */
+ get_next(slot);
+ while (slot->task) {
+
+ h2_task_do(slot->task, thread, slot->id);
+
+ /* Report the task as done. If stickyness is left, offer the
+ * mplx the opportunity to give us back a new task right away.
+ */
+ if (!slot->aborted && (--slot->sticks > 0)) {
+ h2_mplx_task_done(slot->task->mplx, slot->task, &slot->task);
+ }
+ else {
+ h2_mplx_task_done(slot->task->mplx, slot->task, NULL);
+ slot->task = NULL;
+ }
+ }
}
- ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
- "h2_workers: adding worker(%d)", w->id);
- ++workers->worker_count;
- H2_WORKER_LIST_INSERT_TAIL(&workers->workers, w);
- return APR_SUCCESS;
+
+ slot_done(slot);
+ return NULL;
}
-static apr_status_t h2_workers_start(h2_workers *workers)
+static apr_status_t workers_pool_cleanup(void *data)
{
- apr_status_t status = apr_thread_mutex_lock(workers->lock);
- if (status == APR_SUCCESS) {
- ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
- "h2_workers: starting");
-
- while (workers->worker_count < workers->min_workers
- && status == APR_SUCCESS) {
- status = add_worker(workers);
+ h2_workers *workers = data;
+ h2_slot *slot;
+
+ if (!workers->aborted) {
+ apr_thread_mutex_lock(workers->lock);
+ workers->aborted = 1;
+ /* before we go, cleanup any zombies and abort the rest */
+ cleanup_zombies(workers);
+ for (;;) {
+ slot = pop_slot(&workers->idle);
+ if (slot) {
+ slot->aborted = 1;
+ apr_thread_cond_signal(slot->not_idle);
+ }
+ else {
+ break;
+ }
}
apr_thread_mutex_unlock(workers->lock);
+
+ h2_fifo_term(workers->mplxs);
+ h2_fifo_interrupt(workers->mplxs);
}
- return status;
+ return APR_SUCCESS;
}
h2_workers *h2_workers_create(server_rec *s, apr_pool_t *server_pool,
int min_workers, int max_workers,
- apr_size_t max_tx_handles)
+ int idle_secs)
{
apr_status_t status;
h2_workers *workers;
apr_pool_t *pool;
+ int i, n;
ap_assert(s);
ap_assert(server_pool);
@@ -254,163 +281,77 @@ h2_workers *h2_workers_create(server_rec
apr_pool_create(&pool, server_pool);
apr_pool_tag(pool, "h2_workers");
workers = apr_pcalloc(pool, sizeof(h2_workers));
- if (workers) {
- workers->s = s;
- workers->pool = pool;
- workers->min_workers = min_workers;
- workers->max_workers = max_workers;
- workers->max_idle_secs = 10;
-
- workers->max_tx_handles = max_tx_handles;
- workers->spare_tx_handles = workers->max_tx_handles;
-
- apr_threadattr_create(&workers->thread_attr, workers->pool);
- if (ap_thread_stacksize != 0) {
- apr_threadattr_stacksize_set(workers->thread_attr,
- ap_thread_stacksize);
- ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, s,
- "h2_workers: using stacksize=%ld",
- (long)ap_thread_stacksize);
- }
-
- APR_RING_INIT(&workers->workers, h2_worker, link);
- APR_RING_INIT(&workers->zombies, h2_worker, link);
- APR_RING_INIT(&workers->mplxs, h2_mplx, link);
-
- status = apr_thread_mutex_create(&workers->lock,
- APR_THREAD_MUTEX_DEFAULT,
- workers->pool);
- if (status == APR_SUCCESS) {
- status = apr_thread_cond_create(&workers->mplx_added, workers->pool);
- }
-
- if (status == APR_SUCCESS) {
- status = apr_thread_mutex_create(&workers->tx_lock,
- APR_THREAD_MUTEX_DEFAULT,
- workers->pool);
- }
-
- if (status == APR_SUCCESS) {
- status = h2_workers_start(workers);
- }
-
- if (status != APR_SUCCESS) {
- h2_workers_destroy(workers);
- workers = NULL;
- }
+ if (!workers) {
+ return NULL;
}
- return workers;
-}
-
-void h2_workers_destroy(h2_workers *workers)
-{
- /* before we go, cleanup any zombie workers that may have accumulated */
- cleanup_zombies(workers, 1);
- if (workers->mplx_added) {
- apr_thread_cond_destroy(workers->mplx_added);
- workers->mplx_added = NULL;
- }
- if (workers->lock) {
- apr_thread_mutex_destroy(workers->lock);
- workers->lock = NULL;
+ workers->s = s;
+ workers->pool = pool;
+ workers->min_workers = min_workers;
+ workers->max_workers = max_workers;
+ workers->max_idle_secs = (idle_secs > 0)? idle_secs : 10;
+
+ status = h2_fifo_create(&workers->mplxs, pool, 2 * workers->max_workers);
+ if (status != APR_SUCCESS) {
+ return NULL;
}
- while (!H2_MPLX_LIST_EMPTY(&workers->mplxs)) {
- h2_mplx *m = H2_MPLX_LIST_FIRST(&workers->mplxs);
- H2_MPLX_REMOVE(m);
- }
- while (!H2_WORKER_LIST_EMPTY(&workers->workers)) {
- h2_worker *w = H2_WORKER_LIST_FIRST(&workers->workers);
- H2_WORKER_REMOVE(w);
+
+ status = apr_threadattr_create(&workers->thread_attr, workers->pool);
+ if (status != APR_SUCCESS) {
+ return NULL;
}
- if (workers->pool) {
- apr_pool_destroy(workers->pool);
- /* workers is gone */
+
+ if (ap_thread_stacksize != 0) {
+ apr_threadattr_stacksize_set(workers->thread_attr,
+ ap_thread_stacksize);
+ ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, s,
+ "h2_workers: using stacksize=%ld",
+ (long)ap_thread_stacksize);
}
-}
-
-apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m)
-{
- apr_status_t status = apr_thread_mutex_lock(workers->lock);
- if (status == APR_SUCCESS) {
- ap_log_error(APLOG_MARK, APLOG_TRACE3, status, workers->s,
- "h2_workers: register mplx(%ld), idle=%d",
- m->id, workers->idle_workers);
- if (in_list(workers, m)) {
- status = APR_EAGAIN;
- }
- else {
- H2_MPLX_LIST_INSERT_TAIL(&workers->mplxs, m);
- ++workers->mplx_count;
- status = APR_SUCCESS;
- }
-
- if (workers->idle_workers > 0) {
- apr_thread_cond_signal(workers->mplx_added);
+
+ status = apr_thread_mutex_create(&workers->lock,
+ APR_THREAD_MUTEX_DEFAULT,
+ workers->pool);
+ if (status == APR_SUCCESS) {
+ n = workers->nslots = workers->max_workers;
+ workers->slots = apr_pcalloc(workers->pool, n * sizeof(h2_slot));
+ if (workers->slots == NULL) {
+ workers->nslots = 0;
+ status = APR_ENOMEM;
}
- else if (status == APR_SUCCESS
- && workers->worker_count < workers->max_workers) {
- ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
- "h2_workers: got %d worker, adding 1",
- workers->worker_count);
- add_worker(workers);
+ for (i = 0; i < n; ++i) {
+ workers->slots[i].id = i;
}
- apr_thread_mutex_unlock(workers->lock);
}
- return status;
-}
-
-apr_status_t h2_workers_unregister(h2_workers *workers, struct h2_mplx *m)
-{
- apr_status_t status = apr_thread_mutex_lock(workers->lock);
if (status == APR_SUCCESS) {
- status = APR_EAGAIN;
- if (in_list(workers, m)) {
- H2_MPLX_REMOVE(m);
- status = APR_SUCCESS;
+ /* we activate all for now, TODO: support min_workers again.
+ * do this in reverse for vanity reasons so slot 0 will most
+ * likely be at head of idle queue. */
+ n = workers->max_workers;
+ for (i = n-1; i >= 0; --i) {
+ status = activate_slot(workers, &workers->slots[i]);
+ }
+ /* the rest of the slots go on the free list */
+ for(i = n; i < workers->nslots; ++i) {
+ push_slot(&workers->free, &workers->slots[i]);
}
- apr_thread_mutex_unlock(workers->lock);
+ workers->dynamic = (workers->worker_count < workers->max_workers);
}
- return status;
-}
-
-void h2_workers_set_max_idle_secs(h2_workers *workers, int idle_secs)
-{
- if (idle_secs <= 0) {
- ap_log_error(APLOG_MARK, APLOG_WARNING, 0, workers->s,
- APLOGNO(02962) "h2_workers: max_worker_idle_sec value of %d"
- " is not valid, ignored.", idle_secs);
- return;
+ if (status == APR_SUCCESS) {
+ apr_pool_pre_cleanup_register(pool, workers, workers_pool_cleanup);
+ return workers;
}
- workers->max_idle_secs = idle_secs;
+ return NULL;
}
-apr_size_t h2_workers_tx_reserve(h2_workers *workers, apr_size_t count)
+apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m)
{
- apr_status_t status = apr_thread_mutex_lock(workers->tx_lock);
- if (status == APR_SUCCESS) {
- count = H2MIN(workers->spare_tx_handles, count);
- workers->spare_tx_handles -= count;
- ap_log_error(APLOG_MARK, APLOG_TRACE2, 0, workers->s,
- "h2_workers: reserved %d tx handles, %d/%d left",
- (int)count, (int)workers->spare_tx_handles,
- (int)workers->max_tx_handles);
- apr_thread_mutex_unlock(workers->tx_lock);
- return count;
- }
- return 0;
+ apr_status_t status = h2_fifo_push(workers->mplxs, m);
+ wake_idle_worker(workers);
+ return status;
}
-void h2_workers_tx_free(h2_workers *workers, apr_size_t count)
+apr_status_t h2_workers_unregister(h2_workers *workers, struct h2_mplx *m)
{
- apr_status_t status = apr_thread_mutex_lock(workers->tx_lock);
- if (status == APR_SUCCESS) {
- workers->spare_tx_handles += count;
- ap_log_error(APLOG_MARK, APLOG_TRACE2, 0, workers->s,
- "h2_workers: freed %d tx handles, %d/%d left",
- (int)count, (int)workers->spare_tx_handles,
- (int)workers->max_tx_handles);
- apr_thread_mutex_unlock(workers->tx_lock);
- }
+ return h2_fifo_remove(workers->mplxs, m);
}
-
Modified: httpd/httpd/branches/2.4.x/modules/http2/h2_workers.h
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4.x/modules/http2/h2_workers.h?rev=1789739&r1=1789738&r2=1789739&view=diff
==============================================================================
--- httpd/httpd/branches/2.4.x/modules/http2/h2_workers.h (original)
+++ httpd/httpd/branches/2.4.x/modules/http2/h2_workers.h Fri Mar 31 19:41:01 2017
@@ -27,6 +27,9 @@ struct apr_thread_cond_t;
struct h2_mplx;
struct h2_request;
struct h2_task;
+struct h2_fifo;
+
+struct h2_slot;
typedef struct h2_workers h2_workers;
@@ -41,22 +44,20 @@ struct h2_workers {
int idle_workers;
int max_idle_secs;
- apr_size_t max_tx_handles;
- apr_size_t spare_tx_handles;
-
- unsigned int aborted : 1;
+ int aborted;
+ int dynamic;
apr_threadattr_t *thread_attr;
+ int nslots;
+ struct h2_slot *slots;
- APR_RING_HEAD(h2_worker_list, h2_worker) workers;
- APR_RING_HEAD(h2_worker_zombies, h2_worker) zombies;
- APR_RING_HEAD(h2_mplx_list, h2_mplx) mplxs;
- int mplx_count;
+ struct h2_slot *free;
+ struct h2_slot *idle;
+ struct h2_slot *zombies;
+
+ struct h2_fifo *mplxs;
struct apr_thread_mutex_t *lock;
- struct apr_thread_cond_t *mplx_added;
-
- struct apr_thread_mutex_t *tx_lock;
};
@@ -64,12 +65,7 @@ struct h2_workers {
* threads.
*/
h2_workers *h2_workers_create(server_rec *s, apr_pool_t *pool,
- int min_size, int max_size,
- apr_size_t max_tx_handles);
-
-/* Destroy the worker pool and all its threads.
- */
-void h2_workers_destroy(h2_workers *workers);
+ int min_size, int max_size, int idle_secs);
/**
* Registers a h2_mplx for task scheduling. If this h2_mplx runs
@@ -83,38 +79,4 @@ apr_status_t h2_workers_register(h2_work
*/
apr_status_t h2_workers_unregister(h2_workers *workers, struct h2_mplx *m);
-/**
- * Set the amount of seconds a h2_worker should wait for new tasks
- * before shutting down (if there are more than the minimum number of
- * workers).
- */
-void h2_workers_set_max_idle_secs(h2_workers *workers, int idle_secs);
-
-/**
- * Reservation of file handles available for transfer between workers
- * and master connections.
- *
- * When handling output from request processing, file handles are often
- * encountered when static files are served. The most efficient way is then
- * to forward the handle itself to the master connection where it can be
- * read or sendfile'd to the client. But file handles are a scarce resource,
- * so there needs to be a limit on how many handles are transferred this way.
- *
- * h2_workers keeps track of the number of reserved handles and observes a
- * configurable maximum value.
- *
- * @param workers the workers instance
- * @param count how many handles the caller wishes to reserve
- * @return the number of reserved handles, may be 0.
- */
-apr_size_t h2_workers_tx_reserve(h2_workers *workers, apr_size_t count);
-
-/**
- * Return a number of reserved file handles back to the pool. The number
- * overall may not exceed the numbers reserved.
- * @param workers the workers instance
- * @param count how many handles are returned to the pool
- */
-void h2_workers_tx_free(h2_workers *workers, apr_size_t count);
-
#endif /* defined(__mod_h2__h2_workers__) */