You are viewing a plain text version of this content. The canonical link for it is here.
Posted to cvs@httpd.apache.org by ic...@apache.org on 2023/06/20 12:01:09 UTC

svn commit: r1910507 [2/2] - in /httpd/httpd/trunk: ./ .github/workflows/ changes-entries/ docs/manual/mod/ include/ modules/http2/ modules/proxy/ server/ test/ test/clients/ test/modules/http2/ test/pyhttpd/

Added: httpd/httpd/trunk/test/clients/h2ws.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/test/clients/h2ws.c?rev=1910507&view=auto
==============================================================================
--- httpd/httpd/trunk/test/clients/h2ws.c (added)
+++ httpd/httpd/trunk/test/clients/h2ws.c Tue Jun 20 12:01:09 2023
@@ -0,0 +1,1096 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <apr.h>
+
+#include <assert.h>
+#include <inttypes.h>
+#include <stdlib.h>
+#ifdef APR_HAVE_UNISTD_H
+#  include <unistd.h>
+#endif /* HAVE_UNISTD_H */
+#ifdef APR_HAVE_FCNTL_H
+#  include <fcntl.h>
+#endif /* HAVE_FCNTL_H */
+#include <sys/types.h>
+#include <sys/time.h>
+#ifdef APR_HAVE_SYS_SOCKET_H
+#  include <sys/socket.h>
+#endif /* HAVE_SYS_SOCKET_H */
+#ifdef APR_HAVE_NETDB_H
+#  include <netdb.h>
+#endif /* HAVE_NETDB_H */
+#ifdef APR_HAVE_NETINET_IN_H
+#  include <netinet/in.h>
+#endif /* HAVE_NETINET_IN_H */
+#include <netinet/tcp.h>
+#include <poll.h>
+#include <signal.h>
+#include <stdio.h>
+#include <string.h>
+#include <time.h>
+#include <errno.h>
+
+#include <nghttp2/nghttp2.h>
+
+#define MAKE_NV(NAME, VALUE)                                                   \
+  {                                                                            \
+    (uint8_t *)NAME, (uint8_t *)VALUE, sizeof(NAME) - 1, sizeof(VALUE) - 1,    \
+        NGHTTP2_NV_FLAG_NONE                                                   \
+  }
+
+#define MAKE_NV_CS(NAME, VALUE)                                                \
+  {                                                                            \
+    (uint8_t *)NAME, (uint8_t *)VALUE, sizeof(NAME) - 1, strlen(VALUE),        \
+        NGHTTP2_NV_FLAG_NONE                                                   \
+  }
+
+
+static int verbose;
+static const char *cmd;
+
+static void log_out(const char *level, const char *where, const char *msg)
+{
+    struct timespec tp;
+    struct tm tm;
+    char timebuf[128];
+
+    clock_gettime(CLOCK_REALTIME, &tp);
+    localtime_r(&tp.tv_sec, &tm);
+    strftime(timebuf, sizeof(timebuf)-1, "%H:%M:%S", &tm);
+    fprintf(stderr, "[%s.%09lu][%s][%s] %s\n", timebuf, tp.tv_nsec, level, where, msg);
+}
+
+static void log_err(const char *where, const char *msg)
+{
+    log_out("ERROR", where, msg);
+}
+
+static void log_info(const char *where, const char *msg)
+{
+    if (verbose)
+        log_out("INFO", where, msg);
+}
+
+static void log_debug(const char *where, const char *msg)
+{
+    if (verbose > 1)
+        log_out("DEBUG", where, msg);
+}
+
+#if defined(__GNUC__)
+    __attribute__((format(printf, 2, 3)))
+#endif
+static void log_errf(const char *where, const char *msg, ...)
+{
+    char buffer[8*1024];
+    va_list ap;
+
+    va_start(ap, msg);
+    vsnprintf(buffer, sizeof(buffer), msg, ap);
+    va_end(ap);
+    log_err(where, buffer);
+}
+
+#if defined(__GNUC__)
+    __attribute__((format(printf, 2, 3)))
+#endif
+static void log_infof(const char *where, const char *msg, ...)
+{
+    if (verbose) {
+        char buffer[8*1024];
+        va_list ap;
+
+        va_start(ap, msg);
+        vsnprintf(buffer, sizeof(buffer), msg, ap);
+        va_end(ap);
+        log_info(where, buffer);
+    }
+}
+
+#if defined(__GNUC__)
+    __attribute__((format(printf, 2, 3)))
+#endif
+static void log_debugf(const char *where, const char *msg, ...)
+{
+    if (verbose > 1) {
+        char buffer[8*1024];
+        va_list ap;
+
+        va_start(ap, msg);
+        vsnprintf(buffer, sizeof(buffer), msg, ap);
+        va_end(ap);
+        log_debug(where, buffer);
+    }
+}
+
+static int parse_host_port(const char **phost, uint16_t *pport,
+                           int *pipv6, size_t *pconsumed,
+                           const char *s, size_t len, uint16_t def_port)
+{
+    size_t i, offset;
+    char *host = NULL;
+    int port = 0;
+    int rv = 1, ipv6 = 0;
+
+    if (!len)
+        goto leave;
+    offset = 0;
+    if (s[offset] == '[') {
+        ipv6 = 1;
+        for (i = offset++; i < len; ++i) {
+            if (s[i] == ']')
+              break;
+        }
+        if (i >= len || i == offset)
+            goto leave;
+        host = strndup(s + offset, i - offset);
+        offset = i + 1;
+    }
+    else {
+        for (i = offset; i < len; ++i) {
+            if (strchr(":/?#", s[i]))
+              break;
+        }
+        if (i == offset) {
+            log_debugf("parse_uri", "empty host name in '%.*s", (int)len, s);
+            goto leave;
+        }
+        host = strndup(s + offset, i - offset);
+        offset = i;
+    }
+    if (offset < len && s[offset] == ':') {
+        port = 0;
+        ++offset;
+        for (i = offset; i < len; ++i) {
+            if (strchr("/?#", s[i]))
+                break;
+            if (s[i] < '0' || s[i] > '9') {
+                log_debugf("parse_uri", "invalid port char '%c'", s[i]);
+                goto leave;
+            }
+            port *= 10;
+            port += s[i] - '0';
+            if (port > 65535) {
+                log_debugf("parse_uri", "invalid port number '%d'", port);
+                goto leave;
+            }
+        }
+        offset = i;
+    }
+    rv = 0;
+
+leave:
+    *phost = rv? NULL : host;
+    *pport = rv? 0 : (port? (uint16_t)port : def_port);
+    if (pipv6)
+      *pipv6 = ipv6;
+    if (pconsumed)
+      *pconsumed = offset;
+    return rv;
+}
+
+struct uri {
+  const char *scheme;
+  const char *host;
+  const char *authority;
+  const char *path;
+  uint16_t port;
+  int ipv6;
+};
+
+static int parse_uri(struct uri *uri, const char *s, size_t len)
+{
+    char tmp[8192];
+    size_t n, offset = 0;
+    uint16_t def_port = 0;
+    int rv = 1;
+
+    /* NOT A REAL URI PARSER */
+    memset(uri, 0, sizeof(*uri));
+    if (len > 5 && !memcmp("ws://", s, 5)) {
+        uri->scheme = "ws";
+        def_port = 80;
+        offset = 5;
+    }
+    else if (len > 6 && !memcmp("wss://", s, 6)) {
+        uri->scheme = "wss";
+        def_port = 443;
+        offset = 6;
+    }
+    else {
+        /* not a scheme we process */
+        goto leave;
+    }
+
+    if (parse_host_port(&uri->host, &uri->port, &uri->ipv6, &n, s + offset,
+                        len - offset, def_port))
+        goto leave;
+    offset += n;
+
+    if (uri->port == def_port)
+      uri->authority = uri->host;
+    else if (uri->ipv6) {
+      snprintf(tmp, sizeof(tmp), "[%s]:%u", uri->host, uri->port);
+      uri->authority = strdup(tmp);
+    }
+    else {
+      snprintf(tmp, sizeof(tmp), "%s:%u", uri->host, uri->port);
+      uri->authority = strdup(tmp);
+    }
+
+    if (offset < len) {
+        uri->path = strndup(s + offset, len - offset);
+    }
+    rv = 0;
+
+leave:
+    return rv;
+}
+
+static int sock_nonblock_nodelay(int fd) {
+  int flags, rv;
+  int val = 1;
+
+  while ((flags = fcntl(fd, F_GETFL, 0)) == -1 && errno == EINTR)
+      ;
+  if (flags == -1) {
+      log_errf("sock_nonblock_nodelay", "fcntl get error %d (%s)",
+               errno, strerror(errno));
+      return -1;
+  }
+  while ((rv = fcntl(fd, F_SETFL, flags | O_NONBLOCK)) == -1 && errno == EINTR)
+    ;
+  if (rv == -1) {
+      log_errf("sock_nonblock_nodelay", "fcntl set error %d (%s)",
+               errno, strerror(errno));
+      return -1;
+  }
+  rv = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, (socklen_t)sizeof(val));
+  if (rv == -1) {
+      log_errf("sock_nonblock_nodelay", "set nodelay error %d (%s)",
+               errno, strerror(errno));
+      return -1;
+  }
+  return 0;
+}
+
+static int open_connection(const char *host, uint16_t port)
+{
+    char service[NI_MAXSERV];
+    struct addrinfo hints;
+    struct addrinfo *res = NULL, *rp;
+    int rv, fd = -1;
+
+    memset(&hints, 0, sizeof(hints));
+    snprintf(service, sizeof(service), "%u", port);
+    hints.ai_family = AF_UNSPEC;
+    hints.ai_socktype = SOCK_STREAM;
+    rv = getaddrinfo(host, service, &hints, &res);
+    if (rv) {
+      log_err("getaddrinfo", gai_strerror(rv));
+      goto leave;
+    }
+
+    for (rp = res; rp; rp = rp->ai_next) {
+      fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
+      if (fd == -1) {
+        continue;
+      }
+      while ((rv = connect(fd, rp->ai_addr, rp->ai_addrlen)) == -1 &&
+             errno == EINTR)
+        ;
+      if (!rv) /* connected */
+          break;
+      close(fd);
+      fd = -1;
+    }
+
+leave:
+    if (res)
+      freeaddrinfo(res);
+    return fd;
+}
+
+struct h2_stream;
+
+#define IO_WANT_NONE   0
+#define IO_WANT_READ   1
+#define IO_WANT_WRITE  2
+
+struct h2_session {
+    const char *server_name;
+    const char *connect_host;
+    uint16_t connect_port;
+    int fd;
+    nghttp2_session *ngh2;
+    struct h2_stream *streams;
+    int aborted;
+    int want_io;
+};
+
+typedef void h2_stream_closed_cb(struct h2_stream *stream);
+typedef void h2_stream_recv_data(struct h2_stream *stream,
+                                 const uint8_t *data, size_t len);
+
+struct h2_stream {
+    struct h2_stream *next;
+    struct uri *uri;
+    int32_t id;
+    int fdin;
+    int http_status;
+    uint32_t error_code;
+    unsigned input_closed : 1;
+    unsigned closed : 1;
+    unsigned reset : 1;
+    h2_stream_closed_cb *on_close;
+    h2_stream_recv_data *on_recv_data;
+};
+
+static void h2_session_stream_add(struct h2_session *session,
+                                  struct h2_stream *stream)
+{
+    struct h2_stream *s;
+    for (s = session->streams; s; s = s->next) {
+        if (s == stream)  /* already there? */
+            return;
+    }
+    stream->next = session->streams;
+    session->streams = stream;
+}
+
+static void h2_session_stream_remove(struct h2_session *session,
+                                     struct h2_stream *stream)
+{
+    struct h2_stream *s, **pnext;
+    pnext = &session->streams;
+    s = session->streams;
+    while (s) {
+        if (s == stream) {
+            *pnext = s->next;
+            s->next = NULL;
+            break;
+        }
+        pnext = &s->next;
+        s = s->next;
+    }
+}
+
+static struct h2_stream *h2_session_stream_get(struct h2_session *session,
+                                               int32_t id)
+{
+    struct h2_stream *s;
+    for (s = session->streams; s; s = s->next) {
+        if (s->id == id)
+            return s;
+    }
+    return NULL;
+}
+
+static ssize_t h2_session_send(nghttp2_session *ngh2, const uint8_t *data,
+                               size_t length, int flags, void *user_data)
+{
+    struct h2_session *session = user_data;
+    ssize_t nwritten;
+    (void)ngh2;
+    (void)flags;
+
+    session->want_io = IO_WANT_NONE;
+    nwritten = send(session->fd, data, length, 0);
+    if (nwritten < 0) {
+      int err = errno;
+      if ((EWOULDBLOCK == err) || (EAGAIN == err) ||
+          (EINTR == err) || (EINPROGRESS == err)) {
+          return NGHTTP2_ERR_WOULDBLOCK;
+      }
+      log_errf("h2_session_send", "error sending %ld bytes: %d (%s)",
+               (long)length, err, strerror(err));
+      return NGHTTP2_ERR_CALLBACK_FAILURE;
+    }
+    return nwritten;
+}
+
+static ssize_t h2_session_recv(nghttp2_session *ngh2, uint8_t *buf,
+                               size_t length, int flags, void *user_data)
+{
+    struct h2_session *session = user_data;
+    ssize_t nread;
+    (void)ngh2;
+    (void)flags;
+
+    session->want_io = IO_WANT_NONE;
+    nread = recv(session->fd, buf, length, 0);
+    if (nread < 0) {
+      int err = errno;
+      if ((EWOULDBLOCK == err) || (EAGAIN == err) || (EINTR == err)) {
+          return NGHTTP2_ERR_WOULDBLOCK;
+      }
+      log_errf("h2_session_recv", "error reading %ld bytes: %d (%s)",
+               (long)length, err, strerror(err));
+      return NGHTTP2_ERR_CALLBACK_FAILURE;
+    }
+    return nread;
+}
+
+static int h2_session_on_frame_send(nghttp2_session *session,
+                                    const nghttp2_frame *frame,
+                                    void *user_data)
+{
+    size_t i;
+    (void)user_data;
+
+    switch (frame->hd.type) {
+    case NGHTTP2_HEADERS:
+      if (nghttp2_session_get_stream_user_data(session, frame->hd.stream_id)) {
+        const nghttp2_nv *nva = frame->headers.nva;
+        log_infof("frame send", "FRAME[HEADERS, stream=%d",
+                  frame->hd.stream_id);
+        for (i = 0; i < frame->headers.nvlen; ++i) {
+            log_infof("frame send", "  %.*s: %.*s",
+                      (int)nva[i].namelen, nva[i].name,
+                      (int)nva[i].valuelen, nva[i].value);
+        }
+        log_infof("frame send", "]");
+      }
+      break;
+    case NGHTTP2_DATA:
+        log_infof("frame send", "FRAME[DATA, stream=%d, length=%d, flags=%d]",
+                  frame->hd.stream_id, (int)frame->hd.length,
+                  (int)frame->hd.flags);
+        break;
+    case NGHTTP2_RST_STREAM:
+        log_infof("frame send", "FRAME[RST, stream=%d]",
+                  frame->hd.stream_id);
+        break;
+    case NGHTTP2_WINDOW_UPDATE:
+        log_infof("frame send", "FRAME[WINDOW_UPDATE, stream=%d]",
+                  frame->hd.stream_id);
+        break;
+    case NGHTTP2_GOAWAY:
+        log_infof("frame send", "FRAME[GOAWAY]");
+        break;
+    }
+    return 0;
+}
+
+static int h2_session_on_frame_recv(nghttp2_session *ngh2,
+                                    const nghttp2_frame *frame,
+                                    void *user_data)
+{
+    (void)user_data;
+
+    switch (frame->hd.type) {
+    case NGHTTP2_HEADERS:
+        if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE) {
+          log_infof("frame recv", "FRAME[HEADERS, stream=%d]",
+                    frame->hd.stream_id);
+        }
+        break;
+    case NGHTTP2_DATA:
+        log_infof("frame recv", "FRAME[DATA, stream=%d, len=%lu, eof=%d]",
+                  frame->hd.stream_id, frame->hd.length,
+                  (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) != 0);
+        break;
+    case NGHTTP2_RST_STREAM:
+        log_infof("frame recv", "FRAME[RST, stream=%d]",
+                  frame->hd.stream_id);
+        fprintf(stdout, "[%d] RST\n", frame->hd.stream_id);
+        break;
+    case NGHTTP2_GOAWAY:
+        log_infof("frame recv", "FRAME[GOAWAY]");
+        break;
+    }
+    return 0;
+}
+
+static int h2_session_on_header(nghttp2_session *ngh2,
+                                const nghttp2_frame *frame,
+                                const uint8_t *name, size_t namelen,
+                                const uint8_t *value, size_t valuelen,
+                                uint8_t flags, void *user_data)
+{
+    struct h2_session *session = user_data;
+    struct h2_stream *stream;
+    (void)flags;
+    (void)user_data;
+    log_infof("frame recv", "stream=%d, HEADER   %.*s: %.*s",
+              frame->hd.stream_id, (int)namelen, name,
+              (int)valuelen, value);
+    stream = h2_session_stream_get(session, frame->hd.stream_id);
+    if (stream) {
+        if (namelen == 7 && !strncmp(":status", (const char *)name, namelen)) {
+            stream->http_status = 0;
+            if (valuelen < 10) {
+                char tmp[10], *endp;
+                memcpy(tmp, value, valuelen);
+                tmp[valuelen] = 0;
+                stream->http_status = (int)strtol(tmp, &endp, 10);
+            }
+            if (stream->http_status < 100 || stream->http_status >= 600) {
+                log_errf("on header recv", "stream=%d, invalid :status: %.*s",
+                          frame->hd.stream_id, (int)valuelen, value);
+                return NGHTTP2_ERR_CALLBACK_FAILURE;
+            }
+            else {
+                fprintf(stdout, "[%d] :status: %d\n", stream->id,
+                        stream->http_status);
+            }
+        }
+    }
+    return 0;
+}
+
+static int h2_session_on_stream_close(nghttp2_session *ngh2, int32_t stream_id,
+                                      uint32_t error_code, void *user_data)
+{
+    struct h2_session *session = user_data;
+    struct h2_stream *stream;
+
+    stream = h2_session_stream_get(session, stream_id);
+    if (stream) {
+        /* closed known stream */
+        stream->error_code = error_code;
+        stream->closed = 1;
+        if (error_code)
+            stream->reset = 1;
+        if (error_code) {
+            log_errf("stream close", "stream %d closed with error %d",
+                     stream_id, error_code);
+        }
+
+        h2_session_stream_remove(session, stream);
+        if (stream->on_close)
+            stream->on_close(stream);
+        /* last one? */
+        if (!session->streams) {
+            int rv;
+            rv = nghttp2_session_terminate_session(ngh2, NGHTTP2_NO_ERROR);
+            if (rv) {
+                log_errf("terminate session", "error %d (%s)",
+                         rv, nghttp2_strerror(rv));
+                session->aborted = 1;
+            }
+        }
+    }
+    return 0;
+}
+
+static int h2_session_on_data_chunk_recv(nghttp2_session *ngh2, uint8_t flags,
+                                         int32_t stream_id, const uint8_t *data,
+                                         size_t len, void *user_data) {
+    struct h2_session *session = user_data;
+    struct h2_stream *stream;
+
+    stream = h2_session_stream_get(session, stream_id);
+    if (stream && stream->on_recv_data) {
+        stream->on_recv_data(stream, data, len);
+    }
+    return 0;
+}
+
+static int h2_session_open(struct h2_session *session, const char *server_name,
+                           const char *host, uint16_t port)
+{
+    nghttp2_session_callbacks *cbs = NULL;
+    int rv = -1;
+
+    memset(session, 0, sizeof(*session));
+    session->server_name = server_name;
+    session->connect_host = host;
+    session->connect_port = port;
+    /* establish socket */
+    session->fd = open_connection(session->connect_host, session->connect_port);
+    if (session->fd < 0) {
+      log_errf(cmd, "could not connect to %s:%u",
+               session->connect_host, session->connect_port);
+      goto leave;
+    }
+    if (sock_nonblock_nodelay(session->fd))
+        goto leave;
+    session->want_io = IO_WANT_NONE;
+
+    log_infof(cmd, "connected to %s via %s:%u", session->server_name,
+              session->connect_host, session->connect_port);
+
+    rv = nghttp2_session_callbacks_new(&cbs);
+    if (rv) {
+        log_errf("setup callbacks", "error_code=%d, msg=%s\n", rv,
+                 nghttp2_strerror(rv));
+        rv = -1;
+        goto leave;
+    }
+    /* setup session callbacks */
+    nghttp2_session_callbacks_set_send_callback(cbs, h2_session_send);
+    nghttp2_session_callbacks_set_recv_callback(cbs, h2_session_recv);
+    nghttp2_session_callbacks_set_on_frame_send_callback(
+        cbs, h2_session_on_frame_send);
+    nghttp2_session_callbacks_set_on_frame_recv_callback(
+        cbs, h2_session_on_frame_recv);
+    nghttp2_session_callbacks_set_on_header_callback(
+        cbs, h2_session_on_header);
+    nghttp2_session_callbacks_set_on_stream_close_callback(
+        cbs, h2_session_on_stream_close);
+    nghttp2_session_callbacks_set_on_data_chunk_recv_callback(
+        cbs, h2_session_on_data_chunk_recv);
+    /* create the ngh2 session */
+    rv = nghttp2_session_client_new(&session->ngh2, cbs, session);
+    if (rv) {
+        log_errf("client new", "error_code=%d, msg=%s\n", rv,
+                 nghttp2_strerror(rv));
+        rv = -1;
+        goto leave;
+    }
+    /* submit initial settings */
+    rv = nghttp2_submit_settings(session->ngh2, NGHTTP2_FLAG_NONE, NULL, 0);
+    if (rv) {
+        log_errf("submit settings", "error_code=%d, msg=%s\n", rv,
+                 nghttp2_strerror(rv));
+        rv = -1;
+        goto leave;
+    }
+    rv = 0;
+
+leave:
+    if (cbs)
+        nghttp2_session_callbacks_del(cbs);
+    return rv;
+}
+
+static int h2_session_io(struct h2_session *session) {
+    int rv;
+    rv = nghttp2_session_recv(session->ngh2);
+    if (rv) {
+        log_errf("session recv", "error_code=%d, msg=%s\n", rv,
+                 nghttp2_strerror(rv));
+        return 1;
+    }
+    rv = nghttp2_session_send(session->ngh2);
+    if (rv) {
+        log_errf("session send", "error_code=%d, msg=%s\n", rv,
+                 nghttp2_strerror(rv));
+    }
+    return 0;
+}
+
+struct h2_poll_ctx;
+typedef int h2_poll_ev_cb(struct h2_poll_ctx *pctx, struct pollfd *pfd);
+
+struct h2_poll_ctx {
+    struct h2_session *session;
+    struct h2_stream *stream;
+    h2_poll_ev_cb *on_ev;
+};
+
+static int h2_session_ev(struct h2_poll_ctx *pctx, struct pollfd *pfd)
+{
+    if (pfd->revents & (POLLIN | POLLOUT)) {
+        h2_session_io(pctx->session);
+    }
+    else if (pfd->revents & POLLHUP) {
+        log_errf("session run", "connection closed");
+        return -1;
+    }
+    else if (pfd->revents & POLLERR) {
+        log_errf("session run", "connection error");
+        return -1;
+    }
+    return 0;
+}
+
+static int h2_stream_ev(struct h2_poll_ctx *pctx, struct pollfd *pfd)
+{
+    if (pfd->revents & (POLLIN | POLLHUP)) {
+        nghttp2_session_resume_data(pctx->session->ngh2, pctx->stream->id);
+    }
+    else if (pfd->revents & (POLLERR)) {
+        nghttp2_submit_rst_stream(pctx->session->ngh2, NGHTTP2_FLAG_NONE,
+                                  pctx->stream->id, NGHTTP2_STREAM_CLOSED);
+    }
+    return 0;
+}
+
+static nfds_t h2_session_set_poll(struct h2_session *session,
+                                  struct h2_poll_ctx *pollctxs,
+                                  struct pollfd *pfds)
+{
+    nfds_t n = 0;
+    int want_read, want_write;
+    struct h2_stream *stream;
+
+    want_read = (nghttp2_session_want_read(session->ngh2) ||
+                 session->want_io == IO_WANT_READ);
+    want_write = (nghttp2_session_want_write(session->ngh2) ||
+                  session->want_io == IO_WANT_WRITE);
+    if (want_read || want_write) {
+        pollctxs[n].session = session;
+        pollctxs[n].stream = NULL;
+        pollctxs[n].on_ev = h2_session_ev;
+        pfds[n].fd = session->fd;
+        pfds[n].events = pfds[n].revents = 0;
+        if (want_read)
+            pfds[n].events |= (POLLIN | POLLHUP);
+        if (want_write)
+            pfds[n].events |= (POLLOUT | POLLERR);
+        ++n;
+    }
+
+    for (stream = session->streams; stream; stream = stream->next) {
+        if (stream->fdin >= 0 && !stream->input_closed && !stream->closed) {
+            pollctxs[n].session = session;
+            pollctxs[n].stream = stream;
+            pollctxs[n].on_ev = h2_stream_ev;
+            pfds[n].fd = stream->fdin;
+            pfds[n].revents = 0;
+            pfds[n].events = (POLLIN | POLLHUP);
+            ++n;
+        }
+    }
+    return n;
+}
+
+static void h2_session_run(struct h2_session *session)
+{
+  struct h2_poll_ctx pollctxs[5];
+  struct pollfd pfds[5];
+  nfds_t npollfds, i;
+
+  npollfds  = h2_session_set_poll(session, pollctxs, pfds);
+  while (npollfds) {
+    if (poll(pfds, npollfds, -1) == -1) {
+        log_errf("session run", "poll error %d (%s)", errno, strerror(errno));
+        break;
+    }
+    for (i = 0; i < npollfds; ++i) {
+        if (pfds[i].revents) {
+            if (pollctxs[i].on_ev(&pollctxs[i], &pfds[i])) {
+                break;
+            }
+        }
+    }
+    npollfds = h2_session_set_poll(session, pollctxs, pfds);
+    if (!session->streams)
+        break;
+  }
+}
+
+static void h2_session_close(struct h2_session *session)
+{
+    log_infof(cmd, "closed session to %s:%u",
+              session->connect_host, session->connect_port);
+}
+
+/* websocket stream */
+
+struct ws_stream {
+  struct h2_stream s;
+};
+
+static void ws_stream_on_close(struct h2_stream *stream)
+{
+    log_infof("ws stream", "stream %d closed", stream->id);
+    if (!stream->reset)
+        fprintf(stdout, "[%d] EOF\n", stream->id);
+}
+
+static void ws_stream_on_recv_data(struct h2_stream *stream,
+                            const uint8_t *data, size_t len)
+{
+    size_t i;
+
+    log_infof("ws stream", "stream %d recv %lu data bytes",
+              stream->id, (unsigned long)len);
+    for (i = 0; i < len; ++i) {
+        fprintf(stdout, "%s%02x", (i&0xf)? " " : (i? "\n" : ""), data[i]);
+    }
+    fprintf(stdout, "\n");
+}
+
+static int ws_stream_create(struct ws_stream **pstream, struct uri *uri)
+{
+    struct ws_stream *stream;
+
+    stream = calloc(1, sizeof(*stream));
+    if (!stream) {
+        log_errf("ws stream create", "out of memory");
+        *pstream = NULL;
+        return -1;
+    }
+    stream->s.uri = uri;
+    stream->s.id = -1;
+    stream->s.on_close = ws_stream_on_close;
+    stream->s.on_recv_data = ws_stream_on_recv_data;
+    *pstream = stream;
+    return 0;
+}
+
+static ssize_t ws_stream_read_req_body(nghttp2_session *ngh2,
+                                       int32_t stream_id,
+                                       uint8_t *buf, size_t buflen,
+                                       uint32_t *pflags,
+                                       nghttp2_data_source *source,
+                                       void *user_data)
+{
+    struct h2_session *session = user_data;
+    struct ws_stream *stream;
+    ssize_t nread = 0;
+    int eof = 0;
+
+    stream = (struct ws_stream *)h2_session_stream_get(session, stream_id);
+    if (!stream) {
+         log_errf("stream req body", "stream not known");
+        return NGHTTP2_ERR_CALLBACK_FAILURE;
+    }
+
+    (void)source;
+    assert(stream->s.fdin >= 0);
+    nread = read(stream->s.fdin, buf, buflen);
+    log_debugf("stream req body", "fread(len=%lu) -> %ld",
+               (unsigned long)buflen, (long)nread);
+
+    if (nread < 0) {
+        if (errno == EAGAIN) {
+            nread = 0;
+        }
+        else {
+            log_errf("stream req body", "error on input");
+            return NGHTTP2_ERR_CALLBACK_FAILURE;
+        }
+    }
+    else if (nread == 0) {
+      eof = 1;
+      stream->s.input_closed = 1;
+    }
+
+    *pflags = stream->s.input_closed? NGHTTP2_DATA_FLAG_EOF : 0;
+    if (nread == 0 && !eof) {
+      return NGHTTP2_ERR_DEFERRED;
+    }
+    return nread;
+}
+
+static int ws_stream_submit(struct ws_stream *stream,
+                            struct h2_session *session,
+                            const nghttp2_nv *nva, size_t nvalen,
+                            int fdin)
+{
+    nghttp2_data_provider provider, *req_body = NULL;
+
+    if (fdin >= 0) {
+        sock_nonblock_nodelay(fdin);
+        stream->s.fdin = fdin;
+        provider.read_callback = ws_stream_read_req_body;
+        provider.source.ptr = NULL;
+        req_body = &provider;
+    }
+    else {
+        stream->s.input_closed = 1;
+    }
+
+    stream->s.id = nghttp2_submit_request(session->ngh2, NULL, nva, nvalen,
+                                          req_body, stream);
+    if (stream->s.id < 0) {
+        log_errf("ws stream submit", "nghttp2_submit_request: error %d",
+                 stream->s.id);
+        return -1;
+    }
+
+    h2_session_stream_add(session, &stream->s);
+    log_infof("ws stream submit", "stream %d opened for %s%s",
+              stream->s.id, stream->s.uri->authority, stream->s.uri->path);
+    return 0;
+}
+
+static void usage(const char *msg)
+{
+    if(msg)
+        fprintf(stderr, "%s\n", msg);
+    fprintf(stderr,
+        "usage: [options] ws-uri scenario\n"
+        "  run a websocket scenario to the ws-uri, options:\n"
+        "  -c host:port connect to host:port\n"
+        "  -v         increase verbosity\n"
+        "scenarios are:\n"
+        "  * fail-proto: CONNECT using wrong :protocol\n"
+        "  * miss-authority: CONNECT without :authority header\n"
+        "  * miss-path: CONNECT without :path header\n"
+        "  * miss-scheme: CONNECT without :scheme header\n"
+        "  * miss-version: CONNECT without sec-webSocket-version header\n"
+        "  * ws-empty: open valid websocket, do not send anything\n"
+    );
+}
+
+int main(int argc, char *argv[])
+{
+    const char *host = NULL, *scenario;
+    uint16_t port = 80;
+    struct uri uri;
+    struct h2_session session;
+    struct ws_stream *stream;
+    char ch;
+
+    cmd = argv[0];
+    while((ch = getopt(argc, argv, "c:vh")) != -1) {
+        switch(ch) {
+        case 'c':
+            if (parse_host_port(&host, &port, NULL, NULL,
+                                optarg, strlen(optarg), 80)) {
+                log_errf(cmd, "could not parse connect '%s'", optarg);
+                return 1;
+            }
+            break;
+        case 'h':
+            usage(NULL);
+            return 2;
+            break;
+        case 'v':
+            ++verbose;
+            break;
+        default:
+           usage("invalid option");
+           return 1;
+        }
+    }
+    argc -= optind;
+    argv += optind;
+
+    if (argc < 1) {
+        usage("need URL");
+        return 1;
+    }
+    if (argc < 2) {
+        usage("need scenario");
+        return 1;
+    }
+    if (parse_uri(&uri, argv[0], strlen(argv[0]))) {
+        log_errf(cmd, "could not parse uri '%s'", argv[0]);
+        return 1;
+    }
+    log_debugf(cmd, "normalized uri: %s://%s:%u%s", uri.scheme, uri.host,
+               uri.port, uri.path? uri.path : "");
+    scenario = argv[1];
+
+    if (!host) {
+        host = uri.host;
+        port = uri.port;
+    }
+
+    if (h2_session_open(&session, uri.host, host, port))
+        return 1;
+
+    if (ws_stream_create(&stream, &uri))
+        return 1;
+
+    if (!strcmp(scenario, "ws-stdin")) {
+        const nghttp2_nv nva[] = {
+            MAKE_NV(":method", "CONNECT"),
+            MAKE_NV_CS(":path", stream->s.uri->path),
+            MAKE_NV_CS(":scheme", "http"),
+            MAKE_NV_CS(":authority", stream->s.uri->authority),
+            MAKE_NV_CS(":protocol", "websocket"),
+            MAKE_NV("accept", "*/*"),
+            MAKE_NV("user-agent", "mod_h2/h2ws-test"),
+            MAKE_NV("sec-webSocket-version", "13"),
+            MAKE_NV("sec-webSocket-protocol", "chat"),
+        };
+        if (ws_stream_submit(stream, &session,
+                             nva, sizeof(nva) / sizeof(nva[0]), 0))
+            return 1;
+    }
+    else if (!strcmp(scenario, "fail-proto")) {
+        const nghttp2_nv nva[] = {
+            MAKE_NV(":method", "CONNECT"),
+            MAKE_NV_CS(":path", stream->s.uri->path),
+            MAKE_NV_CS(":scheme", "http"),
+            MAKE_NV_CS(":authority", stream->s.uri->authority),
+            MAKE_NV_CS(":protocol", "websockets"),
+            MAKE_NV("accept", "*/*"),
+            MAKE_NV("user-agent", "mod_h2/h2ws-test"),
+            MAKE_NV("sec-webSocket-version", "13"),
+            MAKE_NV("sec-webSocket-protocol", "chat"),
+        };
+        if (ws_stream_submit(stream, &session,
+                             nva, sizeof(nva) / sizeof(nva[0]), -1))
+            return 1;
+    }
+    else if (!strcmp(scenario, "miss-version")) {
+        const nghttp2_nv nva[] = {
+            MAKE_NV(":method", "CONNECT"),
+            MAKE_NV_CS(":path", stream->s.uri->path),
+            MAKE_NV_CS(":scheme", "http"),
+            MAKE_NV_CS(":authority", stream->s.uri->authority),
+            MAKE_NV_CS(":protocol", "websocket"),
+            MAKE_NV("accept", "*/*"),
+            MAKE_NV("user-agent", "mod_h2/h2ws-test"),
+            MAKE_NV("sec-webSocket-protocol", "chat"),
+        };
+        if (ws_stream_submit(stream, &session,
+                             nva, sizeof(nva) / sizeof(nva[0]), -1))
+            return 1;
+    }
+    else if (!strcmp(scenario, "miss-path")) {
+        const nghttp2_nv nva[] = {
+            MAKE_NV(":method", "CONNECT"),
+            MAKE_NV_CS(":scheme", "http"),
+            MAKE_NV_CS(":authority", stream->s.uri->authority),
+            MAKE_NV_CS(":protocol", "websocket"),
+            MAKE_NV("accept", "*/*"),
+            MAKE_NV("user-agent", "mod_h2/h2ws-test"),
+            MAKE_NV("sec-webSocket-version", "13"),
+            MAKE_NV("sec-webSocket-protocol", "chat"),
+        };
+        if (ws_stream_submit(stream, &session,
+                             nva, sizeof(nva) / sizeof(nva[0]), -1))
+            return 1;
+    }
+    else if (!strcmp(scenario, "miss-scheme")) {
+        const nghttp2_nv nva[] = {
+            MAKE_NV(":method", "CONNECT"),
+            MAKE_NV_CS(":path", stream->s.uri->path),
+            MAKE_NV_CS(":authority", stream->s.uri->authority),
+            MAKE_NV_CS(":protocol", "websocket"),
+            MAKE_NV("accept", "*/*"),
+            MAKE_NV("user-agent", "mod_h2/h2ws-test"),
+            MAKE_NV("sec-webSocket-version", "13"),
+            MAKE_NV("sec-webSocket-protocol", "chat"),
+        };
+        if (ws_stream_submit(stream, &session,
+                             nva, sizeof(nva) / sizeof(nva[0]), -1))
+            return 1;
+    }
+    else if (!strcmp(scenario, "miss-authority")) {
+        const nghttp2_nv nva[] = {
+            MAKE_NV(":method", "CONNECT"),
+            MAKE_NV_CS(":path", stream->s.uri->path),
+            MAKE_NV_CS(":scheme", "http"),
+            MAKE_NV_CS(":protocol", "websocket"),
+            MAKE_NV("accept", "*/*"),
+            MAKE_NV("user-agent", "mod_h2/h2ws-test"),
+            MAKE_NV("sec-webSocket-version", "13"),
+            MAKE_NV("sec-webSocket-protocol", "chat"),
+        };
+        if (ws_stream_submit(stream, &session,
+                             nva, sizeof(nva) / sizeof(nva[0]), -1))
+            return 1;
+    }
+    else {
+        log_errf(cmd, "unknown scenario: %s", scenario);
+        return 1;
+    }
+
+    h2_session_run(&session);
+    h2_session_close(&session);
+    return 0;
+}

Added: httpd/httpd/trunk/test/modules/http2/test_800_websockets.py
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/test/modules/http2/test_800_websockets.py?rev=1910507&view=auto
==============================================================================
--- httpd/httpd/trunk/test/modules/http2/test_800_websockets.py (added)
+++ httpd/httpd/trunk/test/modules/http2/test_800_websockets.py Tue Jun 20 12:01:09 2023
@@ -0,0 +1,306 @@
+import inspect
+import logging
+import os
+import shutil
+import subprocess
+import time
+from datetime import timedelta, datetime
+from typing import Tuple, Union, List
+import packaging.version
+
+import pytest
+import websockets
+from pyhttpd.result import ExecResult
+from pyhttpd.ws_util import WsFrameReader, WsFrame
+
+from .env import H2Conf, H2TestEnv
+
+
+log = logging.getLogger(__name__)
+
+ws_version = packaging.version.parse(websockets.version.version)
+ws_version_min = packaging.version.Version('10.4')
+
+
+def ws_run(env: H2TestEnv, path, do_input=None,
+           inbytes=None, send_close=True,
+           timeout=5, scenario='ws-stdin',
+           wait_close: float = 0.0) -> Tuple[ExecResult, List[str], Union[List[WsFrame], bytes]]:
+    """ Run the h2ws test client in various scenarios with given input and
+        timings.
+    :param env: the test environment
+    :param path: the path on the Apache server to CONNECt to
+    :param do_input: a Callable for sending input to h2ws
+    :param inbytes: fixed bytes to send to h2ws, unless do_input is given
+    :param send_close: send a CLOSE WebSockets frame at the end
+    :param timeout: timeout for waiting on h2ws to finish
+    :param scenario: name of scenario h2ws should run in
+    :param wait_close: time to wait before closing input
+    :return: ExecResult with exit_code/stdout/stderr of run
+    """
+    h2ws = os.path.join(env.clients_dir, 'h2ws')
+    if not os.path.exists(h2ws):
+        pytest.fail(f'test client not build: {h2ws}')
+    args = [
+        h2ws, '-vv', '-c', f'localhost:{env.http_port}',
+        f'ws://cgi.{env.http_tld}:{env.http_port}{path}',
+        scenario
+    ]
+    # we write all output to files, because we manipulate input timings
+    # and would run in deadlock situations with h2ws blocking operations
+    # because its output is not consumed
+    with open(f'{env.gen_dir}/h2ws.stdout', 'w') as fdout:
+        with open(f'{env.gen_dir}/h2ws.stderr', 'w') as fderr:
+            proc = subprocess.Popen(args=args, stdin=subprocess.PIPE,
+                                    stdout=fdout, stderr=fderr)
+            if do_input is not None:
+                do_input(proc)
+            elif inbytes is not None:
+                proc.stdin.write(inbytes)
+                proc.stdin.flush()
+
+            if wait_close > 0:
+                time.sleep(wait_close)
+            try:
+                inbytes = WsFrame.client_close(code=1000).to_network() if send_close else None
+                proc.communicate(input=inbytes, timeout=timeout)
+            except subprocess.TimeoutExpired:
+                log.error(f'ws_run: timeout expired')
+                proc.kill()
+                proc.communicate(timeout=timeout)
+    lines = open(f'{env.gen_dir}/h2ws.stdout').read().splitlines()
+    infos = [line for line in lines if line.startswith('[1] ')]
+    hex_content = ' '.join([line for line in lines if not line.startswith('[1] ')])
+    if len(infos) > 0 and infos[0] == '[1] :status: 200':
+        frames = WsFrameReader.parse(bytearray.fromhex(hex_content))
+    else:
+        frames = bytearray.fromhex(hex_content)
+    return ExecResult(args=args, exit_code=proc.returncode,
+                      stdout=b'', stderr=b''), infos, frames
+
+
+@pytest.mark.skipif(condition=H2TestEnv.is_unsupported, reason="mod_http2 not supported here")
+@pytest.mark.skipif(condition=ws_version < ws_version_min,
+                    reason=f'websockets is {ws_version}, need at least {ws_version_min}')
+class TestWebSockets:
+
+    @pytest.fixture(autouse=True, scope='class')
+    def _class_scope(self, env):
+        # Apache config that CONNECT proxies a WebSocket server for paths starting
+        # with '/ws/'
+        # The WebSocket server is started in pytest fixture 'ws_server' below.
+        conf = H2Conf(env, extras={
+            f'cgi.{env.http_tld}': [
+              f'  H2WebSockets on',
+              f'  ProxyPass /ws/ http://127.0.0.1:{env.ws_port}/ \\',
+              f'           upgrade=websocket timeout=10',
+            ]
+        })
+        conf.add_vhost_cgi(proxy_self=True, h2proxy_self=True).install()
+        assert env.apache_restart() == 0
+
+    def ws_check_alive(self, env, timeout=5):
+        url = f'http://localhost:{env.ws_port}/'
+        end = datetime.now() + timedelta(seconds=timeout)
+        while datetime.now() < end:
+            r = env.curl_get(url, 5)
+            if r.exit_code == 0:
+                return True
+            time.sleep(.1)
+        return False
+
+    def _mkpath(self, path):
+        if not os.path.exists(path):
+            return os.makedirs(path)
+
+    def _rmrf(self, path):
+        if os.path.exists(path):
+            return shutil.rmtree(path)
+
+    @pytest.fixture(autouse=True, scope='class')
+    def ws_server(self, env):
+        # Run our python websockets server that has some special behaviour
+        # for the different path to CONNECT to.
+        run_dir = os.path.join(env.gen_dir, 'ws-server')
+        err_file = os.path.join(run_dir, 'stderr')
+        self._rmrf(run_dir)
+        self._mkpath(run_dir)
+        with open(err_file, 'w') as cerr:
+            cmd = os.path.join(os.path.dirname(inspect.getfile(TestWebSockets)),
+                               'ws_server.py')
+            args = ['python3', cmd, '--port', str(env.ws_port)]
+            p = subprocess.Popen(args=args, cwd=run_dir, stderr=cerr,
+                                 stdout=cerr)
+            if not self.ws_check_alive(env):
+                p.kill()
+                p.wait()
+                pytest.fail(f'ws_server did not start. stderr={open(err_file).readlines()}')
+            yield
+            p.terminate()
+
+    # a correct CONNECT, send CLOSE, expect CLOSE, basic success
+    def test_h2_800_01_ws_empty(self, env: H2TestEnv, ws_server):
+        r, infos, frames = ws_run(env, path='/ws/echo/')
+        assert r.exit_code == 0, f'{r}'
+        assert infos == ['[1] :status: 200', '[1] EOF'], f'{r}'
+        assert len(frames) == 1, f'{frames}'
+        assert frames[0].opcode == WsFrame.CLOSE, f'{frames}'
+
+    # CONNECT with invalid :protocol header, must fail
+    def test_h2_800_02_fail_proto(self, env: H2TestEnv, ws_server):
+        r, infos, frames = ws_run(env, path='/ws/echo/', scenario='fail-proto')
+        assert r.exit_code == 0, f'{r}'
+        assert infos == ['[1] :status: 400', '[1] EOF'], f'{r}'
+
+    # CONNECT to a URL path that does not exist on the server
+    def test_h2_800_03_not_found(self, env: H2TestEnv, ws_server):
+        r, infos, frames = ws_run(env, path='/does-not-exist')
+        assert r.exit_code == 0, f'{r}'
+        assert infos == ['[1] :status: 404', '[1] EOF'], f'{r}'
+
+    # CONNECT to a URL path that is a normal HTTP file resource
+    # we do not want to receive the body of that
+    def test_h2_800_04_non_ws_resource(self, env: H2TestEnv, ws_server):
+        r, infos, frames = ws_run(env, path='/alive.json')
+        assert r.exit_code == 0, f'{r}'
+        assert infos == ['[1] :status: 502', '[1] EOF'], f'{r}'
+        assert frames == b''
+
+    # CONNECT to a URL path that sends a delayed HTTP response body
+    # we do not want to receive the body of that
+    def test_h2_800_05_non_ws_delay_resource(self, env: H2TestEnv, ws_server):
+        r, infos, frames = ws_run(env, path='/h2test/error?body_delay=100ms')
+        assert r.exit_code == 0, f'{r}'
+        assert infos == ['[1] :status: 502', '[1] EOF'], f'{r}'
+        assert frames == b''
+
+    # CONNECT missing the sec-webSocket-version header
+    def test_h2_800_06_miss_version(self, env: H2TestEnv, ws_server):
+        r, infos, frames = ws_run(env, path='/ws/echo/', scenario='miss-version')
+        assert r.exit_code == 0, f'{r}'
+        assert infos == ['[1] :status: 400', '[1] EOF'], f'{r}'
+
+    # CONNECT missing the :path header
+    def test_h2_800_07_miss_path(self, env: H2TestEnv, ws_server):
+        r, infos, frames = ws_run(env, path='/ws/echo/', scenario='miss-path')
+        assert r.exit_code == 0, f'{r}'
+        assert infos == ['[1] RST'], f'{r}'
+
+    # CONNECT missing the :scheme header
+    def test_h2_800_08_miss_scheme(self, env: H2TestEnv, ws_server):
+        r, infos, frames = ws_run(env, path='/ws/echo/', scenario='miss-scheme')
+        assert r.exit_code == 0, f'{r}'
+        assert infos == ['[1] RST'], f'{r}'
+
+    # CONNECT missing the :authority header
+    def test_h2_800_09_miss_authority(self, env: H2TestEnv, ws_server):
+        r, infos, frames = ws_run(env, path='/ws/echo/', scenario='miss-authority')
+        assert r.exit_code == 0, f'{r}'
+        assert infos == ['[1] RST'], f'{r}'
+
+    # CONNECT and exchange a PING
+    def test_h2_800_10_ws_ping(self, env: H2TestEnv, ws_server):
+        ping = WsFrame.client_ping(b'12345')
+        r, infos, frames = ws_run(env, path='/ws/echo/', inbytes=ping.to_network())
+        assert r.exit_code == 0, f'{r}'
+        assert infos == ['[1] :status: 200', '[1] EOF'], f'{r}'
+        assert len(frames) == 2, f'{frames}'
+        assert frames[0].opcode == WsFrame.PONG, f'{frames}'
+        assert frames[0].data == ping.data, f'{frames}'
+        assert frames[1].opcode == WsFrame.CLOSE, f'{frames}'
+
+    # CONNECT and send several PINGs with a delay of 200ms
+    def test_h2_800_11_ws_timed_pings(self, env: H2TestEnv, ws_server):
+        frame_count = 5
+        ping = WsFrame.client_ping(b'12345')
+
+        def do_send(proc):
+            for _ in range(frame_count):
+                try:
+                    proc.stdin.write(ping.to_network())
+                    proc.stdin.flush()
+                    proc.wait(timeout=0.2)
+                except subprocess.TimeoutExpired:
+                    pass
+
+        r, infos, frames = ws_run(env, path='/ws/echo/', do_input=do_send)
+        assert r.exit_code == 0
+        assert infos == ['[1] :status: 200', '[1] EOF'], f'{r}'
+        assert len(frames) == frame_count + 1, f'{frames}'
+        assert frames[-1].opcode == WsFrame.CLOSE, f'{frames}'
+        for i in range(frame_count):
+            assert frames[i].opcode == WsFrame.PONG, f'{frames}'
+            assert frames[i].data == ping.data, f'{frames}'
+
+    # CONNECT to path that closes immediately
+    def test_h2_800_12_ws_unknown(self, env: H2TestEnv, ws_server):
+        r, infos, frames = ws_run(env, path='/ws/unknown')
+        assert r.exit_code == 0, f'{r}'
+        assert infos == ['[1] :status: 200', '[1] EOF'], f'{r}'
+        assert len(frames) == 1, f'{frames}'
+        # expect a CLOSE with code=4999, reason='path unknown'
+        assert frames[0].opcode == WsFrame.CLOSE, f'{frames}'
+        assert frames[0].data[2:].decode() == 'path unknown', f'{frames}'
+
+    # CONNECT to a path that sends us 1 TEXT frame
+    def test_h2_800_13_ws_text(self, env: H2TestEnv, ws_server):
+        r, infos, frames = ws_run(env, path='/ws/text/')
+        assert r.exit_code == 0, f'{r}'
+        assert infos == ['[1] :status: 200', '[1] EOF'], f'{r}'
+        assert len(frames) == 2, f'{frames}'
+        assert frames[0].opcode == WsFrame.TEXT, f'{frames}'
+        assert frames[0].data.decode() == 'hello!', f'{frames}'
+        assert frames[1].opcode == WsFrame.CLOSE, f'{frames}'
+
+    # CONNECT to a path that sends us a named file in BINARY frames
+    @pytest.mark.parametrize("fname,flen", [
+        ("data-1k", 1000),
+        ("data-10k", 10000),
+        ("data-100k", 100*1000),
+        ("data-1m", 1000*1000),
+    ])
+    def test_h2_800_14_ws_file(self, env: H2TestEnv, ws_server, fname, flen):
+        r, infos, frames = ws_run(env, path=f'/ws/file/{fname}', wait_close=0.5)
+        assert r.exit_code == 0, f'{r}'
+        assert infos == ['[1] :status: 200', '[1] EOF'], f'{r}'
+        assert len(frames) > 0
+        total_len = sum([f.data_len for f in frames if f.opcode == WsFrame.BINARY])
+        assert total_len == flen, f'{frames}'
+
+    # CONNECT to path with 1MB file and trigger varying BINARY frame lengths
+    @pytest.mark.parametrize("frame_len", [
+        1000 * 1024,
+        100 * 1024,
+        10 * 1024,
+        1 * 1024,
+        512,
+    ])
+    def test_h2_800_15_ws_frame_len(self, env: H2TestEnv, ws_server, frame_len):
+        fname = "data-1m"
+        flen = 1000*1000
+        r, infos, frames = ws_run(env, path=f'/ws/file/{fname}/{frame_len}', wait_close=0.5)
+        assert r.exit_code == 0, f'{r}'
+        assert infos == ['[1] :status: 200', '[1] EOF'], f'{r}'
+        assert len(frames) > 0
+        total_len = sum([f.data_len for f in frames if f.opcode == WsFrame.BINARY])
+        assert total_len == flen, f'{frames}'
+
+    # CONNECT to path with 1MB file and trigger delays between BINARY frame writes
+    @pytest.mark.parametrize("frame_delay", [
+        1,
+        10,
+        50,
+        100,
+    ])
+    def test_h2_800_16_ws_frame_delay(self, env: H2TestEnv, ws_server, frame_delay):
+        fname = "data-1m"
+        flen = 1000*1000
+        # adjust frame_len to allow for 1 second overall duration
+        frame_len = int(flen / (1000 / frame_delay))
+        r, infos, frames = ws_run(env, path=f'/ws/file/{fname}/{frame_len}/{frame_delay}',
+                                  wait_close=1.5)
+        assert r.exit_code == 0, f'{r}'
+        assert infos == ['[1] :status: 200', '[1] EOF'], f'{r}'
+        assert len(frames) > 0
+        total_len = sum([f.data_len for f in frames if f.opcode == WsFrame.BINARY])
+        assert total_len == flen, f'{frames}\n{r}'

Added: httpd/httpd/trunk/test/modules/http2/ws_server.py
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/test/modules/http2/ws_server.py?rev=1910507&view=auto
==============================================================================
--- httpd/httpd/trunk/test/modules/http2/ws_server.py (added)
+++ httpd/httpd/trunk/test/modules/http2/ws_server.py Tue Jun 20 12:01:09 2023
@@ -0,0 +1,100 @@
+#!/usr/bin/env python3
+import argparse
+import asyncio
+import logging
+import os
+import sys
+import time
+
+import websockets.server as ws_server
+from websockets.exceptions import ConnectionClosedError
+
+log = logging.getLogger(__name__)
+
+logging.basicConfig(
+    format="[%(asctime)s] %(message)s",
+    level=logging.DEBUG,
+)
+
+
+async def echo(websocket):
+    try:
+        async for message in websocket:
+            try:
+                log.info(f'got request {message}')
+            except Exception as e:
+                log.error(f'error {e} getting path from {message}')
+            await websocket.send(message)
+    except ConnectionClosedError:
+        pass
+
+
+async def on_async_conn(conn):
+    rpath = str(conn.path)
+    pcomps = rpath[1:].split('/')
+    if len(pcomps) == 0:
+        pcomps = ['echo']  # default handler
+    log.info(f'connection for {pcomps}')
+    if pcomps[0] == 'echo':
+        log.info(f'/echo endpoint')
+        for message in await conn.recv():
+            await conn.send(message)
+    elif pcomps[0] == 'text':
+        await conn.send('hello!')
+    elif pcomps[0] == 'file':
+        if len(pcomps) < 2:
+            conn.close(code=4999, reason='unknown file')
+            return
+        fpath = os.path.join('../', pcomps[1])
+        if not os.path.exists(fpath):
+            conn.close(code=4999, reason='file not found')
+            return
+        bufsize = 0
+        if len(pcomps) > 2:
+            bufsize = int(pcomps[2])
+        if bufsize <= 0:
+            bufsize = 16*1024
+        delay_ms = 0
+        if len(pcomps) > 3:
+            delay_ms = int(pcomps[3])
+        with open(fpath, 'r+b') as fd:
+            while True:
+                buf = fd.read(bufsize)
+                if buf is None or len(buf) == 0:
+                    break
+                await conn.send(buf)
+                if delay_ms > 0:
+                    time.sleep(delay_ms/1000)
+    else:
+        log.info(f'unknown endpoint: {rpath}')
+        await conn.close(code=4999, reason='path unknown')
+    await conn.close(code=1000, reason='')
+
+
+async def run_server(port):
+    log.info(f'starting server on port {port}')
+    async with ws_server.serve(ws_handler=on_async_conn,
+                               host="localhost", port=port):
+        await asyncio.Future()
+
+
+async def main():
+    parser = argparse.ArgumentParser(prog='scorecard',
+                                     description="Run a websocket echo server.")
+    parser.add_argument("--port", type=int,
+                        default=0, help="port to listen on")
+    args = parser.parse_args()
+
+    if args.port == 0:
+        sys.stderr.write('need --port\n')
+        sys.exit(1)
+
+    logging.basicConfig(
+        format="%(asctime)s %(message)s",
+        level=logging.DEBUG,
+    )
+    await run_server(args.port)
+
+
+if __name__ == "__main__":
+    asyncio.run(main())

Modified: httpd/httpd/trunk/test/pyhttpd/config.ini.in
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/test/pyhttpd/config.ini.in?rev=1910507&r1=1910506&r2=1910507&view=diff
==============================================================================
--- httpd/httpd/trunk/test/pyhttpd/config.ini.in (original)
+++ httpd/httpd/trunk/test/pyhttpd/config.ini.in Tue Jun 20 12:01:09 2023
@@ -26,6 +26,7 @@ http_port = 5002
 https_port = 5001
 proxy_port = 5003
 http_port2 = 5004
+ws_port = 5100
 http_tld = tests.httpd.apache.org
 test_dir = @abs_srcdir@
 test_src_dir = @abs_srcdir@

Modified: httpd/httpd/trunk/test/pyhttpd/env.py
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/test/pyhttpd/env.py?rev=1910507&r1=1910506&r2=1910507&view=diff
==============================================================================
--- httpd/httpd/trunk/test/pyhttpd/env.py (original)
+++ httpd/httpd/trunk/test/pyhttpd/env.py Tue Jun 20 12:01:09 2023
@@ -250,8 +250,10 @@ class HttpdTestEnv:
         self._http_port2 = int(self.config.get('test', 'http_port2'))
         self._https_port = int(self.config.get('test', 'https_port'))
         self._proxy_port = int(self.config.get('test', 'proxy_port'))
+        self._ws_port = int(self.config.get('test', 'ws_port'))
         self._http_tld = self.config.get('test', 'http_tld')
         self._test_dir = self.config.get('test', 'test_dir')
+        self._clients_dir = os.path.join(os.path.dirname(self._test_dir), 'clients')
         self._gen_dir = self.config.get('test', 'gen_dir')
         self._server_dir = os.path.join(self._gen_dir, 'apache')
         self._server_conf_dir = os.path.join(self._server_dir, "conf")
@@ -367,6 +369,10 @@ class HttpdTestEnv:
         return self._proxy_port
 
     @property
+    def ws_port(self) -> int:
+        return self._ws_port
+
+    @property
     def http_tld(self) -> str:
         return self._http_tld
 
@@ -391,6 +397,10 @@ class HttpdTestEnv:
         return self._test_dir
 
     @property
+    def clients_dir(self) -> str:
+        return self._clients_dir
+
+    @property
     def server_dir(self) -> str:
         return self._server_dir
 
@@ -519,12 +529,14 @@ class HttpdTestEnv:
         if not os.path.exists(path):
             return os.makedirs(path)
 
-    def run(self, args, stdout_list=False, intext=None, debug_log=True):
+    def run(self, args, stdout_list=False, intext=None, inbytes=None, debug_log=True):
         if debug_log:
             log.debug(f"run: {args}")
         start = datetime.now()
+        if intext is not None:
+            inbytes = intext.encode()
         p = subprocess.run(args, stderr=subprocess.PIPE, stdout=subprocess.PIPE,
-                           input=intext.encode() if intext else None)
+                           input=inbytes)
         stdout_as_list = None
         if stdout_list:
             try:

Added: httpd/httpd/trunk/test/pyhttpd/ws_util.py
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/test/pyhttpd/ws_util.py?rev=1910507&view=auto
==============================================================================
--- httpd/httpd/trunk/test/pyhttpd/ws_util.py (added)
+++ httpd/httpd/trunk/test/pyhttpd/ws_util.py Tue Jun 20 12:01:09 2023
@@ -0,0 +1,137 @@
+import logging
+import struct
+
+
+log = logging.getLogger(__name__)
+
+
+class WsFrame:
+
+    CONT = 0
+    TEXT = 1
+    BINARY = 2
+    RSVD3 = 3
+    RSVD4 = 4
+    RSVD5 = 5
+    RSVD6 = 6
+    RSVD7 = 7
+    CLOSE = 8
+    PING = 9
+    PONG = 10
+    RSVD11 = 11
+    RSVD12 = 12
+    RSVD13 = 13
+    RSVD14 = 14
+    RSVD15 = 15
+
+    OP_NAMES = [
+        "CONT",
+        "TEXT",
+        "BINARY",
+        "RSVD3",
+        "RSVD4",
+        "RSVD5",
+        "RSVD6",
+        "RSVD7",
+        "CLOSE",
+        "PING",
+        "PONG",
+        "RSVD11",
+        "RSVD12",
+        "RSVD13",
+        "RSVD14",
+        "RSVD15",
+    ]
+
+    def __init__(self, opcode: int, fin: bool, mask: bytes, data: bytes):
+        self.opcode = opcode
+        self.fin = fin
+        self.mask = mask
+        self.data = data
+        self.length = len(data)
+
+    def __repr__(self):
+        return f'WsFrame[{self.OP_NAMES[self.opcode]} fin={self.fin}, mask={self.mask}, len={len(self.data)}]'
+
+    @property
+    def data_len(self) -> int:
+        return len(self.data) if self.data else 0
+
+    def to_network(self) -> bytes:
+        nd = bytearray()
+        h1 = self.opcode
+        if self.fin:
+            h1 |= 0x80
+        nd.extend(struct.pack("!B", h1))
+        mask_bit = 0x80 if self.mask is not None else 0x0
+        h2 = self.data_len
+        if h2 > 65535:
+            nd.extend(struct.pack("!BQ", 127|mask_bit, h2))
+        elif h2 > 126:
+            nd.extend(struct.pack("!BH", 126|mask_bit, h2))
+        else:
+            nd.extend(struct.pack("!B", h2|mask_bit))
+        if self.mask is not None:
+            nd.extend(self.mask)
+        if self.data is not None:
+            nd.extend(self.data)
+        return nd
+
+    @classmethod
+    def client_ping(cls, data: bytes, mask: bytes = None) -> 'WsFrame':
+        if mask is None:
+            mask = bytes.fromhex('00 00 00 00')
+        return WsFrame(opcode=WsFrame.PING, fin=True, mask=mask, data=data)
+
+    @classmethod
+    def client_close(cls, code: int, reason: str = None,
+                     mask: bytes = None) -> 'WsFrame':
+        data = bytearray(struct.pack("!H", code))
+        if reason is not None:
+            data.extend(reason.encode())
+        if mask is None:
+            mask = bytes.fromhex('00 00 00 00')
+        return WsFrame(opcode=WsFrame.CLOSE, fin=True, mask=mask, data=data)
+
+
+class WsFrameReader:
+
+    def __init__(self, data: bytes):
+        self.data = data
+
+    def _read(self, n: int):
+        if len(self.data) < n:
+            raise EOFError(f'have {len(self.data)} bytes left, but {n} requested')
+        elif n == 0:
+            return b''
+        chunk = self.data[:n]
+        del self.data[:n]
+        return chunk
+
+    def next_frame(self):
+        data = self._read(2)
+        h1, h2 = struct.unpack("!BB", data)
+        log.debug(f'parsed h1={h1} h2={h2} from {data}')
+        fin = True if h1 & 0x80 else False
+        opcode = h1 & 0xf
+        has_mask = True if h2 & 0x80 else False
+        mask = None
+        dlen = h2 & 0x7f
+        if dlen == 126:
+            (dlen,) = struct.unpack("!H", self._read(2))
+        elif dlen == 127:
+            (dlen,) = struct.unpack("!Q", self._read(8))
+        if has_mask:
+            mask = self._read(4)
+        return WsFrame(opcode=opcode, fin=fin, mask=mask, data=self._read(dlen))
+
+    def eof(self):
+        return len(self.data) == 0
+
+    @classmethod
+    def parse(cls, data: bytes):
+        frames = []
+        reader = WsFrameReader(data=data)
+        while not reader.eof():
+            frames.append(reader.next_frame())
+        return frames

Modified: httpd/httpd/trunk/test/travis_run_linux.sh
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/test/travis_run_linux.sh?rev=1910507&r1=1910506&r2=1910507&view=diff
==============================================================================
--- httpd/httpd/trunk/test/travis_run_linux.sh (original)
+++ httpd/httpd/trunk/test/travis_run_linux.sh Tue Jun 20 12:01:09 2023
@@ -221,6 +221,8 @@ if ! test -v SKIP_TESTING; then
     fi
 
     if test -v TEST_H2 -a $RV -eq 0; then
+        # Build the test clients
+        (cd test/clients && make)
         # Run HTTP/2 tests.
         MPM=event py.test-3 test/modules/http2
         RV=$?