You are viewing a plain text version of this content. The canonical link for it is here.
Posted to cvs@httpd.apache.org by ic...@apache.org on 2017/01/08 20:11:27 UTC

svn commit: r1777907 - in /httpd/httpd/trunk: ./ modules/http2/

Author: icing
Date: Sun Jan  8 20:11:27 2017
New Revision: 1777907

URL: http://svn.apache.org/viewvc?rev=1777907&view=rev
Log:
On the trunk:

  *) mod_http2: streaming of request output now reacts in a timely manner to
     data from other streams becoming available. Same for new incoming requests.


Modified:
    httpd/httpd/trunk/CHANGES
    httpd/httpd/trunk/modules/http2/h2_bucket_beam.c
    httpd/httpd/trunk/modules/http2/h2_bucket_beam.h
    httpd/httpd/trunk/modules/http2/h2_config.c
    httpd/httpd/trunk/modules/http2/h2_config.h
    httpd/httpd/trunk/modules/http2/h2_conn_io.c
    httpd/httpd/trunk/modules/http2/h2_conn_io.h
    httpd/httpd/trunk/modules/http2/h2_mplx.c
    httpd/httpd/trunk/modules/http2/h2_mplx.h
    httpd/httpd/trunk/modules/http2/h2_session.c
    httpd/httpd/trunk/modules/http2/h2_stream.c
    httpd/httpd/trunk/modules/http2/h2_version.h

Modified: httpd/httpd/trunk/CHANGES
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/CHANGES?rev=1777907&r1=1777906&r2=1777907&view=diff
==============================================================================
--- httpd/httpd/trunk/CHANGES [utf-8] (original)
+++ httpd/httpd/trunk/CHANGES [utf-8] Sun Jan  8 20:11:27 2017
@@ -1,15 +1,13 @@
                                                          -*- coding: utf-8 -*-
 Changes with Apache 2.5.0
 
+  *) mod_http2: streaming of request output now reacts timely to data
+     from other streams becoming available. Same for new incoming requests.
+     [Stefan Eissing]
+     
   *) core: EBCDIC fixes for interim responses with additional headers.
      [Eric Covener]
 
-  *) mod_http2: fix for possible page fault when stream is resumed during 
-     session shutdown. [sidney-j-r-m (github)]
-     
-  *) mod_http2: fix for h2 session ignoring new responses while already
-     open streams continue to have data available. [Stefan Eissing]
-     
   *) mod_remoteip: Add support for PROXY protocol (code donated by Cloudzilla).
      Add ability for PROXY protocol processing to be optional to donated code.
      See also: http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt

Modified: httpd/httpd/trunk/modules/http2/h2_bucket_beam.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_bucket_beam.c?rev=1777907&r1=1777906&r2=1777907&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_bucket_beam.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_bucket_beam.c Sun Jan  8 20:11:27 2017
@@ -14,6 +14,7 @@
  */
 
 #include <apr_lib.h>
+#include <apr_atomic.h>
 #include <apr_strings.h>
 #include <apr_time.h>
 #include <apr_buckets.h>
@@ -243,25 +244,29 @@ static void leave_yellow(h2_bucket_beam
     }
 }
 
-static void report_consumption(h2_bucket_beam *beam, int force)
+static int report_consumption(h2_bucket_beam *beam)
 {
-    if (force || beam->received_bytes != beam->reported_consumed_bytes) {
-        if (beam->consumed_fn) { 
-            beam->consumed_fn(beam->consumed_ctx, beam, beam->received_bytes
-                              - beam->reported_consumed_bytes);
+    int rv = 0;
+    if (apr_atomic_read32(&beam->cons_ev_pending)) {
+        if (beam->cons_io_cb) { 
+            beam->cons_io_cb(beam->cons_ctx, beam, beam->received_bytes
+                             - beam->cons_bytes_reported);
+            rv = 1;
         }
-        beam->reported_consumed_bytes = beam->received_bytes;
+        beam->cons_bytes_reported = beam->received_bytes;
+        apr_atomic_set32(&beam->cons_ev_pending, 0);
     }
+    return rv;
 }
 
-static void report_production(h2_bucket_beam *beam, int force)
+static void report_prod_io(h2_bucket_beam *beam, int force)
 {
-    if (force || beam->sent_bytes != beam->reported_produced_bytes) {
-        if (beam->produced_fn) { 
-            beam->produced_fn(beam->produced_ctx, beam, beam->sent_bytes
-                              - beam->reported_produced_bytes);
+    if (force || beam->prod_bytes_reported != beam->sent_bytes) {
+        if (beam->prod_io_cb) { 
+            beam->prod_io_cb(beam->prod_ctx, beam, beam->sent_bytes
+                             - beam->prod_bytes_reported);
         }
-        beam->reported_produced_bytes = beam->sent_bytes;
+        beam->prod_bytes_reported = beam->sent_bytes;
     }
 }
 
@@ -322,7 +327,7 @@ static apr_status_t r_wait_space(h2_buck
     while (!beam->aborted && *premain <= 0 
            && (block == APR_BLOCK_READ) && pbl->mutex) {
         apr_status_t status;
-        report_production(beam, 1);
+        report_prod_io(beam, 1);
         status = wait_cond(beam, pbl->mutex);
         if (APR_STATUS_IS_TIMEUP(status)) {
             return status;
@@ -453,7 +458,7 @@ static apr_status_t beam_send_cleanup(vo
     /* sender has gone away, clear up all references to its memory */
     r_purge_sent(beam);
     h2_blist_cleanup(&beam->send_list);
-    report_consumption(beam, 0);
+    report_consumption(beam);
     while (!H2_BPROXY_LIST_EMPTY(&beam->proxies)) {
         h2_beam_proxy *proxy = H2_BPROXY_LIST_FIRST(&beam->proxies);
         H2_BPROXY_REMOVE(proxy);
@@ -634,7 +639,7 @@ void h2_beam_abort(h2_bucket_beam *beam)
             beam->aborted = 1;
             r_purge_sent(beam);
             h2_blist_cleanup(&beam->send_list);
-            report_consumption(beam, 0);
+            report_consumption(beam);
         }
         if (beam->m_cond) {
             apr_thread_cond_broadcast(beam->m_cond);
@@ -650,7 +655,7 @@ apr_status_t h2_beam_close(h2_bucket_bea
     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
         r_purge_sent(beam);
         beam_close(beam);
-        report_consumption(beam, 0);
+        report_consumption(beam);
         leave_yellow(beam, &bl);
     }
     return beam->aborted? APR_ECONNABORTED : APR_SUCCESS;
@@ -851,12 +856,12 @@ apr_status_t h2_beam_send(h2_bucket_beam
                 b = APR_BRIGADE_FIRST(red_brigade);
                 status = append_bucket(beam, b, block, &bl);
             }
-            report_production(beam, force_report);
+            report_prod_io(beam, force_report);
             if (beam->m_cond) {
                 apr_thread_cond_broadcast(beam->m_cond);
             }
         }
-        report_consumption(beam, 0);
+        report_consumption(beam);
         leave_yellow(beam, &bl);
     }
     return status;
@@ -872,6 +877,7 @@ apr_status_t h2_beam_receive(h2_bucket_b
     int transferred = 0;
     apr_status_t status = APR_SUCCESS;
     apr_off_t remain = readbytes;
+    int transferred_buckets = 0;
     
     /* Called from the green thread to take buckets from the beam */
     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
@@ -968,6 +974,8 @@ transfer:
             APR_BUCKET_REMOVE(bred);
             H2_BLIST_INSERT_TAIL(&beam->hold_list, bred);
             beam->received_bytes += bred->length;
+            ++transferred_buckets;
+            
             if (bgreen) {
                 APR_BRIGADE_INSERT_TAIL(bb, bgreen);
                 remain -= bgreen->length;
@@ -1000,6 +1008,13 @@ transfer:
             }
         }
 
+        if (transferred_buckets > 0) {
+           apr_atomic_set32(&beam->cons_ev_pending, 1);
+           if (beam->cons_ev_cb) { 
+               beam->cons_ev_cb(beam->cons_ctx, beam);
+            }
+        }
+        
         if (beam->closed 
             && (!beam->recv_buffer || APR_BRIGADE_EMPTY(beam->recv_buffer))
             && H2_BLIST_EMPTY(&beam->send_list)) {
@@ -1042,25 +1057,25 @@ leave:
 }
 
 void h2_beam_on_consumed(h2_bucket_beam *beam, 
-                         h2_beam_io_callback *cb, void *ctx)
+                         h2_beam_ev_callback *ev_cb,
+                         h2_beam_io_callback *io_cb, void *ctx)
 {
     h2_beam_lock bl;
-    
     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
-        beam->consumed_fn = cb;
-        beam->consumed_ctx = ctx;
+        beam->cons_ev_cb = ev_cb;
+        beam->cons_io_cb = io_cb;
+        beam->cons_ctx = ctx;
         leave_yellow(beam, &bl);
     }
 }
 
 void h2_beam_on_produced(h2_bucket_beam *beam, 
-                         h2_beam_io_callback *cb, void *ctx)
+                         h2_beam_io_callback *io_cb, void *ctx)
 {
     h2_beam_lock bl;
-    
     if (enter_yellow(beam, &bl) == APR_SUCCESS) {
-        beam->produced_fn = cb;
-        beam->produced_ctx = ctx;
+        beam->prod_io_cb = io_cb;
+        beam->prod_ctx = ctx;
         leave_yellow(beam, &bl);
     }
 }
@@ -1173,3 +1188,16 @@ int h2_beam_no_files(void *ctx, h2_bucke
     return 0;
 }
 
+int h2_beam_report_consumption(h2_bucket_beam *beam)
+{
+    if (apr_atomic_read32(&beam->cons_ev_pending)) {
+        h2_beam_lock bl;
+        if (enter_yellow(beam, &bl) == APR_SUCCESS) {
+            int rv = report_consumption(beam);
+            leave_yellow(beam, &bl);
+            return rv;
+        }
+    }
+    return 0;
+}
+

Modified: httpd/httpd/trunk/modules/http2/h2_bucket_beam.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_bucket_beam.h?rev=1777907&r1=1777906&r2=1777907&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_bucket_beam.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_bucket_beam.h Sun Jan  8 20:11:27 2017
@@ -154,6 +154,7 @@ typedef apr_status_t h2_beam_mutex_enter
 
 typedef void h2_beam_io_callback(void *ctx, h2_bucket_beam *beam,
                                  apr_off_t bytes);
+typedef void h2_beam_ev_callback(void *ctx, h2_bucket_beam *beam);
 
 typedef struct h2_beam_proxy h2_beam_proxy;
 typedef struct {
@@ -205,12 +206,17 @@ struct h2_bucket_beam {
     h2_beam_mutex_enter *m_enter;
     struct apr_thread_cond_t *m_cond;
     
-    apr_off_t reported_consumed_bytes; /* amount of bytes reported as consumed */
-    h2_beam_io_callback *consumed_fn;
-    void *consumed_ctx;
-    apr_off_t reported_produced_bytes; /* amount of bytes reported as produced */
-    h2_beam_io_callback *produced_fn;
-    void *produced_ctx;
+    apr_uint32_t cons_ev_pending;     /* != 0, consumer event pending */
+    apr_off_t cons_bytes_reported;    /* amount of bytes reported as consumed */
+    h2_beam_ev_callback *cons_ev_cb;
+    h2_beam_io_callback *cons_io_cb;
+    void *cons_ctx;
+
+    apr_uint32_t prod_ev_pending;     /* != 0, producer event pending */
+    apr_off_t prod_bytes_reported;    /* amount of bytes reported as produced */
+    h2_beam_io_callback *prod_io_cb;
+    void *prod_ctx;
+
     h2_beam_can_beam_callback *can_beam_fn;
     void *can_beam_ctx;
 };
@@ -336,26 +342,38 @@ apr_size_t h2_beam_buffer_size_get(h2_bu
  * amount of bytes that have been consumed by the receiver, since the
  * last callback invocation or reset.
  * @param beam the beam to set the callback on
- * @param cb   the callback or NULL
+ * @param ev_cb the callback or NULL, called when bytes are consumed
+ * @param io_cb the callback or NULL, called on sender with bytes consumed
  * @param ctx  the context to use in callback invocation
  * 
- * Call from the sender side, callbacks invoked on sender side.
+ * Call from the sender side, io callbacks invoked on sender side, ev callback
+ * from any side.
  */
 void h2_beam_on_consumed(h2_bucket_beam *beam, 
-                         h2_beam_io_callback *cb, void *ctx);
+                         h2_beam_ev_callback *ev_cb,
+                         h2_beam_io_callback *io_cb, void *ctx);
+
+/**
+ * Call any registered consumed handler, if any changes have happened
+ * since the last invocation. 
+ * @return !=0 iff a handler has been called
+ *
+ * Needs to be invoked from the sending side.
+ */
+int h2_beam_report_consumption(h2_bucket_beam *beam);
 
 /**
  * Register a callback to be invoked on the receiver side with the
  * amount of bytes that have been produces by the sender, since the
  * last callback invocation or reset.
  * @param beam the beam to set the callback on
- * @param cb   the callback or NULL
+ * @param io_cb the callback or NULL, called on receiver with bytes produced
  * @param ctx  the context to use in callback invocation
  * 
- * Call from the receiver side, callbacks invoked on receiver side.
+ * Call from the receiver side, callbacks invoked on either side.
  */
 void h2_beam_on_produced(h2_bucket_beam *beam, 
-                         h2_beam_io_callback *cb, void *ctx);
+                         h2_beam_io_callback *io_cb, void *ctx);
 
 void h2_beam_on_file_beam(h2_bucket_beam *beam, 
                           h2_beam_can_beam_callback *cb, void *ctx);

Modified: httpd/httpd/trunk/modules/http2/h2_config.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_config.c?rev=1777907&r1=1777906&r2=1777907&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_config.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_config.c Sun Jan  8 20:11:27 2017
@@ -64,7 +64,6 @@ static h2_config defconf = {
     0,                      /* copy files across threads */
     NULL,                   /* push list */
     0,                      /* early hints, http status 103 */
-    2,                      /* TLS records flush count */
 };
 
 void h2_config_init(apr_pool_t *pool)
@@ -100,7 +99,6 @@ static void *h2_config_create(apr_pool_t
     conf->copy_files           = DEF_VAL;
     conf->push_list            = NULL;
     conf->early_hints          = DEF_VAL;
-    conf->tls_flush_count      = DEF_VAL;
     return conf;
 }
 
@@ -153,7 +151,6 @@ static void *h2_config_merge(apr_pool_t
         n->push_list        = add->push_list? add->push_list : base->push_list;
     }
     n->early_hints          = H2_CONFIG_GET(add, base, early_hints);
-    n->tls_flush_count      = H2_CONFIG_GET(add, base, tls_flush_count);
     return n;
 }
 
@@ -211,8 +208,6 @@ apr_int64_t h2_config_geti64(const h2_co
             return H2_CONFIG_GET(conf, &defconf, copy_files);
         case H2_CONF_EARLY_HINTS:
             return H2_CONFIG_GET(conf, &defconf, early_hints);
-        case H2_CONF_TLS_FLUSH_COUNT:
-            return H2_CONFIG_GET(conf, &defconf, tls_flush_count);
         default:
             return DEF_VAL;
     }
@@ -510,15 +505,6 @@ static const char *h2_conf_set_tls_coold
     return NULL;
 }
 
-static const char *h2_conf_set_tls_flush_count(cmd_parms *parms,
-                                               void *arg, const char *value)
-{
-    h2_config *cfg = (h2_config *)h2_config_sget(parms->server);
-    cfg->tls_flush_count = (int)apr_atoi64(value);
-    (void)arg;
-    return NULL;
-}
-
 static const char *h2_conf_set_push_diary_size(cmd_parms *parms,
                                                void *arg, const char *value)
 {
@@ -657,8 +643,6 @@ const command_rec h2_cmds[] = {
                   RSRC_CONF, "number of bytes on TLS connection before doing max writes"),
     AP_INIT_TAKE1("H2TLSCoolDownSecs", h2_conf_set_tls_cooldown_secs, NULL,
                   RSRC_CONF, "seconds of idle time on TLS before shrinking writes"),
-    AP_INIT_TAKE1("H2TLSFlushCount", h2_conf_set_tls_flush_count, NULL,
-                  RSRC_CONF, "number of max TLS records before output is flushed"),
     AP_INIT_TAKE1("H2Push", h2_conf_set_push, NULL,
                   RSRC_CONF, "off to disable HTTP/2 server push"),
     AP_INIT_TAKE23("H2PushPriority", h2_conf_add_push_priority, NULL,

Modified: httpd/httpd/trunk/modules/http2/h2_config.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_config.h?rev=1777907&r1=1777906&r2=1777907&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_config.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_config.h Sun Jan  8 20:11:27 2017
@@ -42,7 +42,6 @@ typedef enum {
     H2_CONF_PUSH_DIARY_SIZE,
     H2_CONF_COPY_FILES,
     H2_CONF_EARLY_HINTS,
-    H2_CONF_TLS_FLUSH_COUNT,
 } h2_config_var_t;
 
 struct apr_hash_t;
@@ -80,7 +79,6 @@ typedef struct h2_config {
     int copy_files;               /* if files shall be copied vs setaside on output */
     apr_array_header_t *push_list;/* list of h2_push_res configurations */
     int early_hints;              /* support status code 103 */
-    int tls_flush_count;          /* max # of TLS records until output flushed */
 } h2_config;
 
 

Modified: httpd/httpd/trunk/modules/http2/h2_conn_io.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_conn_io.c?rev=1777907&r1=1777906&r2=1777907&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_conn_io.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_conn_io.c Sun Jan  8 20:11:27 2017
@@ -132,9 +132,7 @@ apr_status_t h2_conn_io_init(h2_conn_io
     io->output         = apr_brigade_create(c->pool, c->bucket_alloc);
     io->is_tls         = h2_h2_is_tls(c);
     io->buffer_output  = io->is_tls;
-    io->pass_threshold = (apr_size_t)h2_config_geti64(cfg, H2_CONF_STREAM_MAX_MEM);
-    io->flush_factor   = h2_config_geti(cfg, H2_CONF_TLS_FLUSH_COUNT);
-    io->speed_factor   = 1.0;
+    io->flush_threshold = (apr_size_t)h2_config_geti64(cfg, H2_CONF_STREAM_MAX_MEM);
 
     if (io->is_tls) {
         /* This is what we start with, 
@@ -257,7 +255,6 @@ static void check_write_size(h2_conn_io
         /* long time not written, reset write size */
         io->write_size = WRITE_SIZE_INITIAL;
         io->bytes_written = 0;
-        io->speed_factor = 1.0;
         ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c,
                       "h2_conn_io(%ld): timeout write size reset to %ld", 
                       (long)io->c->id, (long)io->write_size);
@@ -282,7 +279,7 @@ static apr_status_t pass_output(h2_conn_
     apr_status_t status;
     
     append_scratch(io);
-    if (flush) {
+    if (flush && !io->is_flushed) {
         b = apr_bucket_flush_create(c->bucket_alloc);
         APR_BRIGADE_INSERT_TAIL(bb, b);
     }
@@ -300,6 +297,9 @@ static apr_status_t pass_output(h2_conn_
     if (status == APR_SUCCESS) {
         io->bytes_written += (apr_size_t)bblen;
         io->last_write = apr_time_now();
+        if (flush) {
+            io->is_flushed = 1;
+        }
     }
     apr_brigade_cleanup(bb);
 
@@ -325,35 +325,26 @@ static apr_status_t pass_output(h2_conn_
     return status;
 }
 
+int h2_conn_io_needs_flush(h2_conn_io *io)
+{
+    if (!io->is_flushed) {
+        apr_off_t len = h2_brigade_mem_size(io->output);
+        if (len > io->flush_threshold) {
+            return 1;
+        }
+        /* if we do not exceed flush length due to memory limits,
+         * we want at least flush when we have that amount of data. */
+        apr_brigade_length(io->output, 0, &len);
+        return len > (4 * io->flush_threshold);
+    }
+    return 0;
+}
+
 apr_status_t h2_conn_io_flush(h2_conn_io *io)
 {
-    apr_time_t start = 0;
     apr_status_t status;
-    
-    if (io->needs_flush > 0) {
-        /* this is a buffer size triggered flush, let's measure how
-         * long it takes and try to adjust our speed factor accordingly */
-        start = apr_time_now();
-    }
     status = pass_output(io, 1, NULL);
     check_write_size(io);
-    if (start && status == APR_SUCCESS) {
-        apr_time_t duration = apr_time_now() - start;
-        if (duration < apr_time_from_msec(100)) {
-            io->speed_factor *= 1.0 + ((apr_time_from_msec(100) - duration) 
-                                        / (float)apr_time_from_msec(100));
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, io->c,
-                          "h2_conn_io(%ld): incr speed_factor to %f",
-                          io->c->id, io->speed_factor);
-        }
-        else if (duration > apr_time_from_msec(200)) {
-            io->speed_factor *= 0.5;
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, io->c,
-                          "h2_conn_io(%ld): decr speed_factor to %f",
-                          io->c->id, io->speed_factor);
-        }
-    }
-    io->needs_flush = -1;
     return status;
 }
 
@@ -367,6 +358,10 @@ apr_status_t h2_conn_io_write(h2_conn_io
     apr_status_t status = APR_SUCCESS;
     apr_size_t remain;
     
+    if (length > 0) {
+        io->is_flushed = 0;
+    }
+    
     if (io->buffer_output) {
         while (length > 0) {
             remain = assure_scratch_space(io);
@@ -396,7 +391,6 @@ apr_status_t h2_conn_io_write(h2_conn_io
     else {
         status = apr_brigade_write(io->output, NULL, NULL, data, length);
     }
-    io->needs_flush = -1;
     return status;
 }
 
@@ -405,6 +399,10 @@ apr_status_t h2_conn_io_pass(h2_conn_io
     apr_bucket *b;
     apr_status_t status = APR_SUCCESS;
     
+    if (!APR_BRIGADE_EMPTY(bb)) {
+        io->is_flushed = 0;
+    }
+
     while (!APR_BRIGADE_EMPTY(bb) && status == APR_SUCCESS) {
         b = APR_BRIGADE_FIRST(bb);
         
@@ -447,21 +445,6 @@ apr_status_t h2_conn_io_pass(h2_conn_io
             APR_BRIGADE_INSERT_TAIL(io->output, b);
         }
     }
-    io->needs_flush = -1;
     return status;
 }
 
-int h2_conn_io_needs_flush(h2_conn_io *io)
-{
-    if (io->needs_flush < 0) {
-        apr_off_t len;
-        apr_brigade_length(io->output, 0, &len);
-        if (len > (io->pass_threshold * io->flush_factor * io->speed_factor)) {
-            /* don't want to keep too much around */
-            io->needs_flush = 1;
-            return 1;
-        }
-        io->needs_flush = 0;
-    }
-    return 0;
-}

Modified: httpd/httpd/trunk/modules/http2/h2_conn_io.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_conn_io.h?rev=1777907&r1=1777906&r2=1777907&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_conn_io.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_conn_io.h Sun Jan  8 20:11:27 2017
@@ -39,10 +39,8 @@ typedef struct {
     apr_int64_t bytes_written;
     
     int buffer_output;
-    int needs_flush;
-    apr_size_t pass_threshold;
-    float flush_factor;
-    float speed_factor;
+    apr_size_t flush_threshold;
+    unsigned int is_flushed : 1;
     
     char *scratch;
     apr_size_t ssize;
@@ -76,6 +74,9 @@ apr_status_t h2_conn_io_write_eoc(h2_con
  */
 apr_status_t h2_conn_io_flush(h2_conn_io *io);
 
+/**
+ * Check if the buffered amount of data needs flushing.
+ */
 int h2_conn_io_needs_flush(h2_conn_io *io);
 
 #endif /* defined(__mod_h2__h2_conn_io__) */

Modified: httpd/httpd/trunk/modules/http2/h2_mplx.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_mplx.c?rev=1777907&r1=1777906&r2=1777907&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_mplx.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_mplx.c Sun Jan  8 20:11:27 2017
@@ -17,6 +17,7 @@
 #include <stddef.h>
 #include <stdlib.h>
 
+#include <apr_atomic.h>
 #include <apr_thread_mutex.h>
 #include <apr_thread_cond.h>
 #include <apr_strings.h>
@@ -143,6 +144,12 @@ static void stream_output_consumed(void
     }
 }
 
+static void stream_input_ev(void *ctx, h2_bucket_beam *beam)
+{
+    h2_mplx *m = ctx;
+    apr_atomic_set32(&m->event_pending, 1);
+}
+
 static void stream_input_consumed(void *ctx, 
                                   h2_bucket_beam *beam, apr_off_t length)
 {
@@ -337,18 +344,18 @@ int h2_mplx_shutdown(h2_mplx *m)
     return max_stream_started;
 }
 
-static void input_consumed_signal(h2_mplx *m, h2_stream *stream)
+static int input_consumed_signal(h2_mplx *m, h2_stream *stream)
 {
-    if (stream->input && stream->started) {
-        h2_beam_send(stream->input, NULL, 0); /* trigger updates */
+    if (stream->input) {
+        return h2_beam_report_consumption(stream->input);
     }
+    return 0;
 }
 
 static int output_consumed_signal(h2_mplx *m, h2_task *task)
 {
-    if (task->output.beam && task->worker_started && task->assigned) {
-        /* trigger updates */
-        h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
+    if (task->output.beam) {
+        return h2_beam_report_consumption(task->output.beam);
     }
     return 0;
 }
@@ -438,7 +445,7 @@ static void stream_done(h2_mplx *m, h2_s
     
     h2_stream_cleanup(stream);
     m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input);
-    h2_beam_on_consumed(stream->input, NULL, NULL);
+    h2_beam_on_consumed(stream->input, NULL, NULL, NULL);
     /* Let anyone blocked reading know that there is no more to come */
     h2_beam_abort(stream->input);
     /* Remove mutex after, so that abort still finds cond to signal */
@@ -711,7 +718,7 @@ static apr_status_t out_open(h2_mplx *m,
                       "h2_mplx(%s): out open", task->id);
     }
     
-    h2_beam_on_consumed(stream->output, stream_output_consumed, task);
+    h2_beam_on_consumed(stream->output, NULL, stream_output_consumed, task);
     h2_beam_on_produced(stream->output, output_produced, m);
     beamed_count = h2_beam_get_files_beamed(stream->output);
     if (m->tx_handles_reserved >= beamed_count) {
@@ -785,7 +792,7 @@ apr_status_t h2_mplx_out_trywait(h2_mplx
         if (m->aborted) {
             status = APR_ECONNABORTED;
         }
-        else if (!h2_iq_empty(m->readyq)) {
+        else if (apr_atomic_read32(&m->event_pending) > 0) {
             status = APR_SUCCESS;
         }
         else {
@@ -809,6 +816,7 @@ static void have_out_data_for(h2_mplx *m
     ap_assert(m);
     ap_assert(stream);
     h2_iq_append(m->readyq, stream->id);
+    apr_atomic_set32(&m->event_pending, 1);
     if (m->added_output) {
         apr_thread_cond_signal(m->added_output);
     }
@@ -847,6 +855,7 @@ apr_status_t h2_mplx_process(h2_mplx *m,
         else {
             h2_ihash_add(m->streams, stream);
             if (h2_stream_is_ready(stream)) {
+                apr_atomic_set32(&m->event_pending, 1);
                 h2_iq_append(m->readyq, stream->id);
             }
             else {
@@ -912,7 +921,8 @@ static h2_task *next_stream_task(h2_mplx
             }
 
             h2_beam_timeout_set(stream->input, m->stream_timeout);
-            h2_beam_on_consumed(stream->input, stream_input_consumed, m);
+            h2_beam_on_consumed(stream->input, stream_input_ev, 
+                                stream_input_consumed, m);
             h2_beam_on_file_beam(stream->input, can_beam_file, m);
             h2_beam_mutex_set(stream->input, beam_enter, task->cond, m);
             
@@ -1027,7 +1037,7 @@ static void task_done(h2_mplx *m, h2_tas
                           task->id);
             /* more data will not arrive, resume the stream */
             have_out_data_for(m, stream, 0);
-            h2_beam_on_consumed(stream->output, NULL, NULL);
+            h2_beam_on_consumed(stream->output, NULL, NULL, NULL);
             h2_beam_mutex_set(stream->output, NULL, NULL, NULL);
         }
         else {
@@ -1041,7 +1051,7 @@ static void task_done(h2_mplx *m, h2_tas
                  * called from a worker thread and freeing memory pools
                  * is only safe in the only thread using it (and its
                  * parent pool / allocator) */
-                h2_beam_on_consumed(stream->output, NULL, NULL);
+                h2_beam_on_consumed(stream->output, NULL, NULL, NULL);
                 h2_beam_mutex_set(stream->output, NULL, NULL, NULL);
                 h2_ihash_remove(m->shold, stream->id);
                 h2_ihash_add(m->spurge, stream);
@@ -1351,7 +1361,12 @@ void h2_mplx_req_engine_done(h2_req_engi
  * mplx master events dispatching
  ******************************************************************************/
 
-static int update_window(void *ctx, void *val)
+int h2_mplx_has_master_events(h2_mplx *m)
+{
+    return apr_atomic_read32(&m->event_pending) > 0;
+}
+
+static int report_consumption_iter(void *ctx, void *val)
 {
     input_consumed_signal(ctx, val);
     return 1;
@@ -1367,25 +1382,29 @@ apr_status_t h2_mplx_dispatch_master_eve
     h2_stream *stream;
     size_t i, n;
     
+    if (!h2_mplx_has_master_events(m)) {
+        return APR_EAGAIN;
+    }
+    
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
         ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, 
-                      "h2_mplx(%ld): dispatch events", m->id);
-                      
+                      "h2_mplx(%ld): dispatch events", m->id);        
+        apr_atomic_set32(&m->event_pending, 0);
         /* update input windows for streams */
-        h2_ihash_iter(m->streams, update_window, m);
-        if (on_resume && !h2_iq_empty(m->readyq)) {
+        h2_ihash_iter(m->streams, report_consumption_iter, m);
+        
+        if (!h2_iq_empty(m->readyq)) {
             n = h2_iq_mshift(m->readyq, ids, H2_ALEN(ids));
             for (i = 0; i < n; ++i) {
                 stream = h2_ihash_get(m->streams, ids[i]);
                 if (stream) {
-                    ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, 
-                                  "h2_mplx(%ld-%d): on_resume", 
-                                  m->id, stream->id);
                     on_resume(on_ctx, stream);
                 }
             }
         }
-        
+        if (!h2_iq_empty(m->readyq)) {
+            apr_atomic_set32(&m->event_pending, 1);
+        } 
         leave_mutex(m, acquired);
     }
     return status;
@@ -1400,6 +1419,7 @@ apr_status_t h2_mplx_keep_active(h2_mplx
         h2_stream *s = h2_ihash_get(m->streams, stream_id);
         if (s) {
             h2_iq_append(m->readyq, stream_id);
+            apr_atomic_set32(&m->event_pending, 1);
         }
         leave_mutex(m, acquired);
     }

Modified: httpd/httpd/trunk/modules/http2/h2_mplx.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_mplx.h?rev=1777907&r1=1777906&r2=1777907&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_mplx.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_mplx.h Sun Jan  8 20:11:27 2017
@@ -67,6 +67,7 @@ struct h2_mplx {
 
     APR_RING_ENTRY(h2_mplx) link;
 
+    unsigned int event_pending;
     unsigned int aborted : 1;
     unsigned int need_registration : 1;
 
@@ -222,6 +223,12 @@ void h2_mplx_set_consumed_cb(h2_mplx *m,
 typedef apr_status_t stream_ev_callback(void *ctx, struct h2_stream *stream);
 
 /**
+ * Check if the multiplexer has events for the master connection pending.
+ * @return != 0 iff there are events pending
+ */
+int h2_mplx_has_master_events(h2_mplx *m);
+
+/**
  * Dispatch events for the master connection, such as
  ± @param m the multiplexer
  * @param on_resume new output data has arrived for a suspended stream 

Modified: httpd/httpd/trunk/modules/http2/h2_session.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_session.c?rev=1777907&r1=1777906&r2=1777907&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_session.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_session.c Sun Jan  8 20:11:27 2017
@@ -48,6 +48,7 @@
 
 
 static apr_status_t dispatch_master(h2_session *session);
+static apr_status_t h2_session_read(h2_session *session, int block);
 
 static int h2_session_status_from_apr_status(apr_status_t rv)
 {
@@ -240,17 +241,6 @@ static ssize_t send_cb(nghttp2_session *
     
     (void)ngh2;
     (void)flags;
-    if (h2_conn_io_needs_flush(&session->io)) {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
-                      "h2_session(%ld): blocking due to io flush",
-                      session->id);
-        status = h2_conn_io_flush(&session->io);
-        if (status != APR_SUCCESS) {
-            return h2_session_status_from_apr_status(status);
-        }
-        return NGHTTP2_ERR_WOULDBLOCK;
-    }
-    
     status = h2_conn_io_write(&session->io, (const char *)data, length);
     if (status == APR_SUCCESS) {
         return length;
@@ -569,6 +559,16 @@ static int on_frame_recv_cb(nghttp2_sess
     return 0;
 }
 
+static int h2_session_continue_data(h2_session *session) {
+    if (h2_mplx_has_master_events(session->mplx)) {
+        return 0;
+    }
+    if (h2_conn_io_needs_flush(&session->io)) {
+        return 0;
+    }
+    return 1;
+}
+
 static char immortal_zeros[H2_MAX_PADLEN];
 
 static int on_send_data_cb(nghttp2_session *ngh2, 
@@ -589,17 +589,10 @@ static int on_send_data_cb(nghttp2_sessi
     
     (void)ngh2;
     (void)source;
-    if (h2_conn_io_needs_flush(&session->io)) {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
-                      "h2_stream(%ld-%d): blocking due to io flush",
-                      session->id, stream_id);
-        status = h2_conn_io_flush(&session->io);
-        if (status != APR_SUCCESS) {
-            return h2_session_status_from_apr_status(status);
-        }
+    if (!h2_session_continue_data(session)) {
         return NGHTTP2_ERR_WOULDBLOCK;
     }
-    
+
     if (frame->data.padlen > H2_MAX_PADLEN) {
         return NGHTTP2_ERR_PROTO;
     }
@@ -1418,8 +1411,6 @@ static apr_status_t h2_session_send(h2_s
         apr_socket_timeout_set(socket, session->s->timeout);
     }
     
-    /* This sends one round of frames from every able stream, plus
-     * settings etc. if accumulated */
     rv = nghttp2_session_send(session->ngh2);
     
     if (socket) {
@@ -2058,7 +2049,11 @@ static apr_status_t dispatch_master(h2_s
     
     status = h2_mplx_dispatch_master_events(session->mplx, 
                                             on_stream_resume, session);
-    if (status != APR_SUCCESS) {
+    if (status == APR_EAGAIN) {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, session->c,
+                      "h2_session(%ld): no master event available", session->id);
+    }
+    else if (status != APR_SUCCESS) {
         ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, session->c,
                       "h2_session(%ld): dispatch error", session->id);
         dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 
@@ -2118,6 +2113,14 @@ apr_status_t h2_session_process(h2_sessi
             case H2_SESSION_ST_IDLE:
                 /* make certain, we send everything before we idle */
                 h2_conn_io_flush(&session->io);
+                /* We trust our connection into the default timeout/keepalive
+                 * handling of the core filters/mpm iff:
+                 * - keep_sync_until is not set
+                 * - we have an async mpm
+                 * - we have no open streams to process
+                 * - we are not sitting on an Upgrade: request
+                 * - we already have seen at least one request
+                 */
                 if (!session->keep_sync_until && async && !session->open_streams
                     && !session->r && session->remote.emitted_count) {
                     if (trace) {
@@ -2126,15 +2129,6 @@ apr_status_t h2_session_process(h2_sessi
                                       "%d streams open", session->id, 
                                       session->open_streams);
                     }
-                    /* We do not return to the async mpm immediately, since under
-                     * load, mpms show the tendency to throw keep_alive connections
-                     * away very rapidly.
-                     * So, if we are still processing streams, we wait for the
-                     * normal timeout first and, on timeout, close.
-                     * If we have no streams, we still wait a short amount of
-                     * time here for the next frame to arrive, before handing
-                     * it to keep_alive processing of the mpm.
-                     */
                     status = h2_session_read(session, 0);
                     
                     if (status == APR_SUCCESS) {
@@ -2219,7 +2213,6 @@ apr_status_t h2_session_process(h2_sessi
                         dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, "error");
                     }
                 }
-                
                 break;
                 
             case H2_SESSION_ST_BUSY:
@@ -2244,13 +2237,16 @@ apr_status_t h2_session_process(h2_sessi
                 }
 
                 status = dispatch_master(session);
-                if (status != APR_SUCCESS) {
+                if (status != APR_SUCCESS && status != APR_EAGAIN) {
                     break;
                 }
                 
                 if (nghttp2_session_want_write(session->ngh2)) {
                     ap_update_child_status(session->c->sbh, SERVER_BUSY_WRITE, NULL);
                     status = h2_session_send(session);
+                    if (status == APR_SUCCESS) {
+                        status = h2_conn_io_flush(&session->io);
+                    }
                     if (status != APR_SUCCESS) {
                         dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 
                                        H2_ERR_INTERNAL_ERROR, "writing");

Modified: httpd/httpd/trunk/modules/http2/h2_stream.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_stream.c?rev=1777907&r1=1777906&r2=1777907&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_stream.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_stream.c Sun Jan  8 20:11:27 2017
@@ -481,6 +481,7 @@ apr_status_t h2_stream_close_input(h2_st
     APR_BRIGADE_INSERT_TAIL(tmp, b);
     status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ);
     apr_brigade_destroy(tmp);
+    h2_beam_close(stream->input);
     return status;
 }
 

Modified: httpd/httpd/trunk/modules/http2/h2_version.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_version.h?rev=1777907&r1=1777906&r2=1777907&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_version.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_version.h Sun Jan  8 20:11:27 2017
@@ -26,7 +26,7 @@
  * @macro
  * Version number of the http2 module as c string
  */
-#define MOD_HTTP2_VERSION "1.8.7-DEV"
+#define MOD_HTTP2_VERSION "1.8.8-DEV"
 
 /**
  * @macro
@@ -34,7 +34,7 @@
  * release. This is a 24 bit number with 8 bits for major number, 8 bits
  * for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203.
  */
-#define MOD_HTTP2_VERSION_NUM 0x010807
+#define MOD_HTTP2_VERSION_NUM 0x010808
 
 
 #endif /* mod_h2_h2_version_h */