You are viewing a plain text version of this content. The canonical link for it is here.
Posted to cvs@httpd.apache.org by ic...@apache.org on 2017/03/30 16:05:06 UTC

svn commit: r1789535 - in /httpd/httpd/trunk: ./ modules/http2/

Author: icing
Date: Thu Mar 30 16:05:06 2017
New Revision: 1789535

URL: http://svn.apache.org/viewvc?rev=1789535&view=rev
Log:
On the trunk:

mod_http2: move stuff from master connection to worker threads, increase spare slave connections, create output beams in worker when needed.


Modified:
    httpd/httpd/trunk/CHANGES
    httpd/httpd/trunk/modules/http2/h2_bucket_beam.c
    httpd/httpd/trunk/modules/http2/h2_bucket_beam.h
    httpd/httpd/trunk/modules/http2/h2_conn.c
    httpd/httpd/trunk/modules/http2/h2_mplx.c
    httpd/httpd/trunk/modules/http2/h2_mplx.h
    httpd/httpd/trunk/modules/http2/h2_session.c
    httpd/httpd/trunk/modules/http2/h2_stream.c
    httpd/httpd/trunk/modules/http2/h2_task.c
    httpd/httpd/trunk/modules/http2/h2_task.h

Modified: httpd/httpd/trunk/CHANGES
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/CHANGES?rev=1789535&r1=1789534&r2=1789535&view=diff
==============================================================================
--- httpd/httpd/trunk/CHANGES [utf-8] (original)
+++ httpd/httpd/trunk/CHANGES [utf-8] Thu Mar 30 16:05:06 2017
@@ -2,7 +2,9 @@
 Changes with Apache 2.5.0
 
   *) mod_http2: better performance, eliminated need for nested locks and
-     thread privates. [Stefan Eissing]
+     thread privates. Moved request setup from the main connection to the
+     worker threads. Increased the number of spare connections kept.
+     [Stefan Eissing]
      
   *) core: Disallow multiple Listen on the same IP:port when listener buckets
      are configured (ListenCoresBucketsRatio > 0), consistently with the single

Modified: httpd/httpd/trunk/modules/http2/h2_bucket_beam.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_bucket_beam.c?rev=1789535&r1=1789534&r2=1789535&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_bucket_beam.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_bucket_beam.c Thu Mar 30 16:05:06 2017
@@ -832,11 +832,10 @@ static apr_status_t append_bucket(h2_buc
         apr_bucket_file *bf = b->data;
         apr_file_t *fd = bf->fd;
         int can_beam = (bf->refcount.refcount == 1);
-        if (can_beam && beam->last_beamed != fd && beam->can_beam_fn) {
+        if (can_beam && beam->can_beam_fn) {
             can_beam = beam->can_beam_fn(beam->can_beam_ctx, beam, fd);
         }
         if (can_beam) {
-            beam->last_beamed = fd;
             status = apr_bucket_setaside(b, beam->send_pool);
         }
         /* else: enter ENOTIMPL case below */

Modified: httpd/httpd/trunk/modules/http2/h2_bucket_beam.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_bucket_beam.h?rev=1789535&r1=1789534&r2=1789535&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_bucket_beam.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_bucket_beam.h Thu Mar 30 16:05:06 2017
@@ -183,7 +183,6 @@ struct h2_bucket_beam {
 
     apr_size_t buckets_sent;  /* # of beam buckets sent */
     apr_size_t files_beamed;  /* how many file handles have been set aside */
-    apr_file_t *last_beamed;  /* last file beamed */
     
     unsigned int aborted : 1;
     unsigned int closed : 1;
@@ -376,6 +375,16 @@ int h2_beam_report_consumption(h2_bucket
 void h2_beam_on_produced(h2_bucket_beam *beam, 
                          h2_beam_io_callback *io_cb, void *ctx);
 
+/**
+ * Register a callback that may prevent a file from being beamed as
+ * file handle, forcing the file content to be copied. When no callback
+ * is set (NULL), file handles are transferred directly.
+ * @param beam the beam to set the callback on
+ * @param cb   the callback or NULL, asked whether a file handle may be beamed
+ * @param ctx  the context to use in callback invocation
+ * 
+ * Call from the receiver side, callbacks invoked on either side.
+ */
 void h2_beam_on_file_beam(h2_bucket_beam *beam, 
                           h2_beam_can_beam_callback *cb, void *ctx);
 

Modified: httpd/httpd/trunk/modules/http2/h2_conn.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_conn.c?rev=1789535&r1=1789534&r2=1789535&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_conn.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_conn.c Thu Mar 30 16:05:06 2017
@@ -308,6 +308,7 @@ conn_rec *h2_slave_create(conn_rec *mast
                                              master->id, slave_id);
     /* Simulate that we had already a request on this connection. */
     c->keepalives             = 1;
+    c->aborted                = 0;
     /* We cannot install the master connection socket on the slaves, as
      * modules mess with timeouts/blocking of the socket, with
      * unwanted side effects to the master connection processing.
@@ -335,6 +336,7 @@ void h2_slave_destroy(conn_rec *slave)
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, slave,
                   "h2_stream(%s): destroy slave", 
                   apr_table_get(slave->notes, H2_TASK_ID_NOTE));
+    slave->sbh = NULL;
     apr_pool_destroy(slave->pool);
 }
 

Modified: httpd/httpd/trunk/modules/http2/h2_mplx.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_mplx.c?rev=1789535&r1=1789534&r2=1789535&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_mplx.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_mplx.c Thu Mar 30 16:05:06 2017
@@ -131,11 +131,6 @@ static void stream_input_consumed(void *
     h2_stream_in_consumed(ctx, length);
 }
 
-static int can_always_beam_file(void *ctx, h2_bucket_beam *beam,  apr_file_t *file)
-{
-    return 1;
-}
-
 static void stream_joined(h2_mplx *m, h2_stream *stream)
 {
     ap_assert(!stream->task || stream->task->worker_done);
@@ -152,8 +147,10 @@ static void stream_cleanup(h2_mplx *m, h
         h2_beam_on_consumed(stream->input, NULL, NULL, NULL);
         h2_beam_abort(stream->input);
     }
-    h2_beam_on_produced(stream->output, NULL, NULL);
-    h2_beam_leave(stream->output);
+    if (stream->output) {
+        h2_beam_on_produced(stream->output, NULL, NULL);
+        h2_beam_leave(stream->output);
+    }
     
     h2_stream_cleanup(stream);
 
@@ -246,10 +243,10 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr
         m->readyq = h2_iq_create(m->pool, m->max_streams);
 
         m->workers = workers;
-        m->workers_max = workers->max_workers;
-        m->workers_limit = 6; /* the original h1 max parallel connections */
+        m->max_active = workers->max_workers;
+        m->limit_active = 6; /* the original h1 max parallel connections */
         m->last_limit_change = m->last_idle_block = apr_time_now();
-        m->limit_change_interval = apr_time_from_msec(200);
+        m->limit_change_interval = apr_time_from_msec(100);
         
         m->spare_slaves = apr_array_make(m->pool, 10, sizeof(conn_rec*));
         
@@ -301,7 +298,7 @@ static void task_destroy(h2_mplx *m, h2_
     int reuse_slave = 0;
     
     slave = task->c;
-    reuse_slave = ((m->spare_slaves->nelts < m->spare_slaves->nalloc)
+    reuse_slave = ((m->spare_slaves->nelts < (m->limit_active * 3 / 2))
                    && !task->rst_error);
     
     if (slave) {
@@ -328,12 +325,6 @@ static int stream_destroy_iter(void *ctx
     h2_ihash_remove(m->spurge, stream->id);
     ap_assert(stream->state == H2_SS_CLEANUP);
     
-    if (stream->output == NULL) {
-        ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, m->c, 
-                      H2_STRM_MSG(stream, "already with beams==NULL"));
-        return 0;
-    }
-    
     if (stream->input) {
         /* Process outstanding events before destruction */
         input_consumed_signal(m, stream);    
@@ -341,10 +332,7 @@ static int stream_destroy_iter(void *ctx
         h2_beam_destroy(stream->input);
         stream->input = NULL;
     }
-    
-    h2_beam_log(stream->output, m->c, APLOG_TRACE2, "stream_destroy");
-    h2_beam_destroy(stream->output);
-    stream->output = NULL;
+
     if (stream->task) {
         task_destroy(m, stream->task);
         stream->task = NULL;
@@ -547,10 +535,13 @@ static apr_status_t out_open(h2_mplx *m,
     apr_status_t status = APR_SUCCESS;
     h2_stream *stream = h2_ihash_get(m->streams, stream_id);
     
-    if (!stream || !stream->task) {
+    if (!stream || !stream->task || m->aborted) {
         return APR_ECONNABORTED;
     }
     
+    ap_assert(stream->output == NULL);
+    stream->output = beam;
+    
     if (APLOGctrace2(m->c)) {
         h2_beam_log(beam, m->c, APLOG_TRACE2, "out_open");
     }
@@ -561,8 +552,8 @@ static apr_status_t out_open(h2_mplx *m,
     
     h2_beam_on_consumed(stream->output, NULL, stream_output_consumed, stream);
     h2_beam_on_produced(stream->output, output_produced, m);
-    if (!stream->task->output.copy_files) {
-        h2_beam_on_file_beam(stream->output, can_always_beam_file, m);
+    if (stream->task->output.copy_files) {
+        h2_beam_on_file_beam(stream->output, h2_beam_no_files, NULL);
     }
     
     /* time to protect the beam against multi-threaded use */
@@ -721,7 +712,7 @@ static h2_task *next_stream_task(h2_mplx
 {
     h2_stream *stream;
     int sid;
-    while (!m->aborted && (m->workers_busy < m->workers_limit)
+    while (!m->aborted && (m->tasks_active < m->limit_active)
            && (sid = h2_iq_shift(m->q)) > 0) {
         
         stream = h2_ihash_get(m->streams, sid);
@@ -731,36 +722,37 @@ static h2_task *next_stream_task(h2_mplx
             pslave = (conn_rec **)apr_array_pop(m->spare_slaves);
             if (pslave) {
                 slave = *pslave;
+                slave->aborted = 0;
             }
             else {
                 slave = h2_slave_create(m->c, stream->id, m->pool);
             }
             
-            slave->sbh = m->c->sbh;
-            slave->aborted = 0;
             if (!stream->task) {
-                stream->task = h2_task_create(stream, slave);
-                
+            
                 m->c->keepalives++;
-                apr_table_setn(slave->notes, H2_TASK_ID_NOTE, stream->task->id);
-                h2_slave_run_pre_connection(slave, ap_get_conn_socket(slave));
-
                 if (sid > m->max_stream_started) {
                     m->max_stream_started = sid;
                 }
-                
                 if (stream->input) {
                     h2_beam_on_consumed(stream->input, stream_input_ev, 
                                         stream_input_consumed, stream);
-                    h2_beam_on_file_beam(stream->input, can_always_beam_file, m);
-                    h2_beam_mutex_enable(stream->input);
                 }
                 
-                h2_beam_buffer_size_set(stream->output, m->stream_max_mem);
+                stream->task = h2_task_create(slave, stream->id, 
+                                              stream->request, m, stream->input, 
+                                              stream->session->s->timeout,
+                                              m->stream_max_mem);
+                if (!stream->task) {
+                    ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOMEM, slave,
+                                  H2_STRM_LOG(APLOGNO(02941), stream, 
+                                  "create task"));
+                    return NULL;
+                }
+                
             }
-            stream->task->worker_started = 1;
-            stream->task->started_at = apr_time_now();
-            ++m->workers_busy;
+            
+            ++m->tasks_active;
             return stream->task;
         }
     }
@@ -841,14 +833,14 @@ static void task_done(h2_mplx *m, h2_tas
          * a block by flow control.
          */
         if (task->done_at- m->last_limit_change >= m->limit_change_interval
-            && m->workers_limit < m->workers_max) {
+            && m->limit_active < m->max_active) {
             /* Well behaving stream, allow it more workers */
-            m->workers_limit = H2MIN(m->workers_limit * 2, 
-                                     m->workers_max);
+            m->limit_active = H2MIN(m->limit_active * 2, 
+                                     m->max_active);
             m->last_limit_change = task->done_at;
             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                           "h2_mplx(%ld): increase worker limit to %d",
-                          m->id, m->workers_limit);
+                          m->id, m->limit_active);
         }
     }
     
@@ -870,7 +862,9 @@ static void task_done(h2_mplx *m, h2_tas
             h2_beam_mutex_disable(stream->input);
             h2_beam_leave(stream->input);
         }
-        h2_beam_mutex_disable(stream->output);
+        if (stream->output) {
+            h2_beam_mutex_disable(stream->output);
+        }
         check_data_for(m, stream->id);
     }
     else if ((stream = h2_ihash_get(m->shold, task->stream_id)) != NULL) {
@@ -881,7 +875,9 @@ static void task_done(h2_mplx *m, h2_tas
             h2_beam_mutex_disable(stream->input);
             h2_beam_leave(stream->input);
         }
-        h2_beam_mutex_disable(stream->output);
+        if (stream->output) {
+            h2_beam_mutex_disable(stream->output);
+        }
         stream_joined(m, stream);
     }
     else if ((stream = h2_ihash_get(m->spurge, task->stream_id)) != NULL) {
@@ -903,7 +899,7 @@ void h2_mplx_task_done(h2_mplx *m, h2_ta
     
     if (enter_mutex(m, &acquired) == APR_SUCCESS) {
         task_done(m, task, NULL);
-        --m->workers_busy;
+        --m->tasks_active;
         if (m->join_wait) {
             apr_thread_cond_signal(m->join_wait);
         }
@@ -982,14 +978,14 @@ static apr_status_t unschedule_slow_task
     /* Try to get rid of streams that occupy workers. Look for safe requests
      * that are repeatable. If none found, fail the connection.
      */
-    n = (m->workers_busy - m->workers_limit - (int)h2_ihash_count(m->sredo));
+    n = (m->tasks_active - m->limit_active - (int)h2_ihash_count(m->sredo));
     while (n > 0 && (stream = get_latest_repeatable_unsubmitted_stream(m))) {
         h2_task_rst(stream->task, H2_ERR_CANCEL);
         h2_ihash_add(m->sredo, stream);
         --n;
     }
     
-    if ((m->workers_busy - h2_ihash_count(m->sredo)) > m->workers_limit) {
+    if ((m->tasks_active - h2_ihash_count(m->sredo)) > m->limit_active) {
         h2_stream *stream = get_timed_out_busy_stream(m);
         if (stream) {
             /* Too many busy workers, unable to cancel enough streams
@@ -1009,7 +1005,7 @@ apr_status_t h2_mplx_idle(h2_mplx *m)
     
     if (enter_mutex(m, &acquired) == APR_SUCCESS) {
         apr_size_t scount = h2_ihash_count(m->streams);
-        if (scount > 0 && m->workers_busy) {
+        if (scount > 0 && m->tasks_active) {
             /* If we have streams in connection state 'IDLE', meaning
              * all streams are ready to sent data out, but lack
              * WINDOW_UPDATEs. 
@@ -1024,27 +1020,27 @@ apr_status_t h2_mplx_idle(h2_mplx *m)
              */
             now = apr_time_now();
             m->last_idle_block = now;
-            if (m->workers_limit > 2 
+            if (m->limit_active > 2 
                 && now - m->last_limit_change >= m->limit_change_interval) {
-                if (m->workers_limit > 16) {
-                    m->workers_limit = 16;
+                if (m->limit_active > 16) {
+                    m->limit_active = 16;
                 }
-                else if (m->workers_limit > 8) {
-                    m->workers_limit = 8;
+                else if (m->limit_active > 8) {
+                    m->limit_active = 8;
                 }
-                else if (m->workers_limit > 4) {
-                    m->workers_limit = 4;
+                else if (m->limit_active > 4) {
+                    m->limit_active = 4;
                 }
-                else if (m->workers_limit > 2) {
-                    m->workers_limit = 2;
+                else if (m->limit_active > 2) {
+                    m->limit_active = 2;
                 }
                 m->last_limit_change = now;
                 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                               "h2_mplx(%ld): decrease worker limit to %d",
-                              m->id, m->workers_limit);
+                              m->id, m->limit_active);
             }
             
-            if (m->workers_busy > m->workers_limit) {
+            if (m->tasks_active > m->limit_active) {
                 status = unschedule_slow_tasks(m);
             }
         }
@@ -1257,7 +1253,7 @@ int h2_mplx_awaits_data(h2_mplx *m)
         if (h2_ihash_empty(m->streams)) {
             waiting = 0;
         }
-        if (h2_iq_empty(m->readyq) && h2_iq_empty(m->q) && !m->workers_busy) {
+        if (h2_iq_empty(m->readyq) && h2_iq_empty(m->q) && !m->tasks_active) {
             waiting = 0;
         }
         leave_mutex(m, acquired);

Modified: httpd/httpd/trunk/modules/http2/h2_mplx.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_mplx.h?rev=1789535&r1=1789534&r2=1789535&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_mplx.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_mplx.h Thu Mar 30 16:05:06 2017
@@ -74,9 +74,9 @@ struct h2_mplx {
     
     int max_streams;        /* max # of concurrent streams */
     int max_stream_started; /* highest stream id that started processing */
-    int workers_busy;       /* # of workers processing on this mplx */
-    int workers_limit;      /* current # of workers limit, dynamic */
-    int workers_max;        /* max, hard limit # of workers in a process */
+    int tasks_active;       /* # of tasks being processed from this mplx */
+    int limit_active;       /* current limit on active tasks, dynamic */
+    int max_active;         /* max, hard limit # of active tasks in a process */
     apr_time_t last_idle_block;      /* last time, this mplx entered IDLE while
                                       * streams were ready */
     apr_time_t last_limit_change;    /* last time, worker limit changed */

Modified: httpd/httpd/trunk/modules/http2/h2_session.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_session.c?rev=1789535&r1=1789534&r2=1789535&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_session.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_session.c Thu Mar 30 16:05:06 2017
@@ -870,8 +870,8 @@ static apr_status_t h2_session_create_in
                                   "push_diary(type=%d,N=%d)"),
                       (int)session->max_stream_count, 
                       (int)session->max_stream_mem,
-                      session->mplx->workers_limit, 
-                      session->mplx->workers_max, 
+                      session->mplx->limit_active, 
+                      session->mplx->max_active, 
                       session->push_diary->dtype, 
                       (int)session->push_diary->N);
     }

Modified: httpd/httpd/trunk/modules/http2/h2_stream.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_stream.c?rev=1789535&r1=1789534&r2=1789535&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_stream.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_stream.c Thu Mar 30 16:05:06 2017
@@ -241,7 +241,7 @@ static apr_status_t close_input(h2_strea
 
 static apr_status_t close_output(h2_stream *stream)
 {
-    if (h2_beam_is_closed(stream->output)) {
+    if (!stream->output || h2_beam_is_closed(stream->output)) {
         return APR_SUCCESS;
     }
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
@@ -531,9 +531,6 @@ h2_stream *h2_stream_create(int id, apr_
     stream->monitor      = monitor;
     stream->max_mem      = session->max_stream_mem;
     
-    h2_beam_create(&stream->output, pool, id, "output", H2_BEAM_OWNER_RECV, 0,
-                   session->s->timeout);
-               
 #ifdef H2_NG2_LOCAL_WIN_SIZE
     stream->in_window_size = 
         nghttp2_session_get_stream_local_window_size(
@@ -607,7 +604,9 @@ void h2_stream_rst(h2_stream *stream, in
     if (stream->input) {
         h2_beam_abort(stream->input);
     }
-    h2_beam_leave(stream->output);
+    if (stream->output) {
+        h2_beam_leave(stream->output);
+    }
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
                   H2_STRM_MSG(stream, "reset, error=%d"), error_code);
     h2_stream_dispatch(stream, H2_SEV_CANCELLED);
@@ -777,6 +776,8 @@ apr_status_t h2_stream_out_prepare(h2_st
         *presponse = NULL;
     }
     
+    ap_assert(stream);
+    
     if (stream->rst_error) {
         *plen = 0;
         *peos = 1;
@@ -785,7 +786,7 @@ apr_status_t h2_stream_out_prepare(h2_st
     
     c = stream->session->c;
     prep_output(stream);
-    
+
     /* determine how much we'd like to send. We cannot send more than
      * is requested. But we can reduce the size in case the master
      * connection operates in smaller chunks. (TSL warmup) */
@@ -797,8 +798,15 @@ apr_status_t h2_stream_out_prepare(h2_st
     h2_util_bb_avail(stream->out_buffer, plen, peos);
     if (!*peos && *plen < requested && *plen < stream->max_mem) {
         H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "pre");
-        status = h2_beam_receive(stream->output, stream->out_buffer, 
-                                 APR_NONBLOCK_READ, stream->max_mem - *plen);
+        if (stream->output) {
+            status = h2_beam_receive(stream->output, stream->out_buffer, 
+                                     APR_NONBLOCK_READ, 
+                                     stream->max_mem - *plen);
+        }
+        else {
+            status = APR_EOF;
+        }
+        
         if (APR_STATUS_IS_EOF(status)) {
             apr_bucket *eos = apr_bucket_eos_create(c->bucket_alloc);
             APR_BRIGADE_INSERT_TAIL(stream->out_buffer, eos);

Modified: httpd/httpd/trunk/modules/http2/h2_task.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_task.c?rev=1789535&r1=1789534&r2=1789535&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_task.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_task.c Thu Mar 30 16:05:06 2017
@@ -496,42 +496,44 @@ static int h2_task_pre_conn(conn_rec* c,
     return OK;
 }
 
-h2_task *h2_task_create(h2_stream *stream, conn_rec *slave)
+h2_task *h2_task_create(conn_rec *slave, int stream_id,
+                        const h2_request *req, h2_mplx *m,
+                        h2_bucket_beam *input, 
+                        apr_interval_time_t timeout,
+                        apr_size_t output_max_mem)
 {
     apr_pool_t *pool;
     h2_task *task;
     
     ap_assert(slave);
-    ap_assert(stream);
-    ap_assert(stream->request);
+    ap_assert(req);
 
     apr_pool_create(&pool, slave->pool);
     task = apr_pcalloc(pool, sizeof(h2_task));
     if (task == NULL) {
-        ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOMEM, slave,
-                      H2_STRM_LOG(APLOGNO(02941), stream, "create task"));
         return NULL;
     }
-    task->id          = apr_psprintf(pool, "%ld-%d", 
-                                     stream->session->id, stream->id);
-    task->stream_id   = stream->id;
+    task->id          = "000";
+    task->stream_id   = stream_id;
     task->c           = slave;
-    task->mplx        = stream->session->mplx;
-    task->c->keepalives = slave->master->keepalives;
+    task->mplx        = m;
     task->pool        = pool;
-    task->request     = stream->request;
-    task->input.beam  = stream->input;
-    task->output.beam = stream->output;
-    task->timeout     = stream->session->s->timeout;
-    
-    h2_beam_send_from(stream->output, task->pool);
-    h2_ctx_create_for(slave, task);
-    
+    task->request     = req;
+    task->timeout     = timeout;
+    task->input.beam  = input;
+    task->output.max_buffer = output_max_mem;
+
     return task;
 }
 
 void h2_task_destroy(h2_task *task)
 {
+    if (task->output.beam) {
+        h2_beam_log(task->output.beam, task->c, APLOG_TRACE2, "task_destroy");
+        h2_beam_destroy(task->output.beam);
+        task->output.beam = NULL;
+    }
+    
     if (task->eor) {
         apr_bucket_destroy(task->eor);
     }
@@ -542,9 +544,14 @@ void h2_task_destroy(h2_task *task)
 
 apr_status_t h2_task_do(h2_task *task, apr_thread_t *thread, int worker_id)
 {
+    conn_rec *c;
+    
     ap_assert(task);
-
-    if (task->c->master) {
+    c = task->c;
+    task->worker_started = 1;
+    task->started_at = apr_time_now();
+    
+    if (c->master) {
         /* Each conn_rec->id is supposed to be unique at a point in time. Since
          * some modules (and maybe external code) uses this id as an identifier
          * for the request_rec they handle, it needs to be unique for slave 
@@ -562,6 +569,8 @@ apr_status_t h2_task_do(h2_task *task, a
          */
         int slave_id, free_bits;
         
+        task->id = apr_psprintf(task->pool, "%ld-%d", c->master->id, 
+                                task->stream_id);
         if (sizeof(unsigned long) >= 8) {
             free_bits = 32;
             slave_id = task->stream_id;
@@ -573,12 +582,31 @@ apr_status_t h2_task_do(h2_task *task, a
             free_bits = 8;
             slave_id = worker_id; 
         }
-        task->c->id = (task->c->master->id << free_bits)^slave_id;
+        task->c->id = (c->master->id << free_bits)^slave_id;
+        c->keepalive = AP_CONN_KEEPALIVE;
+    }
+        
+    h2_beam_create(&task->output.beam, c->pool, task->stream_id, "output", 
+                   H2_BEAM_OWNER_SEND, 0, task->timeout);
+    if (!task->output.beam) {
+        return APR_ENOMEM;
     }
     
-    task->input.bb = apr_brigade_create(task->pool, task->c->bucket_alloc);
+    h2_beam_buffer_size_set(task->output.beam, task->output.max_buffer);
+    h2_beam_send_from(task->output.beam, task->pool);
+    
+    h2_ctx_create_for(c, task);
+    apr_table_setn(c->notes, H2_TASK_ID_NOTE, task->id);
+
+    if (task->input.beam) {
+        h2_beam_mutex_enable(task->input.beam);
+    }
+    
+    h2_slave_run_pre_connection(c, ap_get_conn_socket(c));            
+
+    task->input.bb = apr_brigade_create(task->pool, c->bucket_alloc);
     if (task->request->serialize) {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
                       "h2_task(%s): serialize request %s %s", 
                       task->id, task->request->method, task->request->path);
         apr_brigade_printf(task->input.bb, NULL, 
@@ -588,20 +616,21 @@ apr_status_t h2_task_do(h2_task *task, a
         apr_brigade_puts(task->input.bb, NULL, NULL, "\r\n");
     }
     
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
                   "h2_task(%s): process connection", task->id);
+                  
     task->c->current_thread = thread; 
-    ap_run_process_connection(task->c);
+    ap_run_process_connection(c);
     
     if (task->frozen) {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
                       "h2_task(%s): process_conn returned frozen task", 
                       task->id);
         /* cleanup delayed */
         return APR_EAGAIN;
     }
     else {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
                       "h2_task(%s): processing done", task->id);
         return output_finish(task);
     }

Modified: httpd/httpd/trunk/modules/http2/h2_task.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_task.h?rev=1789535&r1=1789534&r2=1789535&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_task.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_task.h Thu Mar 30 16:05:06 2017
@@ -73,6 +73,7 @@ struct h2_task {
         unsigned int copy_files : 1;
         struct h2_response_parser *rparser;
         apr_bucket_brigade *bb;
+        apr_size_t max_buffer;
     } output;
     
     struct h2_mplx *mplx;
@@ -91,7 +92,11 @@ struct h2_task {
     struct h2_req_engine *assigned; /* engine that task has been assigned to */
 };
 
-h2_task *h2_task_create(struct h2_stream *stream, conn_rec *slave);
+h2_task *h2_task_create(conn_rec *slave, int stream_id,
+                        const h2_request *req, struct h2_mplx *m, 
+                        struct h2_bucket_beam *input, 
+                        apr_interval_time_t timeout,
+                        apr_size_t output_max_mem);
 
 void h2_task_destroy(h2_task *task);