You are viewing a plain text version of this content. The canonical link for it is here.
Posted to cvs@httpd.apache.org by ic...@apache.org on 2017/03/29 17:36:35 UTC
svn commit: r1789395 - in /httpd/httpd/trunk: CHANGES
modules/http2/h2_mplx.c modules/http2/h2_mplx.h modules/http2/h2_session.c
modules/http2/h2_stream.c modules/http2/h2_stream.h
Author: icing
Date: Wed Mar 29 17:36:35 2017
New Revision: 1789395
URL: http://svn.apache.org/viewvc?rev=1789395&view=rev
Log:
On the trunk:
mod_http2: better performance, eliminated need for nested locks and thread privates.
Modified:
httpd/httpd/trunk/CHANGES
httpd/httpd/trunk/modules/http2/h2_mplx.c
httpd/httpd/trunk/modules/http2/h2_mplx.h
httpd/httpd/trunk/modules/http2/h2_session.c
httpd/httpd/trunk/modules/http2/h2_stream.c
httpd/httpd/trunk/modules/http2/h2_stream.h
Modified: httpd/httpd/trunk/CHANGES
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/CHANGES?rev=1789395&r1=1789394&r2=1789395&view=diff
==============================================================================
--- httpd/httpd/trunk/CHANGES [utf-8] (original)
+++ httpd/httpd/trunk/CHANGES [utf-8] Wed Mar 29 17:36:35 2017
@@ -1,6 +1,9 @@
-*- coding: utf-8 -*-
Changes with Apache 2.5.0
+ *) mod_http2: better performance, eliminated need for nested locks and
+ thread privates. [Stefan Eissing]
+
*) core: Disallow multiple Listen on the same IP:port when listener buckets
are configured (ListenCoresBucketsRatio > 0), consistently with the single
bucket case (default), thus avoiding the leak of the corresponding socket
Modified: httpd/httpd/trunk/modules/http2/h2_mplx.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_mplx.c?rev=1789395&r1=1789394&r2=1789395&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_mplx.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_mplx.c Wed Mar 29 17:36:35 2017
@@ -57,35 +57,40 @@ typedef struct {
/* NULL or the mutex hold by this thread, used for recursive calls
*/
+static const int nested_lock = 0;
+
static apr_threadkey_t *thread_lock;
apr_status_t h2_mplx_child_init(apr_pool_t *pool, server_rec *s)
{
- return apr_threadkey_private_create(&thread_lock, NULL, pool);
+ if (nested_lock) {
+ return apr_threadkey_private_create(&thread_lock, NULL, pool);
+ }
+ return APR_SUCCESS;
}
static apr_status_t enter_mutex(h2_mplx *m, int *pacquired)
{
apr_status_t status;
- void *mutex = NULL;
- /* Enter the mutex if this thread already holds the lock or
- * if we can acquire it. Only on the later case do we unlock
- * onleaving the mutex.
- * This allow recursive entering of the mutex from the saem thread,
- * which is what we need in certain situations involving callbacks
- */
- ap_assert(m);
- apr_threadkey_private_get(&mutex, thread_lock);
- if (mutex == m->lock) {
- *pacquired = 0;
- return APR_SUCCESS;
+ if (nested_lock) {
+ void *mutex = NULL;
+ /* Enter the mutex if this thread already holds the lock or
+ * if we can acquire it. Only on the later case do we unlock
+ * onleaving the mutex.
+ * This allow recursive entering of the mutex from the saem thread,
+ * which is what we need in certain situations involving callbacks
+ */
+ apr_threadkey_private_get(&mutex, thread_lock);
+ if (mutex == m->lock) {
+ *pacquired = 0;
+ ap_assert(NULL); /* nested, why? */
+ return APR_SUCCESS;
+ }
}
-
- ap_assert(m->lock);
status = apr_thread_mutex_lock(m->lock);
*pacquired = (status == APR_SUCCESS);
- if (*pacquired) {
+ if (nested_lock && *pacquired) {
apr_threadkey_private_set(m->lock, thread_lock);
}
return status;
@@ -94,7 +99,9 @@ static apr_status_t enter_mutex(h2_mplx
static void leave_mutex(h2_mplx *m, int acquired)
{
if (acquired) {
- apr_threadkey_private_set(NULL, thread_lock);
+ if (nested_lock) {
+ apr_threadkey_private_set(NULL, thread_lock);
+ }
apr_thread_mutex_unlock(m->lock);
}
}
@@ -105,38 +112,23 @@ static void stream_output_consumed(void
h2_bucket_beam *beam, apr_off_t length)
{
h2_stream *stream = ctx;
- h2_mplx *m = stream->session->mplx;
h2_task *task = stream->task;
- int acquired;
if (length > 0 && task && task->assigned) {
- if (enter_mutex(m, &acquired) == APR_SUCCESS) {
- h2_req_engine_out_consumed(task->assigned, task->c, length);
- leave_mutex(m, acquired);
- }
+ h2_req_engine_out_consumed(task->assigned, task->c, length);
}
}
static void stream_input_ev(void *ctx, h2_bucket_beam *beam)
{
- h2_mplx *m = ctx;
+ h2_stream *stream = ctx;
+ h2_mplx *m = stream->session->mplx;
apr_atomic_set32(&m->event_pending, 1);
}
-static void stream_input_consumed(void *ctx,
- h2_bucket_beam *beam, apr_off_t length)
+static void stream_input_consumed(void *ctx, h2_bucket_beam *beam, apr_off_t length)
{
- if (length > 0) {
- h2_mplx *m = ctx;
- int acquired;
-
- if (enter_mutex(m, &acquired) == APR_SUCCESS) {
- if (m->input_consumed) {
- m->input_consumed(m->input_consumed_ctx, beam->id, length);
- }
- leave_mutex(m, acquired);
- }
- }
+ h2_stream_in_consumed(ctx, length);
}
static int can_always_beam_file(void *ctx, h2_bucket_beam *beam, apr_file_t *file)
@@ -289,6 +281,12 @@ static int input_consumed_signal(h2_mplx
return 0;
}
+static int report_consumption_iter(void *ctx, void *val)
+{
+ input_consumed_signal(ctx, val);
+ return 1;
+}
+
static int output_consumed_signal(h2_mplx *m, h2_task *task)
{
if (task->output.beam) {
@@ -426,6 +424,10 @@ static int stream_cancel_iter(void *ctx,
h2_mplx *m = ctx;
h2_stream *stream = val;
+ /* disabled input consumed reporting */
+ if (stream->input) {
+ h2_beam_on_consumed(stream->input, NULL, NULL, NULL);
+ }
/* take over event monitoring */
h2_stream_set_monitor(stream, NULL);
/* Reset, should transit to CLOSED state */
@@ -527,12 +529,6 @@ h2_stream *h2_mplx_stream_get(h2_mplx *m
return s;
}
-void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx)
-{
- m->input_consumed = cb;
- m->input_consumed_ctx = ctx;
-}
-
static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
{
h2_mplx *m = ctx;
@@ -618,18 +614,6 @@ static apr_status_t out_close(h2_mplx *m
return status;
}
-static int report_input_consumption(void *ctx, void *val)
-{
- h2_stream *stream = val;
-
- (void)ctx;
- if (stream->input) {
- h2_beam_report_consumption(stream->input);
- }
- return 1;
-}
-
-
apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
apr_thread_cond_t *iowait)
{
@@ -645,7 +629,7 @@ apr_status_t h2_mplx_out_trywait(h2_mplx
}
else {
purge_streams(m);
- h2_ihash_iter(m->streams, report_input_consumption, m);
+ h2_ihash_iter(m->streams, report_consumption_iter, m);
m->added_output = iowait;
status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
if (APLOGctrace2(m->c)) {
@@ -757,7 +741,7 @@ static h2_task *next_stream_task(h2_mplx
if (stream->input) {
h2_beam_on_consumed(stream->input, stream_input_ev,
- stream_input_consumed, m);
+ stream_input_consumed, stream);
h2_beam_on_file_beam(stream->input, can_always_beam_file, m);
h2_beam_mutex_enable(stream->input);
}
@@ -1207,12 +1191,6 @@ int h2_mplx_has_master_events(h2_mplx *m
return apr_atomic_read32(&m->event_pending) > 0;
}
-static int report_consumption_iter(void *ctx, void *val)
-{
- input_consumed_signal(ctx, val);
- return 1;
-}
-
apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m,
stream_ev_callback *on_resume,
void *on_ctx)
@@ -1227,16 +1205,19 @@ apr_status_t h2_mplx_dispatch_master_eve
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
"h2_mplx(%ld): dispatch events", m->id);
apr_atomic_set32(&m->event_pending, 0);
+ purge_streams(m);
+
/* update input windows for streams */
h2_ihash_iter(m->streams, report_consumption_iter, m);
- purge_streams(m);
if (!h2_iq_empty(m->readyq)) {
n = h2_iq_mshift(m->readyq, ids, H2_ALEN(ids));
for (i = 0; i < n; ++i) {
stream = h2_ihash_get(m->streams, ids[i]);
if (stream) {
+ leave_mutex(m, acquired);
on_resume(on_ctx, stream);
+ enter_mutex(m, &acquired);
}
}
}
Modified: httpd/httpd/trunk/modules/http2/h2_mplx.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_mplx.h?rev=1789395&r1=1789394&r2=1789395&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_mplx.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_mplx.h Wed Mar 29 17:36:35 2017
@@ -53,12 +53,6 @@ struct h2_req_engine;
typedef struct h2_mplx h2_mplx;
-/**
- * Callback invoked for every stream that had input data read since
- * the last invocation.
- */
-typedef void h2_mplx_consumed_cb(void *ctx, int stream_id, apr_off_t consumed);
-
struct h2_mplx {
long id;
conn_rec *c;
@@ -100,9 +94,6 @@ struct h2_mplx {
struct h2_workers *workers;
- h2_mplx_consumed_cb *input_consumed;
- void *input_consumed_ctx;
-
struct h2_ngn_shed *ngn_shed;
};
@@ -194,18 +185,6 @@ apr_status_t h2_mplx_process(h2_mplx *m,
*/
apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx);
-/**
- * Register a callback for the amount of input data consumed per stream. The
- * will only ever be invoked from the thread creating this h2_mplx, e.g. when
- * calls from that thread into this h2_mplx are made.
- *
- * @param m the multiplexer to register the callback at
- * @param cb the function to invoke
- * @param ctx user supplied argument to invocation.
- */
-void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx);
-
-
typedef apr_status_t stream_ev_callback(void *ctx, struct h2_stream *stream);
/**
Modified: httpd/httpd/trunk/modules/http2/h2_session.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_session.c?rev=1789395&r1=1789394&r2=1789395&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_session.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_session.c Wed Mar 29 17:36:35 2017
@@ -77,68 +77,6 @@ static h2_stream *get_stream(h2_session
return nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
}
-static void update_window(void *ctx, int stream_id, apr_off_t bytes_read)
-{
- h2_session *session = ctx;
- h2_stream *stream;
-
- if (bytes_read > 0) {
- apr_off_t consumed = bytes_read;
-
- while (consumed > 0) {
- int len = (bytes_read > INT_MAX)? INT_MAX : bytes_read;
- nghttp2_session_consume(session->ngh2, stream_id, (int)bytes_read);
- consumed -= len;
- }
-
- (void)stream;
-#ifdef H2_NG2_LOCAL_WIN_SIZE
- if ((stream = get_stream(session, stream_id))) {
- int cur_size = nghttp2_session_get_stream_local_window_size(
- session->ngh2, stream->id);
- int win = stream->in_window_size;
- int thigh = win * 8/10;
- int tlow = win * 2/10;
- const int win_max = 2*1024*1024;
- const int win_min = 32*1024;
-
- /* Work in progress, probably shoud add directives for these
- * values once this stabilizes somewhat. The general idea is
- * to adapt stream window sizes if the input window changes
- * a) very quickly (< good RTT) from full to empty
- * b) only a little bit (> bad RTT)
- * where in a) it grows and in b) it shrinks again.
- */
- if (cur_size > thigh && bytes_read > thigh && win < win_max) {
- /* almost empty again with one reported consumption, how
- * long did this take? */
- long ms = apr_time_msec(apr_time_now() - stream->in_last_write);
- if (ms < 40) {
- win = H2MIN(win_max, win + (64*1024));
- }
- }
- else if (cur_size < tlow && bytes_read < tlow && win > win_min) {
- /* staying full, for how long already? */
- long ms = apr_time_msec(apr_time_now() - stream->in_last_write);
- if (ms > 700) {
- win = H2MAX(win_min, win - (32*1024));
- }
- }
-
- if (win != stream->in_window_size) {
- stream->in_window_size = win;
- nghttp2_session_set_local_window_size(session->ngh2,
- NGHTTP2_FLAG_NONE, stream_id, win);
- }
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
- "h2_stream(%ld-%d): consumed %ld bytes, window now %d/%d",
- session->id, stream_id, (long)bytes_read,
- cur_size, stream->in_window_size);
- }
-#endif
- }
-}
-
static void dispatch_event(h2_session *session, h2_session_event_t ev,
int err, const char *msg);
@@ -755,7 +693,6 @@ static apr_status_t session_cleanup(h2_s
}
transit(session, trigger, H2_SESSION_ST_CLEANUP);
- h2_mplx_set_consumed_cb(session->mplx, NULL, NULL);
h2_mplx_release_and_join(session->mplx, session->iowait);
session->mplx = NULL;
@@ -880,8 +817,6 @@ static apr_status_t h2_session_create_in
session->mplx = h2_mplx_create(c, session->pool, session->config,
workers);
- h2_mplx_set_consumed_cb(session->mplx, update_window, session);
-
/* connection input filter that feeds the session */
session->cin = h2_filter_cin_create(session);
ap_add_input_filter("H2_IN", session->cin, r, c);
Modified: httpd/httpd/trunk/modules/http2/h2_stream.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_stream.c?rev=1789395&r1=1789394&r2=1789395&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_stream.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_stream.c Wed Mar 29 17:36:35 2017
@@ -969,4 +969,64 @@ int h2_stream_was_closed(const h2_stream
}
}
+apr_status_t h2_stream_in_consumed(h2_stream *stream, apr_off_t amount)
+{
+ h2_session *session = stream->session;
+
+ if (amount > 0) {
+ apr_off_t consumed = amount;
+
+ while (consumed > 0) {
+ int len = (consumed > INT_MAX)? INT_MAX : consumed;
+ nghttp2_session_consume(session->ngh2, stream->id, len);
+ consumed -= len;
+ }
+
+#ifdef H2_NG2_LOCAL_WIN_SIZE
+ if (1) {
+ int cur_size = nghttp2_session_get_stream_local_window_size(
+ session->ngh2, stream->id);
+ int win = stream->in_window_size;
+ int thigh = win * 8/10;
+ int tlow = win * 2/10;
+ const int win_max = 2*1024*1024;
+ const int win_min = 32*1024;
+
+ /* Work in progress, probably should add directives for these
+ * values once this stabilizes somewhat. The general idea is
+ * to adapt stream window sizes if the input window changes
+ * a) very quickly (< good RTT) from full to empty
+ * b) only a little bit (> bad RTT)
+ * where in a) it grows and in b) it shrinks again.
+ */
+ if (cur_size > thigh && amount > thigh && win < win_max) {
+ /* almost empty again with one reported consumption, how
+ * long did this take? */
+ long ms = apr_time_msec(apr_time_now() - stream->in_last_write);
+ if (ms < 40) {
+ win = H2MIN(win_max, win + (64*1024));
+ }
+ }
+ else if (cur_size < tlow && amount < tlow && win > win_min) {
+ /* staying full, for how long already? */
+ long ms = apr_time_msec(apr_time_now() - stream->in_last_write);
+ if (ms > 700) {
+ win = H2MAX(win_min, win - (32*1024));
+ }
+ }
+
+ if (win != stream->in_window_size) {
+ stream->in_window_size = win;
+ nghttp2_session_set_local_window_size(session->ngh2,
+ NGHTTP2_FLAG_NONE, stream->id, win);
+ }
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+ "h2_stream(%ld-%d): consumed %ld bytes, window now %d/%d",
+ session->id, stream->id, (long)amount,
+ cur_size, stream->in_window_size);
+ }
+#endif
+ }
+ return APR_SUCCESS;
+}
Modified: httpd/httpd/trunk/modules/http2/h2_stream.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_stream.h?rev=1789395&r1=1789394&r2=1789395&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_stream.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_stream.h Wed Mar 29 17:36:35 2017
@@ -165,6 +165,12 @@ void h2_stream_cleanup(h2_stream *stream
apr_pool_t *h2_stream_detach_pool(h2_stream *stream);
/**
+ * Notify the stream that amount bytes have been consumed of its input
+ * since the last invocation of this method (delta amount).
+ */
+apr_status_t h2_stream_in_consumed(h2_stream *stream, apr_off_t amount);
+
+/**
* Set complete stream headers from given h2_request.
*
* @param stream stream to write request to