You are viewing a plain text version of this content. The canonical link for it is here.
Posted to cvs@httpd.apache.org by ic...@apache.org on 2015/11/19 18:14:04 UTC

svn commit: r1715218 [3/4] - in /httpd/httpd/branches/2.4-http2-alpha: ./ docs/manual/mod/ modules/aaa/ modules/http2/ modules/ssl/ server/

Modified: httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_session.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_session.c?rev=1715218&r1=1715217&r2=1715218&view=diff
==============================================================================
--- httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_session.c (original)
+++ httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_session.c Thu Nov 19 17:14:03 2015
@@ -29,6 +29,8 @@
 #include "h2_config.h"
 #include "h2_h2.h"
 #include "h2_mplx.h"
+#include "h2_push.h"
+#include "h2_request.h"
 #include "h2_response.h"
 #include "h2_stream.h"
 #include "h2_stream_set.h"
@@ -55,12 +57,12 @@ static int h2_session_status_from_apr_st
     return NGHTTP2_ERR_PROTO;
 }
 
-static int stream_open(h2_session *session, int stream_id)
+h2_stream *h2_session_open_stream(h2_session *session, int stream_id)
 {
     h2_stream * stream;
     apr_pool_t *stream_pool;
     if (session->aborted) {
-        return NGHTTP2_ERR_CALLBACK_FAILURE;
+        return NULL;
     }
     
     if (session->spare) {
@@ -71,19 +73,20 @@ static int stream_open(h2_session *sessi
         apr_pool_create(&stream_pool, session->pool);
     }
     
-    stream = h2_stream_create(stream_id, stream_pool, session);
-    stream->state = H2_STREAM_ST_OPEN;
+    stream = h2_stream_open(stream_id, stream_pool, session);
     
     h2_stream_set_add(session->streams, stream);
-    if (stream->id > session->max_stream_received) {
+    if (H2_STREAM_CLIENT_INITIATED(stream_id)
+        && stream_id > session->max_stream_received) {
         session->max_stream_received = stream->id;
     }
     
-    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
-                  "h2_session: stream(%ld-%d): opened",
-                  session->id, stream_id);
-    
-    return 0;
+    return stream;
+}
+
+apr_status_t h2_session_flush(h2_session *session) 
+{
+    return h2_conn_io_flush(&session->io);
 }
 
 /**
@@ -139,8 +142,8 @@ static int stream_pri_cmp(int sid1, int
     return spri_cmp(sid1, s1, sid2, s2, session);
 }
 
-static apr_status_t stream_end_headers(h2_session *session,
-                                       h2_stream *stream, int eos)
+static apr_status_t stream_schedule(h2_session *session,
+                                    h2_stream *stream, int eos)
 {
     (void)session;
     return h2_stream_schedule(stream, eos, stream_pri_cmp, session);
@@ -195,19 +198,19 @@ static int on_data_chunk_recv_cb(nghttp2
                                  int32_t stream_id,
                                  const uint8_t *data, size_t len, void *userp)
 {
-    int rv;
     h2_session *session = (h2_session *)userp;
+    apr_status_t status = APR_SUCCESS;
     h2_stream * stream;
-    apr_status_t status;
+    int rv;
     
     (void)flags;
     if (session->aborted) {
         return NGHTTP2_ERR_CALLBACK_FAILURE;
     }
-    stream = h2_stream_set_get(session->streams, stream_id);
+    
+    stream = h2_session_get_stream(session, stream_id);
     if (!stream) {
-        ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, session->c,
-                      APLOGNO(02919) 
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
                       "h2_session:  stream(%ld-%d): on_data_chunk for unknown stream",
                       session->id, (int)stream_id);
         rv = nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE, stream_id,
@@ -220,8 +223,8 @@ static int on_data_chunk_recv_cb(nghttp2
     
     status = h2_stream_write_data(stream, (const char *)data, len);
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
-                  "h2_stream(%ld-%d): written DATA, length %d",
-                  session->id, stream_id, (int)len);
+                  "h2_stream(%ld-%d): data_chunk_recv, written %ld bytes",
+                  session->id, stream_id, (long)len);
     if (status != APR_SUCCESS) {
         rv = nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE, stream_id,
                                        H2_STREAM_RST(stream, H2_ERR_INTERNAL_ERROR));
@@ -232,75 +235,7 @@ static int on_data_chunk_recv_cb(nghttp2
     return 0;
 }
 
-static int before_frame_send_cb(nghttp2_session *ngh2,
-                                const nghttp2_frame *frame,
-                                void *userp)
-{
-    h2_session *session = (h2_session *)userp;
-    (void)ngh2;
-
-    if (session->aborted) {
-        return NGHTTP2_ERR_CALLBACK_FAILURE;
-    }
-    /* Set the need to flush output when we have added one of the 
-     * following frame types */
-    switch (frame->hd.type) {
-        case NGHTTP2_RST_STREAM:
-        case NGHTTP2_WINDOW_UPDATE:
-        case NGHTTP2_PUSH_PROMISE:
-        case NGHTTP2_PING:
-        case NGHTTP2_GOAWAY:
-            session->flush = 1;
-            break;
-        default:
-            break;
-
-    }
-    if (APLOGctrace2(session->c)) {
-        char buffer[256];
-        frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
-                      "h2_session(%ld): before_frame_send %s", 
-                      session->id, buffer);
-    }
-    return 0;
-}
-
-static int on_frame_send_cb(nghttp2_session *ngh2,
-                            const nghttp2_frame *frame,
-                            void *userp)
-{
-    h2_session *session = (h2_session *)userp;
-    (void)ngh2;
-    if (APLOGctrace2(session->c)) {
-        char buffer[256];
-        frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
-                      "h2_session(%ld): on_frame_send %s", 
-                      session->id, buffer);
-    }
-    return 0;
-}
-
-static int on_frame_not_send_cb(nghttp2_session *ngh2,
-                                const nghttp2_frame *frame,
-                                int lib_error_code, void *userp)
-{
-    h2_session *session = (h2_session *)userp;
-    (void)ngh2;
-    
-    if (APLOGctrace2(session->c)) {
-        char buffer[256];
-        
-        frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
-                      "h2_session: callback on_frame_not_send error=%d %s",
-                      lib_error_code, buffer);
-    }
-    return 0;
-}
-
-static apr_status_t stream_destroy(h2_session *session, 
+static apr_status_t stream_release(h2_session *session, 
                                    h2_stream *stream,
                                    uint32_t error_code) 
 {
@@ -335,33 +270,30 @@ static int on_stream_close_cb(nghttp2_se
     if (session->aborted) {
         return NGHTTP2_ERR_CALLBACK_FAILURE;
     }
-    stream = h2_stream_set_get(session->streams, stream_id);
+    stream = h2_session_get_stream(session, stream_id);
     if (stream) {
-        stream_destroy(session, stream, error_code);
+        stream_release(session, stream, error_code);
     }
-    
-    if (error_code) {
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
-                      "h2_stream(%ld-%d): close error %d",
-                      session->id, (int)stream_id, error_code);
-    }
-    
     return 0;
 }
 
 static int on_begin_headers_cb(nghttp2_session *ngh2,
                                const nghttp2_frame *frame, void *userp)
 {
-    /* This starts a new stream. */
-    int rv;
+    h2_session *session = (h2_session *)userp;
+    h2_stream *s;
+    
+    /* We may see HEADERs at the start of a stream or after all DATA
+     * streams to carry trailers. */
     (void)ngh2;
-    rv = stream_open((h2_session *)userp, frame->hd.stream_id);
-    if (rv != NGHTTP2_ERR_CALLBACK_FAILURE) {
-      /* on_header_cb or on_frame_recv_cb will dectect that stream
-         does not exist and submit RST_STREAM. */
-      return 0;
+    s = h2_session_get_stream(session, frame->hd.stream_id);
+    if (s) {
+        /* nop */
+    }
+    else {
+        s = h2_session_open_stream((h2_session *)userp, frame->hd.stream_id);
     }
-    return NGHTTP2_ERR_CALLBACK_FAILURE;
+    return s? 0 : NGHTTP2_ERR_CALLBACK_FAILURE;
 }
 
 static int on_header_cb(nghttp2_session *ngh2, const nghttp2_frame *frame,
@@ -379,8 +311,8 @@ static int on_header_cb(nghttp2_session
     if (session->aborted) {
         return NGHTTP2_ERR_CALLBACK_FAILURE;
     }
-    stream = h2_stream_set_get(session->streams,
-                                           frame->hd.stream_id);
+    
+    stream = h2_session_get_stream(session, frame->hd.stream_id);
     if (!stream) {
         ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, session->c,
                       APLOGNO(02920) 
@@ -389,9 +321,9 @@ static int on_header_cb(nghttp2_session
         return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
     }
     
-    status = h2_stream_write_header(stream,
-                                               (const char *)name, namelen,
-                                               (const char *)value, valuelen);
+    status = h2_stream_add_header(stream, (const char *)name, namelen,
+                                  (const char *)value, valuelen);
+                                    
     if (status != APR_SUCCESS) {
         return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
     }
@@ -407,64 +339,67 @@ static int on_frame_recv_cb(nghttp2_sess
                             const nghttp2_frame *frame,
                             void *userp)
 {
-    int rv;
     h2_session *session = (h2_session *)userp;
     apr_status_t status = APR_SUCCESS;
+    h2_stream *stream;
+    
     if (session->aborted) {
         return NGHTTP2_ERR_CALLBACK_FAILURE;
     }
     
-    ++session->frames_received;
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
-                  "h2_session(%ld): on_frame_rcv #%ld, type=%d", session->id,
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+                  "h2_stream(%ld-%d): on_frame_rcv #%ld, type=%d", 
+                  session->id, frame->hd.stream_id, 
                   (long)session->frames_received, frame->hd.type);
+
+    ++session->frames_received;
     switch (frame->hd.type) {
-        case NGHTTP2_HEADERS: {
-            int eos;
-            h2_stream * stream = h2_stream_set_get(session->streams,
-                                                   frame->hd.stream_id);
-            if (stream == NULL) {
-                ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, session->c,
-                              APLOGNO(02921) 
-                              "h2_session:  stream(%ld-%d): HEADERS frame "
-                              "for unknown stream", session->id,
-                              (int)frame->hd.stream_id);
-                rv = nghttp2_submit_rst_stream(ng2s, NGHTTP2_FLAG_NONE,
-                                               frame->hd.stream_id,
-                                               NGHTTP2_INTERNAL_ERROR);
-                if (nghttp2_is_fatal(rv)) {
-                    return NGHTTP2_ERR_CALLBACK_FAILURE;
+        case NGHTTP2_HEADERS:
+            /* This can be HEADERS for a new stream, defining the request,
+             * or HEADER may come after DATA at the end of a stream as in
+             * trailers */
+            stream = h2_session_get_stream(session, frame->hd.stream_id);
+            if (stream) {
+                int eos = (frame->hd.flags & NGHTTP2_FLAG_END_STREAM);
+                
+                if (h2_stream_is_scheduled(stream)) {
+                    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+                                  "h2_stream(%ld-%d): TRAILER, eos=%d", 
+                                  session->id, frame->hd.stream_id, eos);
+                    if (eos) {
+                        status = h2_stream_close_input(stream);
+                    }
+                }
+                else {
+                    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+                                  "h2_stream(%ld-%d): HEADER, eos=%d", 
+                                  session->id, frame->hd.stream_id, eos);
+                    status = stream_schedule(session, stream, eos);
                 }
-                return 0;
             }
-
-            eos = (frame->hd.flags & NGHTTP2_FLAG_END_STREAM);
-            status = stream_end_headers(session, stream, eos);
-
+            else {
+                status = APR_EINVAL;
+            }
             break;
-        }
-        case NGHTTP2_DATA: {
-            h2_stream * stream = h2_stream_set_get(session->streams,
-                                                   frame->hd.stream_id);
-            if (stream == NULL) {
-                ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, session->c,
-                              APLOGNO(02922) 
-                              "h2_session:  stream(%ld-%d): DATA frame "
-                              "for unknown stream", session->id,
-                              (int)frame->hd.stream_id);
-                rv = nghttp2_submit_rst_stream(ng2s, NGHTTP2_FLAG_NONE,
-                                               frame->hd.stream_id,
-                                               NGHTTP2_INTERNAL_ERROR);
-                if (nghttp2_is_fatal(rv)) {
-                    return NGHTTP2_ERR_CALLBACK_FAILURE;
+        case NGHTTP2_DATA:
+            stream = h2_session_get_stream(session, frame->hd.stream_id);
+            if (stream) {
+                int eos = (frame->hd.flags & NGHTTP2_FLAG_END_STREAM);
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+                              "h2_stream(%ld-%d): DATA, len=%ld, eos=%d", 
+                              session->id, frame->hd.stream_id, 
+                              (long)frame->hd.length, eos);
+                if (eos) {
+                    status = h2_stream_close_input(stream);
                 }
-                return 0;
+            }
+            else {
+                status = APR_EINVAL;
             }
             break;
-        }
-        case NGHTTP2_PRIORITY: {
+        case NGHTTP2_PRIORITY:
             session->reprioritize = 1;
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
                           "h2_session:  stream(%ld-%d): PRIORITY frame "
                           " weight=%d, dependsOn=%d, exclusive=%d", 
                           session->id, (int)frame->hd.stream_id,
@@ -472,7 +407,13 @@ static int on_frame_recv_cb(nghttp2_sess
                           frame->priority.pri_spec.stream_id,
                           frame->priority.pri_spec.exclusive);
             break;
-        }
+        case NGHTTP2_WINDOW_UPDATE:
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+                          "h2_session:  stream(%ld-%d): WINDOW_UPDATE "
+                          "incr=%d", 
+                          session->id, (int)frame->hd.stream_id,
+                          frame->window_update.window_size_increment);
+            break;
         default:
             if (APLOGctrace2(session->c)) {
                 char buffer[256];
@@ -485,23 +426,10 @@ static int on_frame_recv_cb(nghttp2_sess
             break;
     }
 
-    /* only DATA and HEADERS frame can bear END_STREAM flag.  Other
-       frame types may have other flag which has the same value, so we
-       have to check the frame type first.  */
-    if ((frame->hd.type == NGHTTP2_DATA || frame->hd.type == NGHTTP2_HEADERS) &&
-        frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
-        h2_stream * stream = h2_stream_set_get(session->streams,
-                                               frame->hd.stream_id);
-        if (stream != NULL) {
-            status = h2_stream_write_eos(stream);
-            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
-                          "h2_stream(%ld-%d): input closed",
-                          session->id, (int)frame->hd.stream_id);
-        }
-    }
-    
     if (status != APR_SUCCESS) {
-        ap_log_cerror(APLOG_MARK, APLOG_ERR, status, session->c,
+        int rv;
+        
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
                       APLOGNO(02923) 
                       "h2_session: stream(%ld-%d): error handling frame",
                       session->id, (int)frame->hd.stream_id);
@@ -511,14 +439,13 @@ static int on_frame_recv_cb(nghttp2_sess
         if (nghttp2_is_fatal(rv)) {
             return NGHTTP2_ERR_CALLBACK_FAILURE;
         }
-        return 0;
     }
     
     return 0;
 }
 
 static apr_status_t pass_data(void *ctx, 
-                              const char *data, apr_size_t length)
+                              const char *data, apr_off_t length)
 {
     return h2_conn_io_write(&((h2_session*)ctx)->io, data, length);
 }
@@ -536,7 +463,7 @@ static int on_send_data_cb(nghttp2_sessi
     apr_status_t status = APR_SUCCESS;
     h2_session *session = (h2_session *)userp;
     int stream_id = (int)frame->hd.stream_id;
-    const unsigned char padlen = frame->data.padlen;
+    unsigned char padlen;
     int eos;
     h2_stream *stream;
     
@@ -546,7 +473,12 @@ static int on_send_data_cb(nghttp2_sessi
         return NGHTTP2_ERR_CALLBACK_FAILURE;
     }
     
-    stream = h2_stream_set_get(session->streams, stream_id);
+    if (frame->data.padlen > H2_MAX_PADLEN) {
+        return NGHTTP2_ERR_PROTO;
+    }
+    padlen = (unsigned char)frame->data.padlen;
+    
+    stream = h2_session_get_stream(session, stream_id);
     if (!stream) {
         ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_NOTFOUND, session->c,
                       APLOGNO(02924) 
@@ -567,9 +499,8 @@ static int on_send_data_cb(nghttp2_sessi
             }
             
             if (status == APR_SUCCESS) {
-                apr_size_t len = length;
-                status = h2_stream_readx(stream, pass_data, session, 
-                                         &len, &eos);
+                apr_off_t len = length;
+                status = h2_stream_readx(stream, pass_data, session, &len, &eos);
                 if (status == APR_SUCCESS && len != length) {
                     status = APR_EINVAL;
                 }
@@ -594,9 +525,8 @@ static int on_send_data_cb(nghttp2_sessi
         status = h2_conn_io_writeb(&session->io, b);
         
         if (status == APR_SUCCESS) {
-            apr_size_t len = length;
+            apr_off_t len = length;
             status = h2_stream_read_to(stream, session->io.output, &len, &eos);
-            session->io.unflushed = 1;
             if (status == APR_SUCCESS && len != length) {
                 status = APR_EINVAL;
             }
@@ -625,20 +555,6 @@ static int on_send_data_cb(nghttp2_sessi
     return h2_session_status_from_apr_status(status);
 }
 
-static ssize_t on_data_source_read_length_cb(nghttp2_session *session, 
-                                             uint8_t frame_type, int32_t stream_id, 
-                                             int32_t session_remote_window_size, 
-                                             int32_t stream_remote_window_size, 
-                                             uint32_t remote_max_frame_size, 
-                                             void *user_data)
-{
-    /* DATA frames add 9 bytes header plus 1 byte for padlen and additional 
-     * padlen bytes. Keep below TLS maximum record size.
-     * TODO: respect pad bytes when we have that feature.
-     */
-    return (16*1024 - 10);
-}
-
 #define NGH2_SET_CALLBACK(callbacks, name, fn)\
 nghttp2_session_callbacks_set_##name##_callback(callbacks, fn)
 
@@ -656,18 +572,24 @@ static apr_status_t init_callbacks(conn_
     NGH2_SET_CALLBACK(*pcb, on_frame_recv, on_frame_recv_cb);
     NGH2_SET_CALLBACK(*pcb, on_invalid_frame_recv, on_invalid_frame_recv_cb);
     NGH2_SET_CALLBACK(*pcb, on_data_chunk_recv, on_data_chunk_recv_cb);
-    NGH2_SET_CALLBACK(*pcb, before_frame_send, before_frame_send_cb);
-    NGH2_SET_CALLBACK(*pcb, on_frame_send, on_frame_send_cb);
-    NGH2_SET_CALLBACK(*pcb, on_frame_not_send, on_frame_not_send_cb);
     NGH2_SET_CALLBACK(*pcb, on_stream_close, on_stream_close_cb);
     NGH2_SET_CALLBACK(*pcb, on_begin_headers, on_begin_headers_cb);
     NGH2_SET_CALLBACK(*pcb, on_header, on_header_cb);
     NGH2_SET_CALLBACK(*pcb, send_data, on_send_data_cb);
-    NGH2_SET_CALLBACK(*pcb, data_source_read_length, on_data_source_read_length_cb);
     
     return APR_SUCCESS;
 }
 
+static apr_status_t session_pool_cleanup(void *data)
+{
+    h2_session *session = data;
+    
+    /* keep us from destroying the pool, since that is already ongoing. */
+    session->pool = NULL;
+    h2_session_destroy(session);
+    return APR_SUCCESS;
+}
+
 static h2_session *h2_session_create_int(conn_rec *c,
                                          request_rec *r,
                                          h2_config *config, 
@@ -690,6 +612,8 @@ static h2_session *h2_session_create_int
         session->c = c;
         session->r = r;
         
+        apr_pool_pre_cleanup_register(pool, session, session_pool_cleanup);
+        
         session->max_stream_count = h2_config_geti(config, H2_CONF_MAX_STREAMS);
         session->max_stream_mem = h2_config_geti(config, H2_CONF_STREAM_MAX_MEM);
 
@@ -700,7 +624,7 @@ static h2_session *h2_session_create_int
             return NULL;
         }
         
-        session->streams = h2_stream_set_create(session->pool);
+        session->streams = h2_stream_set_create(session->pool, session->max_stream_count);
         
         session->workers = workers;
         session->mplx = h2_mplx_create(c, session->pool, workers);
@@ -761,15 +685,37 @@ h2_session *h2_session_rcreate(request_r
     return h2_session_create_int(r->connection, r, config, workers);
 }
 
-void h2_session_destroy(h2_session *session)
+static void h2_session_cleanup(h2_session *session)
 {
     AP_DEBUG_ASSERT(session);
+    /* This is an early cleanup of the session that may
+     * discard what is no longer necessary for *new* streams
+     * and general HTTP/2 processing.
+     * At this point, all frames are in transit or somehwere in
+     * our buffers or passed down output filters.
+     * h2 streams might still being written out.
+     */
+    if (session->ngh2) {
+        nghttp2_session_del(session->ngh2);
+        session->ngh2 = NULL;
+    }
+    if (session->spare) {
+        apr_pool_destroy(session->spare);
+        session->spare = NULL;
+    }
     if (session->mplx) {
         h2_mplx_release_and_join(session->mplx, session->iowait);
         session->mplx = NULL;
     }
+}
+
+void h2_session_destroy(h2_session *session)
+{
+    AP_DEBUG_ASSERT(session);
+    h2_session_cleanup(session);
+    
     if (session->streams) {
-        if (h2_stream_set_size(session->streams)) {
+        if (!h2_stream_set_is_empty(session->streams)) {
             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
                           "h2_session(%ld): destroy, %d streams open",
                           session->id, (int)h2_stream_set_size(session->streams));
@@ -777,18 +723,18 @@ void h2_session_destroy(h2_session *sess
         h2_stream_set_destroy(session->streams);
         session->streams = NULL;
     }
-    if (session->ngh2) {
-        nghttp2_session_del(session->ngh2);
-        session->ngh2 = NULL;
+    if (session->pool) {
+        apr_pool_destroy(session->pool);
     }
 }
 
-void h2_session_cleanup(h2_session *session)
+
+void h2_session_eoc_callback(h2_session *session)
 {
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+                  "session(%ld): cleanup and destroy", session->id);
+    apr_pool_cleanup_kill(session->pool, session, session_pool_cleanup);
     h2_session_destroy(session);
-    if (session->pool) {
-        apr_pool_destroy(session->pool);
-    }
 }
 
 static apr_status_t h2_session_abort_int(h2_session *session, int reason)
@@ -824,7 +770,6 @@ static apr_status_t h2_session_abort_int
                                       strlen(err));
                 nghttp2_session_send(session->ngh2);
             }
-            h2_conn_io_flush(&session->io);
         }
         h2_mplx_abort(session->mplx);
     }
@@ -863,6 +808,8 @@ apr_status_t h2_session_start(h2_session
     apr_status_t status = APR_SUCCESS;
     h2_config *config;
     nghttp2_settings_entry settings[3];
+    size_t slen;
+    int i;
     
     AP_DEBUG_ASSERT(session);
     /* Start the conversation by submitting our SETTINGS frame */
@@ -906,8 +853,8 @@ apr_status_t h2_session_start(h2_session
         }
         
         /* Now we need to auto-open stream 1 for the request we got. */
-        *rv = stream_open(session, 1);
-        if (*rv != 0) {
+        stream = h2_session_open_stream(session, 1);
+        if (!stream) {
             status = APR_EGENERAL;
             ap_log_rerror(APLOG_MARK, APLOG_ERR, status, session->r,
                           APLOGNO(02933) "open stream 1: %s", 
@@ -915,34 +862,29 @@ apr_status_t h2_session_start(h2_session
             return status;
         }
         
-        stream = h2_stream_set_get(session->streams, 1);
-        if (stream == NULL) {
-            status = APR_EGENERAL;
-            ap_log_rerror(APLOG_MARK, APLOG_ERR, status, session->r,
-                          APLOGNO(02934) "lookup of stream 1");
-            return status;
-        }
-        
-        status = h2_stream_rwrite(stream, session->r);
+        status = h2_stream_set_request(stream, session->r);
         if (status != APR_SUCCESS) {
             return status;
         }
-        status = stream_end_headers(session, stream, 1);
+        status = stream_schedule(session, stream, 1);
         if (status != APR_SUCCESS) {
             return status;
         }
     }
 
-    settings[0].settings_id = NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS;
-    settings[0].value = (uint32_t)session->max_stream_count;
-    settings[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
-    settings[1].value = h2_config_geti(config, H2_CONF_WIN_SIZE);
-    settings[2].settings_id = NGHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE;
-    settings[2].value = 64*1024;
+    slen = 0;
+    settings[slen].settings_id = NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS;
+    settings[slen].value = (uint32_t)session->max_stream_count;
+    ++slen;
+    i = h2_config_geti(config, H2_CONF_WIN_SIZE);
+    if (i != H2_INITIAL_WINDOW_SIZE) {
+        settings[slen].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
+        settings[slen].value = i;
+        ++slen;
+    }
     
     *rv = nghttp2_submit_settings(session->ngh2, NGHTTP2_FLAG_NONE,
-                                 settings,
-                                 sizeof(settings)/sizeof(settings[0]));
+                                  settings, slen);
     if (*rv != 0) {
         status = APR_EGENERAL;
         ap_log_cerror(APLOG_MARK, APLOG_ERR, status, session->c,
@@ -953,11 +895,6 @@ apr_status_t h2_session_start(h2_session
     return status;
 }
 
-static int h2_session_want_write(h2_session *session)
-{
-    return nghttp2_session_want_write(session->ngh2);
-}
-
 typedef struct {
     h2_session *session;
     int resume_count;
@@ -1003,102 +940,18 @@ static int h2_session_resume_streams_wit
     return 0;
 }
 
-static void update_window(void *ctx, int stream_id, apr_size_t bytes_read)
+static void update_window(void *ctx, int stream_id, apr_off_t bytes_read)
 {
     h2_session *session = (h2_session*)ctx;
     nghttp2_session_consume(session->ngh2, stream_id, bytes_read);
 }
 
-static apr_status_t h2_session_flush(h2_session *session) 
-{
-    session->flush = 0;
-    return h2_conn_io_flush(&session->io);
-}
-
-static apr_status_t h2_session_update_windows(h2_session *session)
-{
-    return h2_mplx_in_update_windows(session->mplx, update_window, session);
-}
-
-apr_status_t h2_session_write(h2_session *session, apr_interval_time_t timeout)
-{
-    apr_status_t status = APR_EAGAIN;
-    h2_stream *stream = NULL;
-    
-    AP_DEBUG_ASSERT(session);
-    
-    if (session->reprioritize) {
-        h2_mplx_reprioritize(session->mplx, stream_pri_cmp, session);
-        session->reprioritize = 0;
-    }
-    
-    /* Check that any pending window updates are sent. */
-    status = h2_session_update_windows(session);
-    if (status != APR_SUCCESS && !APR_STATUS_IS_EAGAIN(status)) {
-        return status;
-    }
-    
-    if (h2_session_want_write(session)) {
-        int rv;
-        status = APR_SUCCESS;
-        rv = nghttp2_session_send(session->ngh2);
-        if (rv != 0) {
-            ap_log_cerror( APLOG_MARK, APLOG_DEBUG, 0, session->c,
-                          "h2_session: send: %s", nghttp2_strerror(rv));
-            if (nghttp2_is_fatal(rv)) {
-                h2_session_abort_int(session, rv);
-                status = APR_ECONNABORTED;
-            }
-        }
-    }
-    
-    /* If we have responses ready, submit them now. */
-    while (!session->aborted 
-           && (stream = h2_mplx_next_submit(session->mplx, session->streams)) != NULL) {
-        status = h2_session_handle_response(session, stream);
-    }
-    
-    if (!session->aborted && h2_session_resume_streams_with_data(session) > 0) {
-    }
-    
-    if (!session->aborted && !session->flush && timeout > 0 
-        && !h2_session_want_write(session)) {
-        h2_session_flush(session);
-        status = h2_mplx_out_trywait(session->mplx, timeout, session->iowait);
-
-        if (status != APR_TIMEUP
-            && h2_session_resume_streams_with_data(session) > 0) {
-        }
-        else {
-            /* nothing happened to ongoing streams, do some house-keeping */
-        }
-    }
-    
-    if (h2_session_want_write(session)) {
-        int rv;
-        status = APR_SUCCESS;
-        rv = nghttp2_session_send(session->ngh2);
-        if (rv != 0) {
-            ap_log_cerror( APLOG_MARK, APLOG_DEBUG, 0, session->c,
-                          "h2_session: send2: %s", nghttp2_strerror(rv));
-            if (nghttp2_is_fatal(rv)) {
-                h2_session_abort_int(session, rv);
-                status = APR_ECONNABORTED;
-            }
-        }
-    }
-    
-    if (session->flush) {
-        h2_session_flush(session);
-    }
-    
-    return status;
-}
-
 h2_stream *h2_session_get_stream(h2_session *session, int stream_id)
 {
-    AP_DEBUG_ASSERT(session);
-    return h2_stream_set_get(session->streams, stream_id);
+    if (!session->last_stream || stream_id != session->last_stream->id) {
+        session->last_stream = h2_stream_set_get(session->streams, stream_id);
+    }
+    return session->last_stream;
 }
 
 /* h2_io_on_read_cb implementation that offers the data read
@@ -1131,17 +984,6 @@ static apr_status_t session_receive(cons
     return APR_SUCCESS;
 }
 
-apr_status_t h2_session_read(h2_session *session, apr_read_type_e block)
-{
-    AP_DEBUG_ASSERT(session);
-    if (block == APR_BLOCK_READ) {
-        /* before we do a blocking read, make sure that all our output
-         * is send out. Otherwise we might deadlock. */
-        h2_session_flush(session);
-    }
-    return h2_conn_io_read(&session->io, block, session_receive, session);
-}
-
 apr_status_t h2_session_close(h2_session *session)
 {
     AP_DEBUG_ASSERT(session);
@@ -1150,10 +992,11 @@ apr_status_t h2_session_close(h2_session
     }
     ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0,session->c,
                   "h2_session: closing, writing eoc");
-    h2_conn_io_writeb(&session->io,
-                      h2_bucket_eoc_create(session->c->bucket_alloc, 
-                                           session));
-    return h2_conn_io_flush(&session->io);
+    
+    h2_session_cleanup(session);              
+    return h2_conn_io_writeb(&session->io,
+                             h2_bucket_eoc_create(session->c->bucket_alloc, 
+                                                  session));
 }
 
 static ssize_t stream_data_cb(nghttp2_session *ng2s,
@@ -1165,7 +1008,7 @@ static ssize_t stream_data_cb(nghttp2_se
                               void *puser)
 {
     h2_session *session = (h2_session *)puser;
-    apr_size_t nread = length;
+    apr_off_t nread = length;
     int eos = 0;
     apr_status_t status;
     h2_stream *stream;
@@ -1183,7 +1026,7 @@ static ssize_t stream_data_cb(nghttp2_se
     (void)ng2s;
     (void)buf;
     (void)source;
-    stream = h2_stream_set_get(session->streams, stream_id);
+    stream = h2_session_get_stream(session, stream_id);
     if (!stream) {
         ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, session->c,
                       APLOGNO(02937) 
@@ -1245,42 +1088,12 @@ typedef struct {
     size_t offset;
 } nvctx_t;
 
-static int submit_response(h2_session *session, h2_response *response)
-{
-    nghttp2_data_provider provider;
-    int rv;
-    
-    memset(&provider, 0, sizeof(provider));
-    provider.source.fd = response->stream_id;
-    provider.read_callback = stream_data_cb;
-    
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
-                  "h2_stream(%ld-%d): submitting response %s",
-                  session->id, response->stream_id, response->status);
-    
-    rv = nghttp2_submit_response(session->ngh2, response->stream_id,
-                                 response->ngheader->nv, 
-                                 response->ngheader->nvlen, &provider);
-    
-    if (rv != 0) {
-        ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, session->c,
-                      APLOGNO(02939) "h2_stream(%ld-%d): submit_response: %s",
-                      session->id, response->stream_id, nghttp2_strerror(rv));
-    }
-    else {
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
-                      "h2_stream(%ld-%d): submitted response %s, rv=%d",
-                      session->id, response->stream_id, 
-                      response->status, rv);
-    }
-    return rv;
-}
-
-/* Start submitting the response to a stream request. This is possible
+/**
+ * Start submitting the response to a stream request. This is possible
  * once we have all the response headers. The response body will be
  * read by the session using the callback we supply.
  */
-apr_status_t h2_session_handle_response(h2_session *session, h2_stream *stream)
+static apr_status_t submit_response(h2_session *session, h2_stream *stream)
 {
     apr_status_t status = APR_SUCCESS;
     int rv = 0;
@@ -1288,15 +1101,62 @@ apr_status_t h2_session_handle_response(
     AP_DEBUG_ASSERT(stream);
     AP_DEBUG_ASSERT(stream->response || stream->rst_error);
     
-    if (stream->response && stream->response->ngheader) {
-        rv = submit_response(session, stream->response);
+    if (stream->submitted) {
+        rv = NGHTTP2_PROTOCOL_ERROR;
+    }
+    else if (stream->response && stream->response->header) {
+        nghttp2_data_provider provider;
+        h2_response *response = stream->response;
+        h2_ngheader *ngh;
+        
+        memset(&provider, 0, sizeof(provider));
+        provider.source.fd = stream->id;
+        provider.read_callback = stream_data_cb;
+        
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+                      "h2_stream(%ld-%d): submit response %d",
+                      session->id, stream->id, response->http_status);
+        
+        ngh = h2_util_ngheader_make_res(stream->pool, response->http_status, 
+                                        response->header);
+        rv = nghttp2_submit_response(session->ngh2, response->stream_id,
+                                     ngh->nv, ngh->nvlen, &provider);
+        
+        /* If the submit worked,
+         * and this stream is not a pushed one itself,
+         * and HTTP/2 server push is enabled here,
+         * and the response is in the range 200-299 *),
+         * and the remote side has pushing enabled,
+         * -> find and perform any pushes on this stream
+         * 
+         * *) the response code is relevant, as we do not want to 
+         *    make pushes on 401 or 403 codes, neiterh on 301/302
+         *    and friends. And if we see a 304, we do not push either
+         *    as the client, having this resource in its cache, might
+         *    also have the pushed ones as well.
+         */
+        if (!rv 
+            && !stream->initiated_on
+            && h2_config_geti(h2_config_get(session->c), H2_CONF_PUSH)
+            && H2_HTTP_2XX(response->http_status)
+            && h2_session_push_enabled(session)) {
+            
+            h2_stream_submit_pushes(stream);
+        }
     }
     else {
+        int err = H2_STREAM_RST(stream, H2_ERR_PROTOCOL_ERROR);
+        
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+                      "h2_stream(%ld-%d): RST_STREAM, err=%d",
+                      session->id, stream->id, err);
+
         rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
-                                       stream->id, 
-                                       H2_STREAM_RST(stream, H2_ERR_PROTOCOL_ERROR));
+                                       stream->id, err);
     }
     
+    stream->submitted = 1;
+
     if (nghttp2_is_fatal(rv)) {
         status = APR_EGENERAL;
         h2_session_abort_int(session, rv);
@@ -1304,15 +1164,78 @@ apr_status_t h2_session_handle_response(
                       APLOGNO(02940) "submit_response: %s", 
                       nghttp2_strerror(rv));
     }
+    
     return status;
 }
 
+struct h2_stream *h2_session_push(h2_session *session, h2_stream *is,
+                                  h2_push *push)
+{
+    apr_status_t status;
+    h2_stream *stream;
+    h2_ngheader *ngh;
+    int nid;
+    
+    ngh = h2_util_ngheader_make_req(is->pool, push->req);
+    nid = nghttp2_submit_push_promise(session->ngh2, 0, push->initiating_id, 
+                                      ngh->nv, ngh->nvlen, NULL);
+                                      
+    if (nid <= 0) {
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+                      "h2_stream(%ld-%d): submitting push promise fail: %s",
+                      session->id, push->initiating_id, 
+                      nghttp2_strerror(nid));
+        return NULL;
+    }
+    
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+                  "h2_stream(%ld-%d): promised new stream %d for %s %s",
+                  session->id, push->initiating_id, nid,
+                  push->req->method, push->req->path);
+                  
+    stream = h2_session_open_stream(session, nid);
+    if (stream) {
+        h2_stream_set_h2_request(stream, is->id, push->req);
+        status = stream_schedule(session, stream, 1);
+        if (status != APR_SUCCESS) {
+            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
+                          "h2_stream(%ld-%d): scheduling push stream",
+                          session->id, stream->id);
+            h2_stream_cleanup(stream);
+            stream = NULL;
+        }
+    }
+    else {
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+                      "h2_stream(%ld-%d): failed to create stream obj %d",
+                      session->id, push->initiating_id, nid);
+    }
+
+    if (!stream) {
+        /* try to tell the client that it should not wait. */
+        nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE, nid,
+                                  NGHTTP2_INTERNAL_ERROR);
+    }
+    
+    return stream;
+}
+
 apr_status_t h2_session_stream_destroy(h2_session *session, h2_stream *stream)
 {
     apr_pool_t *pool = h2_stream_detach_pool(stream);
 
-    h2_mplx_stream_done(session->mplx, stream->id, stream->rst_error);
-    h2_stream_set_remove(session->streams, stream->id);
+    /* this may be called while the session has already freed
+     * some internal structures. */
+    if (session->mplx) {
+        h2_mplx_stream_done(session->mplx, stream->id, stream->rst_error);
+        if (session->last_stream == stream) {
+            session->last_stream = NULL;
+        }
+    }
+    
+    if (session->streams) {
+        h2_stream_set_remove(session->streams, stream->id);
+    }
     h2_stream_destroy(stream);
     
     if (pool) {
@@ -1325,15 +1248,6 @@ apr_status_t h2_session_stream_destroy(h
     return APR_SUCCESS;
 }
 
-int h2_session_is_done(h2_session *session)
-{
-    AP_DEBUG_ASSERT(session);
-    return (session->aborted
-            || !session->ngh2
-            || (!nghttp2_session_want_read(session->ngh2)
-                && !nghttp2_session_want_write(session->ngh2)));
-}
-
 static int frame_print(const nghttp2_frame *frame, char *buffer, size_t maxlen)
 {
     char scratch[128];
@@ -1411,3 +1325,184 @@ static int frame_print(const nghttp2_fra
     }
 }
 
+int h2_session_push_enabled(h2_session *session)
+{
+    return nghttp2_session_get_remote_settings(session->ngh2, 
+                                               NGHTTP2_SETTINGS_ENABLE_PUSH);
+}
+
+
+apr_status_t h2_session_process(h2_session *session)
+{
+    apr_status_t status = APR_SUCCESS;
+    apr_interval_time_t wait_micros = 0;
+    static const int MAX_WAIT_MICROS = 200 * 1000;
+    int got_streams = 0;
+
+    while (!session->aborted && (nghttp2_session_want_read(session->ngh2)
+                                 || nghttp2_session_want_write(session->ngh2))) {
+        int have_written = 0;
+        int have_read = 0;
+                                 
+        /* Send data as long as we have it and window sizes allow. We are
+         * a server after all.
+         */
+        if (nghttp2_session_want_write(session->ngh2)) {
+            int rv;
+            
+            rv = nghttp2_session_send(session->ngh2);
+            if (rv != 0) {
+                ap_log_cerror( APLOG_MARK, APLOG_DEBUG, 0, session->c,
+                              "h2_session: send: %s", nghttp2_strerror(rv));
+                if (nghttp2_is_fatal(rv)) {
+                    h2_session_abort(session, status, rv);
+                    goto end_process;
+                }
+            }
+            else {
+                have_written = 1;
+                wait_micros = 0;
+            }
+        }
+        
+        if (wait_micros > 0) {
+            ap_log_cerror( APLOG_MARK, APLOG_TRACE3, 0, session->c,
+                          "h2_session: wait for data, %ld micros", (long)(wait_micros));
+            h2_conn_io_pass(&session->io);
+            status = h2_mplx_out_trywait(session->mplx, wait_micros, session->iowait);
+            
+            if (status == APR_TIMEUP) {
+                if (wait_micros < MAX_WAIT_MICROS) {
+                    wait_micros *= 2;
+                }
+            }
+        }
+        
+        if (nghttp2_session_want_read(session->ngh2))
+        {
+            /* When we
+             * - and have no streams at all
+             * - or have streams, but none is suspended or needs submit and
+             *   have nothing written on the last try
+             * 
+             * or, the other way around
+             * - have only streams where data can be sent, but could
+             *   not send anything
+             *
+             * then we are waiting on frames from the client (for
+             * example WINDOW_UPDATE or HEADER) and without new frames
+             * from the client, we cannot make any progress,
+             * 
+             * and *then* we can safely do a blocking read.
+             */
+            int may_block = (session->frames_received <= 1);
+            if (!may_block) {
+                if (got_streams) {
+                    may_block = (!have_written 
+                                 && !h2_stream_set_has_unsubmitted(session->streams)
+                                 && !h2_stream_set_has_suspended(session->streams));
+                }
+                else {
+                    may_block = 1;
+                }
+            }
+            
+            if (may_block) {
+                h2_conn_io_flush(&session->io);
+                if (session->c->cs) {
+                    session->c->cs->state = (got_streams? CONN_STATE_HANDLER
+                                             : CONN_STATE_WRITE_COMPLETION);
+                }
+                status = h2_conn_io_read(&session->io, APR_BLOCK_READ, 
+                                         session_receive, session);
+            }
+            else {
+                if (session->c->cs) {
+                    session->c->cs->state = CONN_STATE_HANDLER;
+                }
+                status = h2_conn_io_read(&session->io, APR_NONBLOCK_READ, 
+                                         session_receive, session);
+            }
+
+            switch (status) {
+                case APR_SUCCESS:       /* successful read, reset our idle timers */
+                    have_read = 1;
+                    wait_micros = 0;
+                    break;
+                case APR_EAGAIN:              /* non-blocking read, nothing there */
+                    break;
+                default:
+                    if (APR_STATUS_IS_ETIMEDOUT(status)
+                        || APR_STATUS_IS_ECONNABORTED(status)
+                        || APR_STATUS_IS_ECONNRESET(status)
+                        || APR_STATUS_IS_EOF(status)
+                        || APR_STATUS_IS_EBADF(status)) {
+                        /* common status for a client that has left */
+                        ap_log_cerror( APLOG_MARK, APLOG_DEBUG, status, session->c,
+                                      "h2_session(%ld): terminating",
+                                      session->id);
+                        /* Stolen from mod_reqtimeout to speed up lingering when
+                         * a read timeout happened.
+                         */
+                        apr_table_setn(session->c->notes, "short-lingering-close", "1");
+                    }
+                    else {
+                        /* uncommon status, log on INFO so that we see this */
+                        ap_log_cerror( APLOG_MARK, APLOG_INFO, status, session->c,
+                                      APLOGNO(02950) 
+                                      "h2_session(%ld): error reading, terminating",
+                                      session->id);
+                    }
+                    h2_session_abort(session, status, 0);
+                    goto end_process;
+            }
+        }
+        
+        got_streams = !h2_stream_set_is_empty(session->streams);
+        if (got_streams) {
+            h2_stream *stream;
+            
+            if (session->reprioritize) {
+                h2_mplx_reprioritize(session->mplx, stream_pri_cmp, session);
+                session->reprioritize = 0;
+            }
+            
+            if (!have_read && !have_written) {
+                /* Nothing read or written. That means no data yet ready to 
+                 * be send out. Slowly back off...
+                 */
+                if (wait_micros == 0) {
+                    wait_micros = 10;
+                }
+            }
+            
+            if (h2_stream_set_has_open_input(session->streams)) {
+                /* Check that any pending window updates are sent. */
+                status = h2_mplx_in_update_windows(session->mplx, update_window, session);
+                if (APR_STATUS_IS_EAGAIN(status)) {
+                    status = APR_SUCCESS;
+                }
+                else if (status == APR_SUCCESS) {
+                    /* need to flush window updates onto the connection asap */
+                    h2_conn_io_flush(&session->io);
+                }
+            }
+            
+            h2_session_resume_streams_with_data(session);
+            
+            if (h2_stream_set_has_unsubmitted(session->streams)) {
+                /* If we have responses ready, submit them now. */
+                while ((stream = h2_mplx_next_submit(session->mplx, session->streams))) {
+                    status = submit_response(session, stream);
+                }
+            }
+        }
+        
+        if (have_written) {
+            h2_conn_io_flush(&session->io);
+        }
+    }
+    
+end_process:
+    return status;
+}

Modified: httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_session.h
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_session.h?rev=1715218&r1=1715217&r2=1715218&view=diff
==============================================================================
--- httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_session.h (original)
+++ httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_session.h Thu Nov 19 17:14:03 2015
@@ -41,6 +41,7 @@ struct apr_thread_mutext_t;
 struct apr_thread_cond_t;
 struct h2_config;
 struct h2_mplx;
+struct h2_push;
 struct h2_response;
 struct h2_session;
 struct h2_stream;
@@ -58,7 +59,6 @@ struct h2_session {
     request_rec *r;                 /* the request that started this in case
                                      * of 'h2c', NULL otherwise */
     int aborted;                    /* this session is being aborted */
-    int flush;                      /* if != 0, flush output on next occasion */
     int reprioritize;               /* scheduled streams priority needs to 
                                      * be re-evaluated */
     apr_size_t frames_received;     /* number of http/2 frames received */
@@ -72,6 +72,7 @@ struct h2_session {
     h2_conn_io io;                  /* io on httpd conn filters */
     struct h2_mplx *mplx;           /* multiplexer for stream data */
     
+    struct h2_stream *last_stream;  /* last stream worked with */
     struct h2_stream_set *streams;  /* streams handled by this session */
     
     int max_stream_received;        /* highest stream id created */
@@ -107,6 +108,14 @@ h2_session *h2_session_rcreate(request_r
                                struct h2_workers *workers);
 
 /**
+ * Process the given HTTP/2 session until it is ended or a fatal
+ * error occured.
+ *
+ * @param session the sessionm to process
+ */
+apr_status_t h2_session_process(h2_session *session);
+
+/**
  * Destroy the session and all objects it still contains. This will not
  * destroy h2_task instances that have not finished yet. 
  * @param session the session to destroy
@@ -118,7 +127,7 @@ void h2_session_destroy(h2_session *sess
  * destroy h2_task instances that have not finished yet. 
  * @param session the session to destroy
  */
-void h2_session_cleanup(h2_session *session);
+void h2_session_eoc_callback(h2_session *session);
 
 /**
  * Called once at start of session. 
@@ -130,12 +139,6 @@ void h2_session_cleanup(h2_session *sess
 apr_status_t h2_session_start(h2_session *session, int *rv);
 
 /**
- * Determine if session is finished.
- * @return != 0 iff session is finished and connection can be closed.
- */
-int h2_session_is_done(h2_session *session);
-
-/**
  * Called when an error occured and the session needs to shut down.
  * @param session the session to shut down
  * @param reason  the apache status that caused the shutdown
@@ -145,21 +148,15 @@ int h2_session_is_done(h2_session *sessi
 apr_status_t h2_session_abort(h2_session *session, apr_status_t reason, int rv);
 
 /**
- * Called before a session gets destroyed, might flush output etc. 
+ * Pass any buffered output data through the connection filters.
+ * @param session the session to flush
  */
-apr_status_t h2_session_close(h2_session *session);
+apr_status_t h2_session_flush(h2_session *session);
 
-/* Read more data from the client connection. Used normally with blocking
- * APR_NONBLOCK_READ, which will return APR_EAGAIN when no data is available.
- * Use with APR_BLOCK_READ only when certain that no data needs to be written
- * while waiting. */
-apr_status_t h2_session_read(h2_session *session, apr_read_type_e block);
-
-/* Write data out to the client, if there is any. Otherwise, wait for
- * a maximum of timeout micro-seconds and return to the caller. If timeout
- * occurred, APR_TIMEUP will be returned.
+/**
+ * Called before a session gets destroyed, might flush output etc. 
  */
-apr_status_t h2_session_write(h2_session *session, apr_interval_time_t timeout);
+apr_status_t h2_session_close(h2_session *session);
 
 /* Start submitting the response to a stream request. This is possible
  * once we have all the response headers. */
@@ -170,6 +167,21 @@ apr_status_t h2_session_handle_response(
 struct h2_stream *h2_session_get_stream(h2_session *session, int stream_id);
 
 /**
+ * Create and register a new stream under the given id.
+ * 
+ * @param session the session to register in
+ * @param stream_id the new stream identifier
+ * @return the new stream
+ */
+struct h2_stream *h2_session_open_stream(h2_session *session, int stream_id);
+
+/**
+ * Returns if client settings have push enabled.
+ * @param != 0 iff push is enabled in client settings
+ */
+int h2_session_push_enabled(h2_session *session);
+
+/**
  * Destroy the stream and release it everywhere. Reclaim all resources.
  * @param session the session to which the stream belongs
  * @param stream the stream to destroy
@@ -177,4 +189,16 @@ struct h2_stream *h2_session_get_stream(
 apr_status_t h2_session_stream_destroy(h2_session *session, 
                                        struct h2_stream *stream);
 
+/**
+ * Submit a push promise on the stream and schedule the new steam for
+ * processing..
+ * 
+ * @param session the session to work in
+ * @param is the stream initiating the push
+ * @param push the push to promise
+ * @return the new promised stream or NULL
+ */
+struct h2_stream *h2_session_push(h2_session *session, 
+                                  struct h2_stream *is, struct h2_push *push);
+
 #endif /* defined(__mod_h2__h2_session__) */

Modified: httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_stream.c
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_stream.c?rev=1715218&r1=1715217&r2=1715218&view=diff
==============================================================================
--- httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_stream.c (original)
+++ httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_stream.c Thu Nov 19 17:14:03 2015
@@ -25,7 +25,9 @@
 
 #include "h2_private.h"
 #include "h2_conn.h"
+#include "h2_h2.h"
 #include "h2_mplx.h"
+#include "h2_push.h"
 #include "h2_request.h"
 #include "h2_response.h"
 #include "h2_session.h"
@@ -37,38 +39,138 @@
 #include "h2_util.h"
 
 
-static void set_state(h2_stream *stream, h2_stream_state_t state)
+#define H2_STREAM_OUT(lvl,s,msg) \
+    do { \
+        if (APLOG_C_IS_LEVEL((s)->session->c,lvl)) \
+        h2_util_bb_log((s)->session->c,(s)->id,lvl,msg,(s)->bbout); \
+    } while(0)
+#define H2_STREAM_IN(lvl,s,msg) \
+    do { \
+        if (APLOG_C_IS_LEVEL((s)->session->c,lvl)) \
+        h2_util_bb_log((s)->session->c,(s)->id,lvl,msg,(s)->bbin); \
+    } while(0)
+    
+
+static int state_transition[][7] = {
+    /*  ID OP RL RR CI CO CL */
+/*ID*/{  1, 0, 0, 0, 0, 0, 0 },
+/*OP*/{  1, 1, 0, 0, 0, 0, 0 },
+/*RL*/{  0, 0, 1, 0, 0, 0, 0 },
+/*RR*/{  0, 0, 0, 1, 0, 0, 0 },
+/*CI*/{  1, 1, 0, 0, 1, 0, 0 },
+/*CO*/{  1, 1, 0, 0, 0, 1, 0 },
+/*CL*/{  1, 1, 0, 0, 1, 1, 1 },
+};
+
+static int set_state(h2_stream *stream, h2_stream_state_t state)
 {
-    AP_DEBUG_ASSERT(stream);
-    if (stream->state != state) {
+    int allowed = state_transition[state][stream->state];
+    if (allowed) {
         stream->state = state;
+        return 1;
+    }
+    
+    ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, stream->session->c,
+                  "h2_stream(%ld-%d): invalid state transition from %d to %d", 
+                  stream->session->id, stream->id, stream->state, state);
+    return 0;
+}
+
+static int close_input(h2_stream *stream) 
+{
+    switch (stream->state) {
+        case H2_STREAM_ST_CLOSED_INPUT:
+        case H2_STREAM_ST_CLOSED:
+            return 0; /* ignore, idempotent */
+        case H2_STREAM_ST_CLOSED_OUTPUT:
+            /* both closed now */
+            set_state(stream, H2_STREAM_ST_CLOSED);
+            break;
+        default:
+            /* everything else we jump to here */
+            set_state(stream, H2_STREAM_ST_CLOSED_INPUT);
+            break;
+    }
+    return 1;
+}
+
+static int input_closed(h2_stream *stream) 
+{
+    switch (stream->state) {
+        case H2_STREAM_ST_OPEN:
+        case H2_STREAM_ST_CLOSED_OUTPUT:
+            return 0;
+        default:
+            return 1;
+    }
+}
+
+static int close_output(h2_stream *stream) 
+{
+    switch (stream->state) {
+        case H2_STREAM_ST_CLOSED_OUTPUT:
+        case H2_STREAM_ST_CLOSED:
+            return 0; /* ignore, idempotent */
+        case H2_STREAM_ST_CLOSED_INPUT:
+            /* both closed now */
+            set_state(stream, H2_STREAM_ST_CLOSED);
+            break;
+        default:
+            /* everything else we jump to here */
+            set_state(stream, H2_STREAM_ST_CLOSED_OUTPUT);
+            break;
+    }
+    return 1;
+}
+
+static int input_open(h2_stream *stream) 
+{
+    switch (stream->state) {
+        case H2_STREAM_ST_OPEN:
+        case H2_STREAM_ST_CLOSED_OUTPUT:
+            return 1;
+        default:
+            return 0;
+    }
+}
+
+static int output_open(h2_stream *stream) 
+{
+    switch (stream->state) {
+        case H2_STREAM_ST_OPEN:
+        case H2_STREAM_ST_CLOSED_INPUT:
+            return 1;
+        default:
+            return 0;
     }
 }
 
 h2_stream *h2_stream_create(int id, apr_pool_t *pool, h2_session *session)
 {
     h2_stream *stream = apr_pcalloc(pool, sizeof(h2_stream));
-    if (stream != NULL) {
-        stream->id = id;
-        stream->state = H2_STREAM_ST_IDLE;
-        stream->pool = pool;
-        stream->session = session;
-        stream->bbout = apr_brigade_create(stream->pool, 
+    stream->id        = id;
+    stream->state     = H2_STREAM_ST_IDLE;
+    stream->pool      = pool;
+    stream->session   = session;
+    return stream;
+}
+
+h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session)
+{
+    h2_stream *stream = h2_stream_create(id, pool, session);
+    set_state(stream, H2_STREAM_ST_OPEN);
+    stream->request   = h2_request_create(id, pool);
+    stream->bbout     = apr_brigade_create(stream->pool, 
                                            stream->session->c->bucket_alloc);
-        stream->request = h2_request_create(id, pool, session->c->bucket_alloc);
-        
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
-                      "h2_stream(%ld-%d): created", session->id, stream->id);
-    }
+    
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+                  "h2_stream(%ld-%d): opened", session->id, stream->id);
     return stream;
 }
 
 apr_status_t h2_stream_destroy(h2_stream *stream)
 {
     AP_DEBUG_ASSERT(stream);
-    
-    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->session->c,
-                  "h2_stream(%ld-%d): destroy", stream->session->id, stream->id);
     if (stream->request) {
         h2_request_destroy(stream->request);
         stream->request = NULL;
@@ -96,6 +198,8 @@ apr_pool_t *h2_stream_detach_pool(h2_str
 void h2_stream_rst(h2_stream *stream, int error_code)
 {
     stream->rst_error = error_code;
+    close_input(stream);
+    close_output(stream);
     ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->session->c,
                   "h2_stream(%ld-%d): reset, error=%d", 
                   stream->session->id, stream->id, error_code);
@@ -105,6 +209,12 @@ apr_status_t h2_stream_set_response(h2_s
                                     apr_bucket_brigade *bb)
 {
     apr_status_t status = APR_SUCCESS;
+    if (!output_open(stream)) {
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->session->c,
+                      "h2_stream(%ld-%d): output closed", 
+                      stream->session->id, stream->id);
+        return APR_ECONNRESET;
+    }
     
     stream->response = response;
     if (bb && !APR_BRIGADE_EMPTY(bb)) {
@@ -112,40 +222,19 @@ apr_status_t h2_stream_set_response(h2_s
         /* we can move file handles from h2_mplx into this h2_stream as many
          * as we want, since the lifetimes are the same and we are not freeing
          * the ones in h2_mplx->io before this stream is done. */
-        status = h2_util_move(stream->bbout, bb, 16 * 1024, &move_all,  
+        H2_STREAM_OUT(APLOG_TRACE2, stream, "h2_stream set_response_pre");
+        status = h2_util_move(stream->bbout, bb, -1, &move_all,  
                               "h2_stream_set_response");
+        H2_STREAM_OUT(APLOG_TRACE2, stream, "h2_stream set_response_post");
     }
-    if (APLOGctrace1(stream->session->c)) {
-        apr_size_t len = 0;
-        int eos = 0;
-        h2_util_bb_avail(stream->bbout, &len, &eos);
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, stream->session->c,
-                      "h2_stream(%ld-%d): set_response(%s), len=%ld, eos=%d", 
-                      stream->session->id, stream->id, response->status,
-                      (long)len, (int)eos);
-    }
+    
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, stream->session->c,
+                  "h2_stream(%ld-%d): set_response(%d)", 
+                  stream->session->id, stream->id, response->http_status);
     return status;
 }
 
-static int set_closed(h2_stream *stream) 
-{
-    switch (stream->state) {
-        case H2_STREAM_ST_CLOSED_INPUT:
-        case H2_STREAM_ST_CLOSED:
-            return 0; /* ignore, idempotent */
-        case H2_STREAM_ST_CLOSED_OUTPUT:
-            /* both closed now */
-            set_state(stream, H2_STREAM_ST_CLOSED);
-            break;
-        default:
-            /* everything else we jump to here */
-            set_state(stream, H2_STREAM_ST_CLOSED_INPUT);
-            break;
-    }
-    return 1;
-}
-
-apr_status_t h2_stream_rwrite(h2_stream *stream, request_rec *r)
+apr_status_t h2_stream_set_request(h2_stream *stream, request_rec *r)
 {
     apr_status_t status;
     AP_DEBUG_ASSERT(stream);
@@ -153,102 +242,222 @@ apr_status_t h2_stream_rwrite(h2_stream
         return APR_ECONNRESET;
     }
     set_state(stream, H2_STREAM_ST_OPEN);
-    status = h2_request_rwrite(stream->request, r, stream->session->mplx);
+    status = h2_request_rwrite(stream->request, r);
     return status;
 }
 
+void h2_stream_set_h2_request(h2_stream *stream, int initiated_on,
+                              const h2_request *req)
+{
+    h2_request_copy(stream->pool, stream->request, req);
+    stream->initiated_on = initiated_on;
+    stream->request->eoh = 0;
+}
+
+apr_status_t h2_stream_add_header(h2_stream *stream,
+                                  const char *name, size_t nlen,
+                                  const char *value, size_t vlen)
+{
+    AP_DEBUG_ASSERT(stream);
+    if (h2_stream_is_scheduled(stream)) {
+        return h2_request_add_trailer(stream->request, stream->pool,
+                                      name, nlen, value, vlen);
+    }
+    else {
+        if (!input_open(stream)) {
+            return APR_ECONNRESET;
+        }
+        return h2_request_add_header(stream->request, stream->pool,
+                                     name, nlen, value, vlen);
+    }
+}
+
 apr_status_t h2_stream_schedule(h2_stream *stream, int eos,
                                 h2_stream_pri_cmp *cmp, void *ctx)
 {
     apr_status_t status;
     AP_DEBUG_ASSERT(stream);
+    AP_DEBUG_ASSERT(stream->session);
+    AP_DEBUG_ASSERT(stream->session->mplx);
     
-    if (stream->rst_error) {
+    if (!output_open(stream)) {
         return APR_ECONNRESET;
     }
+    if (stream->scheduled) {
+        return APR_EINVAL;
+    }
+    if (eos) {
+        close_input(stream);
+    }
+    
     /* Seeing the end-of-headers, we have everything we need to 
      * start processing it.
      */
-    status = h2_mplx_process(stream->session->mplx, stream->id, 
-                             stream->request, eos, cmp, ctx);
-    if (eos) {
-        set_closed(stream);
+    status = h2_request_end_headers(stream->request, stream->pool, eos);
+    if (status == APR_SUCCESS) {
+        if (!eos) {
+            stream->bbin = apr_brigade_create(stream->pool, 
+                                              stream->session->c->bucket_alloc);
+        }
+        stream->input_remaining = stream->request->content_length;
+        
+        status = h2_mplx_process(stream->session->mplx, stream->id, 
+                                 stream->request, eos, cmp, ctx);
+        stream->scheduled = 1;
     }
+    else {
+        h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR);
+    }
+    
     ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, stream->session->c,
-                  "h2_mplx(%ld-%d): start stream, task %s %s (%s)",
+                  "h2_stream(%ld-%d): scheduled %s %s://%s%s",
                   stream->session->id, stream->id,
-                  stream->request->method, stream->request->path,
-                  stream->request->authority);
+                  stream->request->method, stream->request->scheme,
+                  stream->request->authority, stream->request->path);
     
     return status;
 }
 
-apr_status_t h2_stream_write_eos(h2_stream *stream)
+int h2_stream_is_scheduled(h2_stream *stream)
 {
-    AP_DEBUG_ASSERT(stream);
-    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->session->c,
-                  "h2_stream(%ld-%d): closing input",
-                  stream->session->id, stream->id);
-    if (stream->rst_error) {
-        return APR_ECONNRESET;
+    return stream->scheduled;
+}
+
+static apr_status_t h2_stream_input_flush(h2_stream *stream)
+{
+    apr_status_t status = APR_SUCCESS;
+    if (stream->bbin && !APR_BRIGADE_EMPTY(stream->bbin)) {
+
+        status = h2_mplx_in_write(stream->session->mplx, stream->id, stream->bbin);
+        if (status != APR_SUCCESS) {
+            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, stream->session->mplx->c,
+                          "h2_stream(%ld-%d): flushing input data",
+                          stream->session->id, stream->id);
+        }
+    }
+    return status;
+}
+
+static apr_status_t input_flush(apr_bucket_brigade *bb, void *ctx) 
+{
+    (void)bb;
+    return h2_stream_input_flush(ctx);
+}
+
+static apr_status_t input_add_data(h2_stream *stream,
+                                   const char *data, size_t len, int chunked)
+{
+    apr_status_t status = APR_SUCCESS;
+    
+    if (chunked) {
+        status = apr_brigade_printf(stream->bbin, input_flush, stream,
+                                    "%lx\r\n", (unsigned long)len);
+        if (status == APR_SUCCESS) {
+            status = apr_brigade_write(stream->bbin, input_flush, stream, data, len);
+            if (status == APR_SUCCESS) {
+                status = apr_brigade_puts(stream->bbin, input_flush, stream, "\r\n");
+            }
+        }
     }
-    if (set_closed(stream)) {
-        return h2_request_close(stream->request);
+    else {
+        status = apr_brigade_write(stream->bbin, input_flush, stream, data, len);
     }
-    return APR_SUCCESS;
+    return status;
 }
 
-apr_status_t h2_stream_write_header(h2_stream *stream,
-                                    const char *name, size_t nlen,
-                                    const char *value, size_t vlen)
+apr_status_t h2_stream_close_input(h2_stream *stream)
 {
+    apr_status_t status = APR_SUCCESS;
+    
     AP_DEBUG_ASSERT(stream);
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->session->c,
+                  "h2_stream(%ld-%d): closing input",
+                  stream->session->id, stream->id);
+                  
     if (stream->rst_error) {
         return APR_ECONNRESET;
     }
-    switch (stream->state) {
-        case H2_STREAM_ST_IDLE:
-            set_state(stream, H2_STREAM_ST_OPEN);
-            break;
-        case H2_STREAM_ST_OPEN:
-            break;
-        default:
-            return APR_EINVAL;
+    
+    H2_STREAM_IN(APLOG_TRACE2, stream, "close_pre");
+    if (close_input(stream) && stream->bbin) {
+        if (stream->request->chunked) {
+            status = input_add_data(stream, "0\r\n\r\n", 5, 0);
+        }
+        
+        if (status == APR_SUCCESS) {
+            status = h2_stream_input_flush(stream);
+        }
+        if (status == APR_SUCCESS) {
+            status = h2_mplx_in_close(stream->session->mplx, stream->id);
+        }
     }
-    return h2_request_write_header(stream->request, name, nlen,
-                                   value, vlen, stream->session->mplx);
+    H2_STREAM_IN(APLOG_TRACE2, stream, "close_post");
+    return status;
 }
 
 apr_status_t h2_stream_write_data(h2_stream *stream,
                                   const char *data, size_t len)
 {
+    apr_status_t status = APR_SUCCESS;
+    
     AP_DEBUG_ASSERT(stream);
-    if (stream->rst_error) {
-        return APR_ECONNRESET;
+    if (input_closed(stream) || !stream->request->eoh || !stream->bbin) {
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->session->c,
+                      "h2_stream(%ld-%d): writing denied, closed=%d, eoh=%d, bbin=%d", 
+                      stream->session->id, stream->id, input_closed(stream),
+                      stream->request->eoh, !!stream->bbin);
+        return APR_EINVAL;
     }
-    switch (stream->state) {
-        case H2_STREAM_ST_OPEN:
-            break;
-        default:
-            return APR_EINVAL;
+
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->session->c,
+                  "h2_stream(%ld-%d): add %ld input bytes", 
+                  stream->session->id, stream->id, (long)len);
+
+    H2_STREAM_IN(APLOG_TRACE2, stream, "write_data_pre");
+    if (stream->request->chunked) {
+        /* if input may have a body and we have not seen any
+         * content-length header, we need to chunk the input data.
+         */
+        status = input_add_data(stream, data, len, 1);
+    }
+    else {
+        stream->input_remaining -= len;
+        if (stream->input_remaining < 0) {
+            ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, stream->session->c,
+                          APLOGNO(02961) 
+                          "h2_stream(%ld-%d): got %ld more content bytes than announced "
+                          "in content-length header: %ld", 
+                          stream->session->id, stream->id,
+                          (long)stream->request->content_length, 
+                          -(long)stream->input_remaining);
+            h2_stream_rst(stream, H2_ERR_PROTOCOL_ERROR);
+            return APR_ECONNABORTED;
+        }
+        status = input_add_data(stream, data, len, 0);
+    }
+    if (status == APR_SUCCESS) {
+        status = h2_stream_input_flush(stream);
     }
-    return h2_request_write_data(stream->request, data, len);
+    H2_STREAM_IN(APLOG_TRACE2, stream, "write_data_post");
+    return status;
 }
 
 apr_status_t h2_stream_prep_read(h2_stream *stream, 
-                                 apr_size_t *plen, int *peos)
+                                 apr_off_t *plen, int *peos)
 {
     apr_status_t status = APR_SUCCESS;
     const char *src;
+    int test_read = (*plen == 0);
     
     if (stream->rst_error) {
         return APR_ECONNRESET;
     }
 
+    H2_STREAM_OUT(APLOG_TRACE2, stream, "h2_stream prep_read_pre");
     if (!APR_BRIGADE_EMPTY(stream->bbout)) {
         src = "stream";
         status = h2_util_bb_avail(stream->bbout, plen, peos);
-        if (status == APR_SUCCESS && !*peos && !*plen) {
+        if (!test_read && status == APR_SUCCESS && !*peos && !*plen) {
             apr_brigade_cleanup(stream->bbout);
             return h2_stream_prep_read(stream, plen, peos);
         }
@@ -258,9 +467,10 @@ apr_status_t h2_stream_prep_read(h2_stre
         status = h2_mplx_out_readx(stream->session->mplx, stream->id, 
                                    NULL, NULL, plen, peos);
     }
-    if (status == APR_SUCCESS && !*peos && !*plen) {
+    if (!test_read && status == APR_SUCCESS && !*peos && !*plen) {
         status = APR_EAGAIN;
     }
+    H2_STREAM_OUT(APLOG_TRACE2, stream, "h2_stream prep_read_post");
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->c,
                   "h2_stream(%ld-%d): prep_read %s, len=%ld eos=%d",
                   stream->session->id, stream->id, src, (long)*plen, *peos);
@@ -269,17 +479,18 @@ apr_status_t h2_stream_prep_read(h2_stre
 
 apr_status_t h2_stream_readx(h2_stream *stream, 
                              h2_io_data_cb *cb, void *ctx,
-                             apr_size_t *plen, int *peos)
+                             apr_off_t *plen, int *peos)
 {
     apr_status_t status = APR_SUCCESS;
     const char *src;
     
+    H2_STREAM_OUT(APLOG_TRACE2, stream, "h2_stream readx_pre");
     if (stream->rst_error) {
         return APR_ECONNRESET;
     }
     *peos = 0;
     if (!APR_BRIGADE_EMPTY(stream->bbout)) {
-        apr_size_t origlen = *plen;
+        apr_off_t origlen = *plen;
         
         src = "stream";
         status = h2_util_bb_readx(stream->bbout, cb, ctx, plen, peos);
@@ -299,23 +510,27 @@ apr_status_t h2_stream_readx(h2_stream *
         status = APR_EAGAIN;
     }
     
+    H2_STREAM_OUT(APLOG_TRACE2, stream, "h2_stream prep_readx_post");
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->c,
                   "h2_stream(%ld-%d): readx %s, len=%ld eos=%d",
                   stream->session->id, stream->id, src, (long)*plen, *peos);
+    H2_STREAM_OUT(APLOG_TRACE2, stream, "h2_stream readx_post");
+    
     return status;
 }
 
 apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb, 
-                               apr_size_t *plen, int *peos)
+                               apr_off_t *plen, int *peos)
 {
     apr_status_t status = APR_SUCCESS;
 
+    H2_STREAM_OUT(APLOG_TRACE2, stream, "h2_stream read_to_pre");
     if (stream->rst_error) {
         return APR_ECONNRESET;
     }
     
     if (APR_BRIGADE_EMPTY(stream->bbout)) {
-        apr_size_t tlen = *plen;
+        apr_off_t tlen = *plen;
         int eos;
         status = h2_mplx_out_read_to(stream->session->mplx, stream->id, 
                                      stream->bbout, &tlen, &eos);
@@ -333,6 +548,7 @@ apr_status_t h2_stream_read_to(h2_stream
     if (status == APR_SUCCESS && !*peos && !*plen) {
         status = APR_EAGAIN;
     }
+    H2_STREAM_OUT(APLOG_TRACE2, stream, "h2_stream read_to_post");
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->c,
                   "h2_stream(%ld-%d): read_to, len=%ld eos=%d",
                   stream->session->id, stream->id, (long)*plen, *peos);
@@ -343,6 +559,9 @@ void h2_stream_set_suspended(h2_stream *
 {
     AP_DEBUG_ASSERT(stream);
     stream->suspended = !!suspended;
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
+                  "h2_stream(%ld-%d): suspended=%d",
+                  stream->session->id, stream->id, stream->suspended);
 }
 
 int h2_stream_is_suspended(h2_stream *stream)
@@ -351,3 +570,43 @@ int h2_stream_is_suspended(h2_stream *st
     return stream->suspended;
 }
 
+int h2_stream_input_is_open(h2_stream *stream) 
+{
+    return input_open(stream);
+}
+
+int h2_stream_needs_submit(h2_stream *stream)
+{
+    switch (stream->state) {
+        case H2_STREAM_ST_OPEN:
+        case H2_STREAM_ST_CLOSED_INPUT:
+        case H2_STREAM_ST_CLOSED_OUTPUT:
+        case H2_STREAM_ST_CLOSED:
+            return !stream->submitted;
+        default:
+            return 0;
+    }
+}
+
+apr_status_t h2_stream_submit_pushes(h2_stream *stream)
+{
+    apr_status_t status = APR_SUCCESS;
+    apr_array_header_t *pushes;
+    int i;
+    
+    pushes = h2_push_collect(stream->pool, stream->request, stream->response);
+    if (pushes && !apr_is_empty_array(pushes)) {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
+                      "h2_stream(%ld-%d): found %d push candidates",
+                      stream->session->id, stream->id, pushes->nelts);
+        for (i = 0; i < pushes->nelts; ++i) {
+            h2_push *push = APR_ARRAY_IDX(pushes, i, h2_push*);
+            h2_stream *s = h2_session_push(stream->session, stream, push);
+            if (!s) {
+                status = APR_ECONNRESET;
+                break;
+            }
+        }
+    }
+    return status;
+}

Modified: httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_stream.h
URL: http://svn.apache.org/viewvc/httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_stream.h?rev=1715218&r1=1715217&r2=1715218&view=diff
==============================================================================
--- httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_stream.h (original)
+++ httpd/httpd/branches/2.4-http2-alpha/modules/http2/h2_stream.h Thu Nov 19 17:14:03 2015
@@ -19,19 +19,14 @@
 /**
  * A HTTP/2 stream, e.g. a client request+response in HTTP/1.1 terms.
  * 
- * Ok, not quite, but close enough, since we do not implement server
- * pushes yet.
- *
  * A stream always belongs to a h2_session, the one managing the
  * connection to the client. The h2_session writes to the h2_stream,
  * adding HEADERS and DATA and finally an EOS. When headers are done,
- * h2_stream can create a h2_task that can be scheduled to fullfill the
- * request.
+ * h2_stream is scheduled for handling, which is expected to produce
+ * a h2_response.
  * 
- * This response headers are added directly to the h2_mplx of the session,
- * but the response DATA can be read via h2_stream. Reading data will
- * never block but return APR_EAGAIN when there currently is no data (and
- * no eos) in the multiplexer for this stream.
+ * The h2_response gives the HEADER frames to sent to the client, followed
+ * by DATA frames read from the h2_stream until EOS is reached.
  */
 #include "h2_io.h"
 
@@ -55,64 +50,244 @@ typedef struct h2_stream h2_stream;
 
 struct h2_stream {
     int id;                     /* http2 stream id */
+    int initiated_on;           /* http2 stream id this was initiated on or 0 */
     h2_stream_state_t state;    /* http/2 state of this stream */
     struct h2_session *session; /* the session this stream belongs to */
     
+    apr_pool_t *pool;           /* the memory pool for this stream */
+    struct h2_request *request; /* the request made in this stream */
+    struct h2_response *response; /* the response, once ready */
+    
     int aborted;                /* was aborted */
     int suspended;              /* DATA sending has been suspended */
     int rst_error;              /* stream error for RST_STREAM */
+    int scheduled;              /* stream has been scheduled */
+    int submitted;              /* response HEADER has been sent */
     
-    apr_pool_t *pool;           /* the memory pool for this stream */
-    struct h2_request *request; /* the request made in this stream */
-    
-    struct h2_response *response; /* the response, once ready */
+    apr_off_t input_remaining;  /* remaining bytes on input as advertised via content-length */
+    apr_bucket_brigade *bbin;   /* input DATA */
     
     apr_bucket_brigade *bbout;  /* output DATA */
-    apr_size_t data_frames_sent;/* # of DATA frames sent out for this stream */
+    apr_off_t data_frames_sent; /* # of DATA frames sent out for this stream */
 };
 
 
 #define H2_STREAM_RST(s, def)    (s->rst_error? s->rst_error : (def))
 
+/**
+ * Create a stream in IDLE state.
+ * @param id      the stream identifier
+ * @param pool    the memory pool to use for this stream
+ * @param session the session this stream belongs to
+ * @return the newly created IDLE stream
+ */
 h2_stream *h2_stream_create(int id, apr_pool_t *pool, struct h2_session *session);
 
+/**
+ * Create a stream in OPEN state.
+ * @param id      the stream identifier
+ * @param pool    the memory pool to use for this stream
+ * @param session the session this stream belongs to
+ * @return the newly opened stream
+ */
+h2_stream *h2_stream_open(int id, apr_pool_t *pool, struct h2_session *session);
+
+/**
+ * Destroy any resources held by this stream. Will destroy memory pool
+ * if still owned by the stream.
+ *
+ * @param stream the stream to destroy
+ */
 apr_status_t h2_stream_destroy(h2_stream *stream);
 
+/**
+ * Removes stream from h2_session and destroys it.
+ *
+ * @param stream the stream to cleanup
+ */
 void h2_stream_cleanup(h2_stream *stream);
 
-void h2_stream_rst(h2_stream *streamm, int error_code);
-
+/**
+ * Detach the memory pool from the stream. Will prevent stream
+ * destruction to take the pool with it.
+ *
+ * @param stream the stream to detach the pool from
+ * @param the detached memmory pool or NULL if stream no longer has one
+ */
 apr_pool_t *h2_stream_detach_pool(h2_stream *stream);
 
-apr_status_t h2_stream_rwrite(h2_stream *stream, request_rec *r);
 
-apr_status_t h2_stream_write_eos(h2_stream *stream);
+/**
+ * Initialize stream->request with the given request_rec.
+ * 
+ * @param stream stream to write request to
+ * @param r the request with all the meta data
+ */
+apr_status_t h2_stream_set_request(h2_stream *stream, request_rec *r);
 
-apr_status_t h2_stream_write_header(h2_stream *stream,
-                                    const char *name, size_t nlen,
-                                    const char *value, size_t vlen);
+/**
+ * Initialize stream->request with the given h2_request.
+ * 
+ * @param stream the stream to init the request for
+ * @param req the request for initializing, will be copied
+ */
+void h2_stream_set_h2_request(h2_stream *stream, int initiated_on,
+                              const struct h2_request *req);
 
-apr_status_t h2_stream_schedule(h2_stream *stream, int eos,
-                                h2_stream_pri_cmp *cmp, void *ctx);
+/*
+ * Add a HTTP/2 header (including pseudo headers) or trailer 
+ * to the given stream, depending on stream state.
+ *
+ * @param stream stream to write the header to
+ * @param name the name of the HTTP/2 header
+ * @param nlen the number of characters in name
+ * @param value the header value
+ * @param vlen the number of characters in value
+ */
+apr_status_t h2_stream_add_header(h2_stream *stream,
+                                  const char *name, size_t nlen,
+                                  const char *value, size_t vlen);
+
+/**
+ * Closes the stream's input.
+ *
+ * @param stream stream to close intput of
+ */
+apr_status_t h2_stream_close_input(h2_stream *stream);
 
+/*
+ * Write a chunk of DATA to the stream.
+ *
+ * @param stream stream to write the data to
+ * @param data the beginning of the bytes to write
+ * @param len the number of bytes to write
+ */
 apr_status_t h2_stream_write_data(h2_stream *stream,
                                   const char *data, size_t len);
 
+/**
+ * Reset the stream. Stream write/reads will return errors afterwards.
+ *
+ * @param stream the stream to reset
+ * @param error_code the HTTP/2 error code
+ */
+void h2_stream_rst(h2_stream *streamm, int error_code);
+
+/**
+ * Schedule the stream for execution. All header information must be
+ * present. Use the given priority comparision callback to determine 
+ * order in queued streams.
+ * 
+ * @param stream the stream to schedule
+ * @param eos    != 0 iff no more input will arrive
+ * @param cmp    priority comparision
+ * @param ctx    context for comparision
+ */
+apr_status_t h2_stream_schedule(h2_stream *stream, int eos,
+                                h2_stream_pri_cmp *cmp, void *ctx);
+
+/**
+ * Determine if stream has been scheduled already.
+ * @param stream the stream to check on
+ * @return != 0 iff stream has been scheduled
+ */
+int h2_stream_is_scheduled(h2_stream *stream);
+
+/**
+ * Set the response for this stream. Invoked when all meta data for
+ * the stream response has been collected.
+ * 
+ * @param stream the stream to set the response for
+ * @param resonse the response data for the stream
+ * @param bb bucket brigade with output data for the stream. Optional,
+ *        may be incomplete.
+ */
 apr_status_t h2_stream_set_response(h2_stream *stream, 
                                     struct h2_response *response,
                                     apr_bucket_brigade *bb);
 
+/**
+ * Do a speculative read on the stream output to determine the 
+ * amount of data that can be read.
+ * 
+ * @param stream the stream to speculatively read from
+ * @param plen (in-/out) number of bytes requested and on return amount of bytes that
+ *        may be read without blocking
+ * @param peos (out) != 0 iff end of stream will be reached when reading plen
+ *        bytes (out value).
+ * @return APR_SUCCESS if out information was computed successfully.
+ *         APR_EAGAIN if not data is available and end of stream has not been
+ *         reached yet.
+ */
 apr_status_t h2_stream_prep_read(h2_stream *stream, 
-                                 apr_size_t *plen, int *peos);
+                                 apr_off_t *plen, int *peos);
 
+/**
+ * Read data from the stream output.
+ * 
+ * @param stream the stream to read from
+ * @param cb callback to invoke for byte chunks read. Might be invoked
+ *        multiple times (with different values) for one read operation.
+ * @param ctx context data for callback
+ * @param plen (in-/out) max. number of bytes to read and on return actual
+ *        number of bytes read
+ * @param peos (out) != 0 iff end of stream has been reached while reading
+ * @return APR_SUCCESS if out information was computed successfully.
+ *         APR_EAGAIN if not data is available and end of stream has not been
+ *         reached yet.
+ */
 apr_status_t h2_stream_readx(h2_stream *stream, h2_io_data_cb *cb, 
-                             void *ctx, apr_size_t *plen, int *peos);
+                             void *ctx, apr_off_t *plen, int *peos);
 
+/**
+ * Read a maximum number of bytes into the bucket brigade.
+ * 
+ * @param stream the stream to read from
+ * @param bb the brigade to append output to
+ * @param plen (in-/out) max. number of bytes to append and on return actual
+ *        number of bytes appended to brigade
+ * @param peos (out) != 0 iff end of stream has been reached while reading
+ * @return APR_SUCCESS if out information was computed successfully.
+ *         APR_EAGAIN if not data is available and end of stream has not been
+ *         reached yet.
+ */
 apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb, 
-                               apr_size_t *plen, int *peos);
-
+                               apr_off_t *plen, int *peos);
 
+/**
+ * Set the suspended state of the stream.
+ * @param stream the stream to change state on
+ * @param suspended boolean value if stream is suspended
+ */
 void h2_stream_set_suspended(h2_stream *stream, int suspended);
+
+/**
+ * Check if the stream has been suspended.
+ * @param stream the stream to check
+ * @return != 0 iff stream is suspended.
+ */
 int h2_stream_is_suspended(h2_stream *stream);
 
+/**
+ * Check if the stream has open input.
+ * @param stream the stream to check
+ * @return != 0 iff stream has open input.
+ */
+int h2_stream_input_is_open(h2_stream *stream);
+
+/**
+ * Check if the stream has not submitted a response or RST yet.
+ * @param stream the stream to check
+ * @return != 0 iff stream has not submitted a response or RST.
+ */
+int h2_stream_needs_submit(h2_stream *stream);
+
+/**
+ * Submit any server push promises on this stream and schedule
+ * the tasks connection with these.
+ *
+ * @param stream the stream for which to submit
+ */
+apr_status_t h2_stream_submit_pushes(h2_stream *stream);
+
 #endif /* defined(__mod_h2__h2_stream__) */