You are viewing a plain text version of this content. The canonical link for it is here.
Posted to cvs@httpd.apache.org by ic...@apache.org on 2017/03/29 17:36:35 UTC

svn commit: r1789395 - in /httpd/httpd/trunk: CHANGES modules/http2/h2_mplx.c modules/http2/h2_mplx.h modules/http2/h2_session.c modules/http2/h2_stream.c modules/http2/h2_stream.h

Author: icing
Date: Wed Mar 29 17:36:35 2017
New Revision: 1789395

URL: http://svn.apache.org/viewvc?rev=1789395&view=rev
Log:
On the trunk:

mod_http2: better performance, eliminated need for nested locks and thread privates.


Modified:
    httpd/httpd/trunk/CHANGES
    httpd/httpd/trunk/modules/http2/h2_mplx.c
    httpd/httpd/trunk/modules/http2/h2_mplx.h
    httpd/httpd/trunk/modules/http2/h2_session.c
    httpd/httpd/trunk/modules/http2/h2_stream.c
    httpd/httpd/trunk/modules/http2/h2_stream.h

Modified: httpd/httpd/trunk/CHANGES
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/CHANGES?rev=1789395&r1=1789394&r2=1789395&view=diff
==============================================================================
--- httpd/httpd/trunk/CHANGES [utf-8] (original)
+++ httpd/httpd/trunk/CHANGES [utf-8] Wed Mar 29 17:36:35 2017
@@ -1,6 +1,9 @@
                                                          -*- coding: utf-8 -*-
 Changes with Apache 2.5.0
 
+  *) mod_http2: better performance, eliminated need for nested locks and
+     thread privates. [Stefan Eissing]
+     
   *) core: Disallow multiple Listen on the same IP:port when listener buckets
      are configured (ListenCoresBucketsRatio > 0), consistently with the single
      bucket case (default), thus avoiding the leak of the corresponding socket

Modified: httpd/httpd/trunk/modules/http2/h2_mplx.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_mplx.c?rev=1789395&r1=1789394&r2=1789395&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_mplx.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_mplx.c Wed Mar 29 17:36:35 2017
@@ -57,35 +57,40 @@ typedef struct {
 
 /* NULL or the mutex hold by this thread, used for recursive calls
  */
+static const int nested_lock = 0;
+
 static apr_threadkey_t *thread_lock;
 
 apr_status_t h2_mplx_child_init(apr_pool_t *pool, server_rec *s)
 {
-    return apr_threadkey_private_create(&thread_lock, NULL, pool);
+    if (nested_lock) {
+        return apr_threadkey_private_create(&thread_lock, NULL, pool);
+    }
+    return APR_SUCCESS;
 }
 
 static apr_status_t enter_mutex(h2_mplx *m, int *pacquired)
 {
     apr_status_t status;
-    void *mutex = NULL;
     
-    /* Enter the mutex if this thread already holds the lock or
-     * if we can acquire it. Only on the later case do we unlock
-     * onleaving the mutex.
-     * This allow recursive entering of the mutex from the saem thread,
-     * which is what we need in certain situations involving callbacks
-     */
-    ap_assert(m);
-    apr_threadkey_private_get(&mutex, thread_lock);
-    if (mutex == m->lock) {
-        *pacquired = 0;
-        return APR_SUCCESS;
+    if (nested_lock) {
+        void *mutex = NULL;
+        /* Enter the mutex if this thread already holds the lock or
+         * if we can acquire it. Only in the latter case do we unlock
+         * on leaving the mutex.
+         * This allows recursive entering of the mutex from the same thread,
+         * which is what we need in certain situations involving callbacks.
+         */
+        apr_threadkey_private_get(&mutex, thread_lock);
+        if (mutex == m->lock) {
+            *pacquired = 0;
+            ap_assert(NULL); /* nested, why? */
+            return APR_SUCCESS;
+        }
     }
-
-    ap_assert(m->lock);
     status = apr_thread_mutex_lock(m->lock);
     *pacquired = (status == APR_SUCCESS);
-    if (*pacquired) {
+    if (nested_lock && *pacquired) {
         apr_threadkey_private_set(m->lock, thread_lock);
     }
     return status;
@@ -94,7 +99,9 @@ static apr_status_t enter_mutex(h2_mplx
 static void leave_mutex(h2_mplx *m, int acquired)
 {
     if (acquired) {
-        apr_threadkey_private_set(NULL, thread_lock);
+        if (nested_lock) {
+            apr_threadkey_private_set(NULL, thread_lock);
+        }
         apr_thread_mutex_unlock(m->lock);
     }
 }
@@ -105,38 +112,23 @@ static void stream_output_consumed(void
                                    h2_bucket_beam *beam, apr_off_t length)
 {
     h2_stream *stream = ctx;
-    h2_mplx *m = stream->session->mplx;
     h2_task *task = stream->task;
-    int acquired;
     
     if (length > 0 && task && task->assigned) {
-        if (enter_mutex(m, &acquired) == APR_SUCCESS) {
-            h2_req_engine_out_consumed(task->assigned, task->c, length); 
-            leave_mutex(m, acquired);
-        }
+        h2_req_engine_out_consumed(task->assigned, task->c, length); 
     }
 }
 
 static void stream_input_ev(void *ctx, h2_bucket_beam *beam)
 {
-    h2_mplx *m = ctx;
+    h2_stream *stream = ctx;
+    h2_mplx *m = stream->session->mplx;
     apr_atomic_set32(&m->event_pending, 1); 
 }
 
-static void stream_input_consumed(void *ctx, 
-                                  h2_bucket_beam *beam, apr_off_t length)
+static void stream_input_consumed(void *ctx, h2_bucket_beam *beam, apr_off_t length)
 {
-    if (length > 0) { 
-        h2_mplx *m = ctx;
-        int acquired;
-    
-        if (enter_mutex(m, &acquired) == APR_SUCCESS) {
-            if (m->input_consumed) {
-                m->input_consumed(m->input_consumed_ctx, beam->id, length);
-            }
-            leave_mutex(m, acquired);
-        }
-    }
+    h2_stream_in_consumed(ctx, length);
 }
 
 static int can_always_beam_file(void *ctx, h2_bucket_beam *beam,  apr_file_t *file)
@@ -289,6 +281,12 @@ static int input_consumed_signal(h2_mplx
     return 0;
 }
 
+static int report_consumption_iter(void *ctx, void *val)
+{
+    input_consumed_signal(ctx, val);
+    return 1;
+}
+
 static int output_consumed_signal(h2_mplx *m, h2_task *task)
 {
     if (task->output.beam) {
@@ -426,6 +424,10 @@ static int stream_cancel_iter(void *ctx,
     h2_mplx *m = ctx;
     h2_stream *stream = val;
 
+    /* disable input consumed reporting */
+    if (stream->input) {
+        h2_beam_on_consumed(stream->input, NULL, NULL, NULL);
+    }
     /* take over event monitoring */
     h2_stream_set_monitor(stream, NULL);
     /* Reset, should transit to CLOSED state */
@@ -527,12 +529,6 @@ h2_stream *h2_mplx_stream_get(h2_mplx *m
     return s;
 }
 
-void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx)
-{
-    m->input_consumed = cb;
-    m->input_consumed_ctx = ctx;
-}
-
 static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
 {
     h2_mplx *m = ctx;
@@ -618,18 +614,6 @@ static apr_status_t out_close(h2_mplx *m
     return status;
 }
 
-static int report_input_consumption(void *ctx, void *val)
-{
-    h2_stream *stream = val;
-    
-    (void)ctx;
-    if (stream->input) {
-        h2_beam_report_consumption(stream->input);
-    }
-    return 1;
-}
-
-
 apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
                                  apr_thread_cond_t *iowait)
 {
@@ -645,7 +629,7 @@ apr_status_t h2_mplx_out_trywait(h2_mplx
         }
         else {
             purge_streams(m);
-            h2_ihash_iter(m->streams, report_input_consumption, m);
+            h2_ihash_iter(m->streams, report_consumption_iter, m);
             m->added_output = iowait;
             status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
             if (APLOGctrace2(m->c)) {
@@ -757,7 +741,7 @@ static h2_task *next_stream_task(h2_mplx
                 
                 if (stream->input) {
                     h2_beam_on_consumed(stream->input, stream_input_ev, 
-                                        stream_input_consumed, m);
+                                        stream_input_consumed, stream);
                     h2_beam_on_file_beam(stream->input, can_always_beam_file, m);
                     h2_beam_mutex_enable(stream->input);
                 }
@@ -1207,12 +1191,6 @@ int h2_mplx_has_master_events(h2_mplx *m
     return apr_atomic_read32(&m->event_pending) > 0;
 }
 
-static int report_consumption_iter(void *ctx, void *val)
-{
-    input_consumed_signal(ctx, val);
-    return 1;
-}
-
 apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m, 
                                             stream_ev_callback *on_resume, 
                                             void *on_ctx)
@@ -1227,16 +1205,19 @@ apr_status_t h2_mplx_dispatch_master_eve
         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
                       "h2_mplx(%ld): dispatch events", m->id);        
         apr_atomic_set32(&m->event_pending, 0);
+        purge_streams(m);
+        
         /* update input windows for streams */
         h2_ihash_iter(m->streams, report_consumption_iter, m);
-        purge_streams(m);
         
         if (!h2_iq_empty(m->readyq)) {
             n = h2_iq_mshift(m->readyq, ids, H2_ALEN(ids));
             for (i = 0; i < n; ++i) {
                 stream = h2_ihash_get(m->streams, ids[i]);
                 if (stream) {
+                    leave_mutex(m, acquired);
                     on_resume(on_ctx, stream);
+                    enter_mutex(m, &acquired);
                 }
             }
         }

Modified: httpd/httpd/trunk/modules/http2/h2_mplx.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_mplx.h?rev=1789395&r1=1789394&r2=1789395&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_mplx.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_mplx.h Wed Mar 29 17:36:35 2017
@@ -53,12 +53,6 @@ struct h2_req_engine;
 
 typedef struct h2_mplx h2_mplx;
 
-/**
- * Callback invoked for every stream that had input data read since
- * the last invocation.
- */
-typedef void h2_mplx_consumed_cb(void *ctx, int stream_id, apr_off_t consumed);
-
 struct h2_mplx {
     long id;
     conn_rec *c;
@@ -100,9 +94,6 @@ struct h2_mplx {
     
     struct h2_workers *workers;
     
-    h2_mplx_consumed_cb *input_consumed;
-    void *input_consumed_ctx;
-
     struct h2_ngn_shed *ngn_shed;
 };
 
@@ -194,18 +185,6 @@ apr_status_t h2_mplx_process(h2_mplx *m,
  */
 apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx);
 
-/**
- * Register a callback for the amount of input data consumed per stream. The
- * will only ever be invoked from the thread creating this h2_mplx, e.g. when
- * calls from that thread into this h2_mplx are made.
- *
- * @param m the multiplexer to register the callback at
- * @param cb the function to invoke
- * @param ctx user supplied argument to invocation.
- */
-void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx);
-
-
 typedef apr_status_t stream_ev_callback(void *ctx, struct h2_stream *stream);
 
 /**

Modified: httpd/httpd/trunk/modules/http2/h2_session.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_session.c?rev=1789395&r1=1789394&r2=1789395&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_session.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_session.c Wed Mar 29 17:36:35 2017
@@ -77,68 +77,6 @@ static h2_stream *get_stream(h2_session
     return nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
 }
 
-static void update_window(void *ctx, int stream_id, apr_off_t bytes_read)
-{
-    h2_session *session = ctx;
-    h2_stream *stream;
-    
-    if (bytes_read > 0) {
-        apr_off_t consumed = bytes_read;
-        
-        while (consumed > 0) {
-            int len = (bytes_read > INT_MAX)? INT_MAX : bytes_read;
-            nghttp2_session_consume(session->ngh2, stream_id, (int)bytes_read);
-            consumed -= len;
-        }
-
-        (void)stream;
-#ifdef H2_NG2_LOCAL_WIN_SIZE
-        if ((stream = get_stream(session, stream_id))) {
-            int cur_size = nghttp2_session_get_stream_local_window_size(
-                session->ngh2, stream->id);
-            int win = stream->in_window_size;
-            int thigh = win * 8/10;
-            int tlow = win * 2/10;
-            const int win_max = 2*1024*1024;
-            const int win_min = 32*1024;
-            
-            /* Work in progress, probably shoud add directives for these
-             * values once this stabilizes somewhat. The general idea is
-             * to adapt stream window sizes if the input window changes
-             * a) very quickly (< good RTT) from full to empty
-             * b) only a little bit (> bad RTT)
-             * where in a) it grows and in b) it shrinks again.
-             */
-            if (cur_size > thigh && bytes_read > thigh && win < win_max) {
-                /* almost empty again with one reported consumption, how
-                 * long did this take? */
-                long ms = apr_time_msec(apr_time_now() - stream->in_last_write);
-                if (ms < 40) {
-                    win = H2MIN(win_max, win + (64*1024));
-                }
-            }
-            else if (cur_size < tlow && bytes_read < tlow && win > win_min) {
-                /* staying full, for how long already? */
-                long ms = apr_time_msec(apr_time_now() - stream->in_last_write);
-                if (ms > 700) {
-                    win = H2MAX(win_min, win - (32*1024));
-                }
-            }
-            
-            if (win != stream->in_window_size) {
-                stream->in_window_size = win;
-                nghttp2_session_set_local_window_size(session->ngh2, 
-                        NGHTTP2_FLAG_NONE, stream_id, win);
-            } 
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
-                          "h2_stream(%ld-%d): consumed %ld bytes, window now %d/%d",
-                          session->id, stream_id, (long)bytes_read, 
-                          cur_size, stream->in_window_size);
-        }
-#endif
-    }
-}
-
 static void dispatch_event(h2_session *session, h2_session_event_t ev, 
                              int err, const char *msg);
 
@@ -755,7 +693,6 @@ static apr_status_t session_cleanup(h2_s
     }
 
     transit(session, trigger, H2_SESSION_ST_CLEANUP);
-    h2_mplx_set_consumed_cb(session->mplx, NULL, NULL);
     h2_mplx_release_and_join(session->mplx, session->iowait);
     session->mplx = NULL;
 
@@ -880,8 +817,6 @@ static apr_status_t h2_session_create_in
     session->mplx = h2_mplx_create(c, session->pool, session->config, 
                                    workers);
     
-    h2_mplx_set_consumed_cb(session->mplx, update_window, session);
-    
     /* connection input filter that feeds the session */
     session->cin = h2_filter_cin_create(session);
     ap_add_input_filter("H2_IN", session->cin, r, c);

Modified: httpd/httpd/trunk/modules/http2/h2_stream.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_stream.c?rev=1789395&r1=1789394&r2=1789395&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_stream.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_stream.c Wed Mar 29 17:36:35 2017
@@ -969,4 +969,64 @@ int h2_stream_was_closed(const h2_stream
     }
 }
 
+apr_status_t h2_stream_in_consumed(h2_stream *stream, apr_off_t amount)
+{
+    h2_session *session = stream->session;
+    
+    if (amount > 0) {
+        apr_off_t consumed = amount;
+        
+        while (consumed > 0) {
+            int len = (consumed > INT_MAX)? INT_MAX : consumed;
+            nghttp2_session_consume(session->ngh2, stream->id, len);
+            consumed -= len;
+        }
+
+#ifdef H2_NG2_LOCAL_WIN_SIZE
+        if (1) {
+            int cur_size = nghttp2_session_get_stream_local_window_size(
+                session->ngh2, stream->id);
+            int win = stream->in_window_size;
+            int thigh = win * 8/10;
+            int tlow = win * 2/10;
+            const int win_max = 2*1024*1024;
+            const int win_min = 32*1024;
+            
+            /* Work in progress, probably should add directives for these
+             * values once this stabilizes somewhat. The general idea is
+             * to adapt stream window sizes if the input window changes
+             * a) very quickly (< good RTT) from full to empty
+             * b) only a little bit (> bad RTT)
+             * where in a) it grows and in b) it shrinks again.
+             */
+            if (cur_size > thigh && amount > thigh && win < win_max) {
+                /* almost empty again with one reported consumption, how
+                 * long did this take? */
+                long ms = apr_time_msec(apr_time_now() - stream->in_last_write);
+                if (ms < 40) {
+                    win = H2MIN(win_max, win + (64*1024));
+                }
+            }
+            else if (cur_size < tlow && amount < tlow && win > win_min) {
+                /* staying full, for how long already? */
+                long ms = apr_time_msec(apr_time_now() - stream->in_last_write);
+                if (ms > 700) {
+                    win = H2MAX(win_min, win - (32*1024));
+                }
+            }
+            
+            if (win != stream->in_window_size) {
+                stream->in_window_size = win;
+                nghttp2_session_set_local_window_size(session->ngh2, 
+                        NGHTTP2_FLAG_NONE, stream->id, win);
+            } 
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+                          "h2_stream(%ld-%d): consumed %ld bytes, window now %d/%d",
+                          session->id, stream->id, (long)amount, 
+                          cur_size, stream->in_window_size);
+        }
+#endif
+    }
+    return APR_SUCCESS;   
+}
 

Modified: httpd/httpd/trunk/modules/http2/h2_stream.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_stream.h?rev=1789395&r1=1789394&r2=1789395&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_stream.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_stream.h Wed Mar 29 17:36:35 2017
@@ -165,6 +165,12 @@ void h2_stream_cleanup(h2_stream *stream
 apr_pool_t *h2_stream_detach_pool(h2_stream *stream);
 
 /**
+ * Notify the stream that amount bytes have been consumed of its input
+ * since the last invocation of this method (delta amount).
+ */
+apr_status_t h2_stream_in_consumed(h2_stream *stream, apr_off_t amount);
+
+/**
  * Set complete stream headers from given h2_request.
  * 
  * @param stream stream to write request to