Posted to modules-dev@httpd.apache.org by Jacob Champion <ja...@ni.com> on 2015/02/25 01:16:51 UTC

Debugging mod_websocket -- any others out there?

Hi everyone,

[I'm keeping this short in case it's the wrong list for this topic -- if
you think I should go to apache-dev or apache-users instead, just say
the word.]

First some background: I'm interested in running a WebSocket endpoint,
in production, from inside Apache 2.4. I'm aware that there are
architectural issues with long-lived TCP connections hogging Apache
resources, and at the moment I'm prepared to live with the consequences
of that (or at least I think I am).

I've been working with mod_websocket [1] for a few weeks now. Things
were going very well until we ran into crashes under heavy load. I
_think_ they are related to the threading model used by mod_websocket --
it spins up a new thread and a new bucket brigade to write to the
connection, while the original thread and brigade block to read
messages. Unfortunately I don't know enough about the brigade system to
know whether that's kosher or even fixable.

The original author appears to have moved on to other things, if the
Github history and open pull requests are any indication. So my initial
questions for you are:

- Is there enough interest in mod_websocket for my debugging it to be
worth anyone else's time? It seems that there was some initial interest
in folding mod_websocket back into ASF, but that died at some point.

- Does anyone know of a community of mod_websocket users that I'm
missing? Contributing fixes back to whoever is using it would be ideal.

- Is it even possible/safe to do an asynchronous "one thread reads, one
thread writes" operation in a production-quality module? (In other
words, is mod_websocket just unfixably broken?)

Basically I'm hoping to get some idea of whether I'm wasting my time. I
believe the primary alternative would be to use a second WebSocket-savvy
server on the backend, and run communication to it through
mod_proxy_wstunnel.

Thanks for your input!

Jacob Champion
LabVIEW R&D
National Instruments

[1] https://github.com/disconnect/apache-websocket

Re: [PATCH 0/5] Fix mod_websocket segfaults under load

Posted by Alex Bligh <al...@alex.org.uk>.
On 30 Apr 2015, at 19:19, Jacob Champion <ja...@ni.com> wrote:

> Quick bump to this thread. Have any interested parties (Alex?) been able
> to give these patches a try?

Not yet - sorry.

-- 
Alex Bligh





Re: [PATCH 0/5] Fix mod_websocket segfaults under load

Posted by Jacob Champion <ja...@ni.com>.
On 3/9/2015 1:43 PM, Jacob Champion wrote:
>  mod_websocket.c | 1005 +++++++++++++++++++++++++++++++++----------------------
>  1 file changed, 607 insertions(+), 398 deletions(-)

Quick bump to this thread. Have any interested parties (Alex?) been able
to give these patches a try? Any comments on coding style, architecture,
etc.?

And now for a really ambitious question: with enough cleanup and TLC,
could this module ever be considered for support as an official module?
If so, what would we need to put into it to get it there? If not, what
are the objections?

I'm particularly curious about thoughts on the threading architecture
(one thread monopolized per WebSocket), the authorship concern (IIUC,
ASF requires some trademark and/or other IP transfer, which may not be
possible if the author is gone), and just the general question of
"should Apache support WebSockets out of the box?" Does the answer to
the last question change depending on the version of Apache (2.2, 2.4, 2.6)?

Thanks!

Jacob Champion
LabVIEW R&D
National Instruments

Re: mod_websocket ownership (and fixes)

Posted by Alex Bligh <al...@alex.org.uk>.
On 11 Sep 2015, at 23:49, Jacob Champion <ch...@gmail.com> wrote:

> Hi Alex,
> 
> On 09/11/2015 10:02 AM, Alex Bligh wrote:
>> mod_proxy_wstunnel forwards the websocket connection without
>> interpreting the protocol (i.e. needs to be directed at a websocket
>> server); my module (which just plugs into mod_websocket) forwards it
>> as a TCP port. EG for VNC over Websockets you'd just need to point my
>> module at port 5900, whereas with mod_websocket you'd need something
>> further to decode it.
> 
> Oh, neat! I assume there are specific protocol designs that can't be proxied over WebSocket? (For example, anything that requires the application-level use of a TCP half-close would presumably be iffy?) What are the sorts of protocols that work best with this setup?

There are some that cannot be proxied directly, but not many, due to the prevalence of the UNIX 'everything is a file' paradigm.

One such example is the guacamole protocol (a bit like VNC), which requires that a guacamole frame travelling toward the client is not split across WebSocket frames. You will find in the example I sent a processor for that, which interprets client-bound data and splits it up into specific frames rather than treating it as a stream.
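
To make the "don't split an instruction across frames" requirement concrete, here is a minimal sketch of finding instruction boundaries in a byte stream. It is not the processor from the vncproxy example. It assumes the Guacamole wire format of length-prefixed elements (LENGTH.VALUE, separated by commas, terminated by a semicolon) and, as a simplification, treats each length as a byte count, which only holds for ASCII data. The function name is made up for illustration.

```c
#include <assert.h>
#include <stddef.h>

/*
 * Hypothetical helper: return the size in bytes of the first complete
 * instruction in buf[0..len), 0 if more data is needed, or -1 if the
 * data is malformed. Assumes element lengths count bytes (ASCII only).
 */
static long guac_instruction_size(const char *buf, size_t len)
{
    size_t i = 0;

    assert(buf != NULL || len == 0);
    for (;;) {
        size_t elen = 0;
        int digits = 0;

        /* Parse the decimal length prefix of the next element. */
        while (i < len && buf[i] >= '0' && buf[i] <= '9') {
            if (++digits > 9) {
                return -1;              /* implausibly long prefix */
            }
            elen = elen * 10 + (size_t)(buf[i++] - '0');
        }
        if (i >= len) {
            return 0;                   /* prefix not finished yet */
        }
        if (digits == 0 || buf[i] != '.') {
            return -1;
        }
        i++;                            /* skip '.' */

        /* Skip the value itself; it may contain ',' or ';'. */
        if (elen >= len - i) {
            return 0;                   /* value or terminator missing */
        }
        i += elen;

        if (buf[i] == ';') {
            return (long)(i + 1);       /* end of instruction */
        }
        if (buf[i] != ',') {
            return -1;
        }
        i++;                            /* move on to the next element */
    }
}
```

A sender could call this repeatedly on its buffer and emit exactly one WebSocket message per returned size, holding back any trailing partial instruction until more data arrives.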

I've not come across any issues with half close, but I suppose that might be an issue.
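
For anyone unfamiliar with the term, a half-close is one side shutting down only its sending direction while it keeps reading. The sketch below shows the socket-level behaviour with plain POSIX calls; the helper names are invented for the example. As far as I understand RFC 6455, WebSocket has no direct equivalent (a Close frame starts closing the whole connection), which is why a protocol relying on this might be awkward to tunnel.

```c
#include <assert.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Send a request, then half-close: "done sending, still listening". */
static int send_and_half_close(int fd, const char *msg)
{
    size_t len;

    assert(msg != NULL);
    len = strlen(msg);
    if (write(fd, msg, len) != (ssize_t)len) {
        return -1;
    }
    return shutdown(fd, SHUT_WR);   /* our read side stays open */
}

/* Read until the peer's half-close shows up as EOF (or buf is full).
 * Returns the number of bytes read, or -1 on error. */
static ssize_t read_until_eof(int fd, char *buf, size_t cap)
{
    size_t total = 0;

    assert(buf != NULL);
    while (total < cap) {
        ssize_t n = read(fd, buf + total, cap - total);
        if (n < 0) {
            return -1;
        }
        if (n == 0) {
            break;                  /* EOF: peer has finished sending */
        }
        total += (size_t)n;
    }
    return (ssize_t)total;
}
```

After send_and_half_close() the peer sees EOF once it has read the request, yet it can still write a response back on the same connection. That second half is the part a WebSocket tunnel cannot express directly.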

> At this point I want to avoid biting off more than I can chew -- I want to get the current code in order before absorbing more functionality  -- but this is a really neat idea. I'll need to think about security corner cases with a generic TCP proxy setup too... more research needed.

Sure - I was more thinking to add it as an example. As I say, it's a *user* of your module (well, disconnect's module), not a code change to it. There's probably piles of stuff you can strip out.

Oh, and allegedly it doesn't compile on Windows (I wouldn't be able to test that).

-- 
Alex Bligh





Re: mod_websocket ownership (and fixes)

Posted by Jacob Champion <ch...@gmail.com>.
Hi Alex,

On 09/11/2015 10:02 AM, Alex Bligh wrote:
> mod_proxy_wstunnel forwards the websocket connection without
> interpreting the protocol (i.e. needs to be directed at a websocket
> server); my module (which just plugs into mod_websocket) forwards it
> as a TCP port. EG for VNC over Websockets you'd just need to point my
> module at port 5900, whereas with mod_websocket you'd need something
> further to decode it.

Oh, neat! I assume there are specific protocol designs that can't be 
proxied over WebSocket? (For example, anything that requires the 
application-level use of a TCP half-close would presumably be iffy?) 
What are the sorts of protocols that work best with this setup?

At this point I want to avoid biting off more than I can chew -- I want 
to get the current code in order before absorbing more functionality  -- 
but this is a really neat idea. I'll need to think about security corner 
cases with a generic TCP proxy setup too... more research needed.

>> Hopefully that makes sense -- any specific parts that still need
>> clarification?
>
> Thanks. Yes that makes sense I think. I sort of more meant that it
> would be useful in a README or something in the module itself, as I
> had difficulty grasping it when I was writing the vncproxy stuff.

Gotcha. My latest commit adds a few explanatory comments to the two 
areas I mentioned; thanks for the feedback!

--Jacob

Re: mod_websocket ownership (and fixes)

Posted by Alex Bligh <al...@alex.org.uk>.
On 10 Sep 2015, at 23:26, Jacob Champion <ch...@gmail.com> wrote:

> On 09/10/2015 02:50 PM, Alex Bligh wrote:
>> Here: https://github.com/abligh/apache-websocket
>> 
>> in the vncproxy directory you will find a vncproxy (which is actually
>> a generic tcpproxy as well, though it could be hugely simplified to
>> do that alone) which you are welcome to have if you've not changed
>> the API to the websocket modules.
> 
> I'll take a look, thanks. Do you know how your implementation relates to mod_proxy_wstunnel?

mod_proxy_wstunnel forwards the websocket connection without interpreting the protocol (i.e. needs to be directed at a websocket server); my module (which just plugs into mod_websocket) forwards it as a TCP port. EG for VNC over Websockets you'd just need to point my module at port 5900, whereas with mod_websocket you'd need something further to decode it.

(FWIW I switched from using mod_websocket + my code to mod_proxy_wstunnel and some C with libwebsockets at least in part because I couldn't fix what you seem to have just fixed)

>> It would be really helpful if you could document how the threading /
>> FD processing / bucket brigade runs now. Crucially you've put all the
>> bucket brigade handling into a single thread (I think) which I take
>> it is the same thread as apache itself uses - that's what I reckoned
>> I needed to do to get it to work reliably with SSL.
> 
> Right, the single thread is the key. The Git commit messages might help with the understanding somewhat, since that's where I put the patch motivations (and it's what I'm reading to remember what I did, heh).
> 
> The main loop in mod_websocket_data_framing() is where the real work is done. This is called on the original request thread from Apache. The loop alternately checks for incoming data (with a nonblocking read) and outgoing data (from any messages on the outgoing queue). Reading from and writing to the socket is never done by anything but the framing loop; mod_websocket_plugin_send() queues any cross-thread messages for later processing by the loop. This is similar to a UI event dispatcher, if you've ever worked with a framework that used one.
> 
> The only "tricky" part in the architecture is that we don't want to busy-wait if we have nothing to do, so there's an apr_pollset_t that is waited on when we run out of work.
> 
> Hopefully that makes sense -- any specific parts that still need clarification?

Thanks. Yes that makes sense I think. I sort of more meant that it would be useful in a README or something in the module itself, as I had difficulty grasping it when I was writing the vncproxy stuff.

-- 
Alex Bligh





Re: mod_websocket ownership (and fixes)

Posted by Jacob Champion <ch...@gmail.com>.
On 09/10/2015 02:50 PM, Alex Bligh wrote:
> Here: https://github.com/abligh/apache-websocket
>
> in the vncproxy directory you will find a vncproxy (which is actually
> a generic tcpproxy as well, though it could be hugely simplified to
> do that alone) which you are welcome to have if you've not changed
> the API to the websocket modules.

I'll take a look, thanks. Do you know how your implementation relates to 
mod_proxy_wstunnel?

> It would be really helpful if you could document how the threading /
> FD processing / bucket brigade runs now. Crucially you've put all the
> bucket brigade handling into a single thread (I think) which I take
> it is the same thread as apache itself uses - that's what I reckoned
> I needed to do to get it to work reliably with SSL.

Right, the single thread is the key. The Git commit messages might help 
with the understanding somewhat, since that's where I put the patch 
motivations (and it's what I'm reading to remember what I did, heh).

The main loop in mod_websocket_data_framing() is where the real work is 
done. This is called on the original request thread from Apache. The 
loop alternately checks for incoming data (with a nonblocking read) and 
outgoing data (from any messages on the outgoing queue). Reading from and 
writing to the socket is never done by anything but the framing loop; 
mod_websocket_plugin_send() queues any cross-thread messages for later 
processing by the loop. This is similar to a UI event dispatcher, if 
you've ever worked with a framework that used one.

The only "tricky" part in the architecture is that we don't want to 
busy-wait if we have nothing to do, so there's an apr_pollset_t that is 
waited on when we run out of work.
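
As a rough illustration of that loop shape (not the module's actual code), here is a sketch that uses plain POSIX poll() in place of apr_pollset_t and an ordinary socket in place of the bucket brigades. The self-pipe used to wake the loop is an assumption made for the example, and every name in it is hypothetical.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <pthread.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

#define QUEUE_MAX 16

/* All names here are hypothetical; this is not mod_websocket's API. */
typedef struct {
    int conn_fd;                  /* client connection; loop thread only */
    int wake_rd, wake_wr;         /* self-pipe used to wake the loop */
    pthread_mutex_t lock;         /* protects queue[] and count */
    const char *queue[QUEUE_MAX]; /* pending outgoing messages */
    int count;
} event_loop;

static int set_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL);
    return (flags < 0) ? -1 : fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

static int loop_init(event_loop *l, int conn_fd)
{
    int p[2];

    assert(l != NULL);
    if (pipe(p) != 0 || set_nonblocking(p[0]) != 0 ||
        set_nonblocking(conn_fd) != 0) {
        return -1;
    }
    l->conn_fd = conn_fd;
    l->wake_rd = p[0];
    l->wake_wr = p[1];
    l->count = 0;
    return pthread_mutex_init(&l->lock, NULL);
}

/* Safe from any thread: queue the message and wake the loop. It never
 * touches conn_fd. The caller keeps msg alive until it has been sent. */
static int loop_send(event_loop *l, const char *msg)
{
    int queued = 0;

    assert(l != NULL && msg != NULL);
    pthread_mutex_lock(&l->lock);
    if (l->count < QUEUE_MAX) {
        l->queue[l->count++] = msg;
        queued = 1;
    }
    pthread_mutex_unlock(&l->lock);
    return (queued && write(l->wake_wr, "x", 1) == 1) ? 0 : -1;
}

/* One pass of the loop, run on the connection's own thread. Returns the
 * number of bytes read from the peer (possibly 0), or -1 on EOF/error. */
static ssize_t loop_step(event_loop *l, char *in, size_t insz, int timeout_ms)
{
    struct pollfd fds[2] = {
        { .fd = l->conn_fd, .events = POLLIN },
        { .fd = l->wake_rd, .events = POLLIN }
    };
    const char *out[QUEUE_MAX];
    char drain[64];
    ssize_t n;
    int i, pending;

    /* Out of work: sleep until input arrives or something is queued. */
    if (poll(fds, 2, timeout_ms) < 0 && errno != EINTR) {
        return -1;
    }

    /* Incoming: a nonblocking read; "would block" means nothing yet. */
    n = read(l->conn_fd, in, insz);
    if (n == 0 || (n < 0 && errno != EAGAIN && errno != EWOULDBLOCK)) {
        return -1;
    }
    if (n < 0) {
        n = 0;
    }

    /* Outgoing: clear wake-ups, take the queue, write outside the lock. */
    while (read(l->wake_rd, drain, sizeof(drain)) > 0) {
    }
    pthread_mutex_lock(&l->lock);
    pending = l->count;
    memcpy(out, l->queue, sizeof(out[0]) * (size_t)pending);
    l->count = 0;
    pthread_mutex_unlock(&l->lock);

    for (i = 0; i < pending; i++) {
        size_t len = strlen(out[i]);
        /* A real loop would handle short writes and EAGAIN here. */
        if (write(l->conn_fd, out[i], len) != (ssize_t)len) {
            return -1;
        }
    }
    return n;
}
```

The point of the design is that loop_send() only ever touches the queue and the wake pipe, so the connection itself is read and written by exactly one thread.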

Hopefully that makes sense -- any specific parts that still need 
clarification?

--Jacob

Re: mod_websocket ownership (and fixes)

Posted by Alex Bligh <al...@alex.org.uk>.
On 10 Sep 2015, at 22:05, Jacob Champion <ch...@gmail.com> wrote:

> I've submitted a pull request to the original author, but I'm assuming he will not suddenly appear out of the blue after three years, so I plan to actively own and maintain my fork on GitHub. If you're also interested in WebSockets with Apache, and/or you've submitted a pull request to the original project and it hasn't gone anywhere, please give me a buzz.

Interested, but have very limited time.

Thanks for this.

Here:
  https://github.com/abligh/apache-websocket

in the vncproxy directory you will find a vncproxy (which is actually a generic tcpproxy as well, though it could be hugely simplified to do that alone) which you are welcome to have if you've not changed the API to the websocket modules.

It would be really helpful if you could document how the threading / FD processing / bucket brigade runs now. Crucially you've put all the bucket brigade handling into a single thread (I think) which I take it is the same thread as apache itself uses - that's what I reckoned I needed to do to get it to work reliably with SSL.

-- 
Alex Bligh





Re: mod_websocket ownership (and fixes)

Posted by Jacob Champion <ch...@gmail.com>.
On 09/11/2015 02:52 AM, Stefan Eissing wrote:
> Hi Jacob,
>
> good to see that someone adopts this! As to advice...
>
> While I am no expert on WebSockets and its different flavors, I know
> that mod_websocket and my cuddly mod_h2 have several things in
> common. For example they both have to manage the "Upgrade:" dance.
> And WebSocket might want to participate in the TLS+ALPN protocol
> handling also.

Hello Stefan! I saw the Upgrade/ALPN discussion a little while ago on 
the dev list and thought of this as well.

> For these things, there are some new features in trunk and hopefully
> in the next 2.4.x release. In case you're interested, we can chat
> about this a bit.

I am absolutely interested. I'm not quite sure where to begin though... 
perhaps migrating the module to use an "official" Upgrade 
implementation? Did you have some entry points or pieces of code in 
mod_h2 or trunk that you'd like me to take a look at?

> As to other topics, such as moving buckets across threads or
> not-busy-waiting: these areas mod_websocket and mod_h2 have in common
> to find a good solution for. Might also be worth an exchange.

If you haven't already, take a look at the initial discussion on 
modules-dev that prompted my patchset -- in particular, the problem of 
how to implement bidirectional asynchronous communication using only one 
thread. The current solution seems to work, but it's less than ideal... 
personally, I'd like it very much if an idle WebSocket connection didn't 
monopolize one of Apache's precious threads. Perhaps mod_h2 has similar 
problems?

I will start taking a look at mod_h2 and familiarizing myself with it; 
I've been meaning to do that for a while now anyway. If you had any 
immediate discussion topics, please by all means bring them up. :) In 
the meantime, my initial focus for this module (besides getting up to 
speed on the codebase itself) will be on RFC compliance...

Thanks!
--Jacob

Re: mod_websocket ownership (and fixes)

Posted by Stefan Eissing <st...@eissing.org>.
Hi Jacob,

good to see that someone adopts this! As to advice...

While I am no expert on WebSockets and its different flavors, I know that mod_websocket and my cuddly mod_h2 have several things in common. For example they both have to manage the "Upgrade:" dance. And WebSocket might want to participate in the TLS+ALPN protocol handling also.

For these things, there are some new features in trunk and hopefully in the next 2.4.x release. In case you're interested, we can chat about this a bit.

As to other topics, such as moving buckets across threads or not-busy-waiting: these areas mod_websocket and mod_h2 have in common to find a good solution for. Might also be worth an exchange.

I was (for no particular reason) not subscribed to modules-dev, but am now. So you'll also find me there.

cheers,

  Stefan


> Am 10.09.2015 um 23:05 schrieb Jacob Champion <ch...@gmail.com>:
> 
> To hopefully put this thing to bed, six months later -- I have taken the mod_websocket patchset started by the long conversation in [1] and pushed it to a GitHub fork [2].
> 
> I've submitted a pull request to the original author, but I'm assuming he will not suddenly appear out of the blue after three years, so I plan to actively own and maintain my fork on GitHub. If you're also interested in WebSockets with Apache, and/or you've submitted a pull request to the original project and it hasn't gone anywhere, please give me a buzz.
> 
> I still need to familiarize myself with the code base and get it into a state where I can sustainably maintain it (tests would be good). After that I'm primarily interested in picking up the conversation left off in Bug 47485 [3]. Any general advice is welcome!
> 
> [Administrative side note: I am no longer employed at National Instruments, so emails to my prior @ni.com address will bounce. Use GitHub to talk to me about the module.]
> 
> Thanks again to modules-dev for helping and humoring me!
> 
> --Jacob
> 
> [1] http://mail-archives.apache.org/mod_mbox/httpd-modules-dev/201502.mbox/%3C54ED1473.1060604%40ni.com%3E
> [2] https://github.com/jchampio/apache-websocket/tree/dev/parallel-tls-fix
> [3] https://bz.apache.org/bugzilla/show_bug.cgi?id=47485


mod_websocket ownership (and fixes)

Posted by Jacob Champion <ch...@gmail.com>.
To hopefully put this thing to bed, six months later -- I have taken the 
mod_websocket patchset started by the long conversation in [1] and 
pushed it to a GitHub fork [2].

I've submitted a pull request to the original author, but I'm assuming 
he will not suddenly appear out of the blue after three years, so I plan 
to actively own and maintain my fork on GitHub. If you're also 
interested in WebSockets with Apache, and/or you've submitted a pull 
request to the original project and it hasn't gone anywhere, please give 
me a buzz.

I still need to familiarize myself with the code base and get it into a 
state where I can sustainably maintain it (tests would be good). After 
that I'm primarily interested in picking up the conversation left off in 
Bug 47485 [3]. Any general advice is welcome!

[Administrative side note: I am no longer employed at National 
Instruments, so emails to my prior @ni.com address will bounce. Use 
GitHub to talk to me about the module.]

Thanks again to modules-dev for helping and humoring me!

--Jacob

[1] http://mail-archives.apache.org/mod_mbox/httpd-modules-dev/201502.mbox/%3C54ED1473.1060604%40ni.com%3E
[2] https://github.com/jchampio/apache-websocket/tree/dev/parallel-tls-fix
[3] https://bz.apache.org/bugzilla/show_bug.cgi?id=47485

Re: [PATCH 0/5] Fix mod_websocket segfaults under load

Posted by Jacob Champion <ja...@ni.com>.
Alex,

On 3/9/2015 2:11 PM, Alex Bligh wrote:

> Note mod_websocket is not (currently) in the list of apache supported
> modules, though I for one would love it to be. So for most people they
> can't see what you are patching against.

I didn't even think about that -- for future readers of this list, this
patchset is based on commit cfaef071 of

    https://github.com/disconnect/apache-websocket/

> Do you have this in git somewhere (e.g. github?)

I don't currently. I used to have a github account for my company; if it
makes things much easier for you, I can look into having it reactivated.

Thanks again for your input on this fix!

Jacob Champion
LabVIEW R&D
National Instruments

Re: [PATCH 0/5] Fix mod_websocket segfaults under load

Posted by Alex Bligh <al...@alex.org.uk>.
Jacob,

On 9 Mar 2015, at 18:43, Jacob Champion <ja...@ni.com> wrote:

> The first patch changes to nonblocking reads and the use of
> apr_pollset_poll(). The next two patches are meant to be mostly
> refactoring and code motion, in order to make the fourth patch (which
> contains the actual architecture changes) more straightforward. The
> fifth patch is cleanup of now unnecessary code.
> 
> Comments and suggestions are welcome!
> 
> The code in this patchset is Copyright (c) National Instruments and
> Apache Software Foundation, licensed to the public under the Apache
> License 2.0.

Thanks for these. I'll try to find some time to look through them.
Note mod_websocket is not (currently) in the list of apache supported
modules, though I for one would love it to be. So for most people they
can't see what you are patching against.

Do you have this in git somewhere (e.g. github?)

-- 
Alex Bligh





[PATCH 1/5] Change reads to nonblocking/poll

Posted by Jacob Champion <ja...@ni.com>.
In preparation for an architecture where reads and writes to the brigade
happen on the same thread, perform nonblocking reads and poll on the
socket to determine when more data is available.
---
 mod_websocket.c | 46 +++++++++++++++++++++++++++++++---------------
 1 file changed, 31 insertions(+), 15 deletions(-)

diff --git a/mod_websocket.c b/mod_websocket.c
index b5e73be..5a04936 100644
--- a/mod_websocket.c
+++ b/mod_websocket.c
@@ -381,26 +381,22 @@ static void CALLBACK mod_websocket_plugin_close(const WebSocketServer *
 /*
  * Read a buffer of data from the input stream.
  */
-static apr_size_t mod_websocket_read_block(request_rec *r, char *buffer,
-                                           apr_size_t bufsiz)
+static apr_status_t mod_websocket_read_nonblock(request_rec *r, char *buffer,
+                                                apr_size_t *bufsiz)
 {
-    apr_status_t rv;
+    apr_status_t rv = APR_ENOMEM;
     apr_bucket_brigade *bb;
-    apr_size_t readbufsiz = 0;
 
     bb = apr_brigade_create(r->pool, r->connection->bucket_alloc);
     if (bb != NULL) {
         if ((rv =
              ap_get_brigade(r->input_filters, bb, AP_MODE_READBYTES,
-                            APR_BLOCK_READ, bufsiz)) == APR_SUCCESS) {
-            if ((rv =
-                 apr_brigade_flatten(bb, buffer, &bufsiz)) == APR_SUCCESS) {
-                readbufsiz = bufsiz;
-            }
+                            APR_NONBLOCK_READ, *bufsiz)) == APR_SUCCESS) {
+            rv = apr_brigade_flatten(bb, buffer, bufsiz);
         }
         apr_brigade_destroy(bb);
     }
-    return readbufsiz;
+    return rv;
 }
 
 /*
@@ -475,6 +471,10 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
     apr_pool_t *pool = NULL;
     apr_bucket_alloc_t *bucket_alloc;
     apr_bucket_brigade *obb;
+    apr_pollset_t *pollset;
+    apr_pollfd_t pollfd = { 0 };
+    const apr_pollfd_t *signalled;
+    apr_int32_t pollcnt;
 
     /* We cannot use the same bucket allocator for the ouput bucket brigade
      * obb as the one associated with the connection (r->connection->bucket_alloc)
@@ -487,7 +487,8 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
 
     if ((apr_pool_create(&pool, r->pool) == APR_SUCCESS) &&
         ((bucket_alloc = apr_bucket_alloc_create(pool)) != NULL) &&
-        ((obb = apr_brigade_create(pool, bucket_alloc)) != NULL)) {
+        ((obb = apr_brigade_create(pool, bucket_alloc)) != NULL) &&
+        (apr_pollset_create(&pollset, 1, pool, 0) == APR_SUCCESS)) {
         unsigned char block[BLOCK_DATA_SIZE];
         apr_int64_t block_size;
         apr_int64_t extension_bytes_remaining = 0;
@@ -504,15 +505,30 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
         unsigned short status_code = STATUS_CODE_OK;
         unsigned char status_code_buffer[2];
 
+        /* Initialize the pollset */
+        pollfd.p = pool;
+        pollfd.desc_type = APR_POLL_SOCKET;
+        pollfd.reqevents = APR_POLLIN;
+        pollfd.desc.s = ap_get_conn_socket(state->r->connection);
+        apr_pollset_add(pollset, &pollfd);
+
         /* Allow the plugin to now write to the client */
         state->obb = obb;
         apr_thread_mutex_unlock(state->mutex);
 
-        while ((framing_state != DATA_FRAMING_CLOSE) &&
-               ((block_size =
-                 mod_websocket_read_block(r, (char *)block,
-                                          sizeof(block))) > 0)) {
+        while ((framing_state != DATA_FRAMING_CLOSE)) {
             apr_int64_t block_offset = 0;
+            apr_status_t rv;
+
+            do {
+                block_size = sizeof(block);
+                rv = mod_websocket_read_nonblock(r, (char *)block, &block_size);
+            } while (APR_STATUS_IS_EAGAIN(rv) &&
+                     apr_pollset_poll(pollset, -1, &pollcnt, &signalled) == APR_SUCCESS);
+
+            if (rv != APR_SUCCESS) {
+                break;
+            }
 
             while (block_offset < block_size) {
                 switch (framing_state) {
-- 
2.1.1
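
For readers who don't have APR at hand, the "nonblocking read, then poll when there is nothing to read" pattern from this patch can be sketched with plain POSIX calls. This is only an analogue under stated assumptions: it swaps read() and poll() in for ap_get_brigade() with APR_NONBLOCK_READ and apr_pollset_poll(), and the helper names are hypothetical.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <unistd.h>

/* Put fd into nonblocking mode so read() returns EAGAIN instead of waiting. */
static int set_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL);
    return (flags < 0) ? -1 : fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

/*
 * Hypothetical helper: try a nonblocking read first and only sleep in
 * poll() when there is nothing to read. Returns bytes read, 0 at EOF, or
 * -1 with errno set (ETIMEDOUT if timeout_ms expired). A timeout_ms of -1
 * waits indefinitely, as the patch does.
 */
static ssize_t read_when_ready(int fd, void *buf, size_t len, int timeout_ms)
{
    assert(buf != NULL && len > 0);
    for (;;) {
        struct pollfd pfd = { .fd = fd, .events = POLLIN };
        ssize_t n = read(fd, buf, len);
        int rc;

        if (n >= 0) {
            return n;                   /* got data, or EOF */
        }
        if (errno == EINTR) {
            continue;
        }
        if (errno != EAGAIN && errno != EWOULDBLOCK) {
            return -1;                  /* real error */
        }

        /* Nothing available: wait for readability, then retry the read. */
        rc = poll(&pfd, 1, timeout_ms);
        if (rc == 0) {
            errno = ETIMEDOUT;
            return -1;
        }
        if (rc < 0 && errno != EINTR) {
            return -1;
        }
    }
}
```

Trying the read before polling matters when an input layer may already be holding buffered data that the socket itself would not report as readable.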


[PATCH 3/5] Pull framing logic into a separate function

Posted by Jacob Champion <ja...@ni.com>.
mod_websocket_handle_incoming now performs the core logic for
mod_websocket_data_framing. This will let us turn the processing loop
into a combination read/write loop more easily.

The state variables have all been moved into a separate
WebSocketReadState struct.
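
To show why the state has to live in a struct once the parser can be called repeatedly, here is a cut-down sketch covering only the payload-length part of a frame (7-bit length, with 126 and 127 selecting a 2-byte or 8-byte big-endian extension, per RFC 6455). It is not the code in this patch; the type and function names are made up, and it starts at the length byte rather than at the first byte of the frame.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef enum { LEN_START, LEN_EXT, LEN_DONE } len_phase;

/* Hypothetical, reduced analogue of a read-state struct: everything the
 * parser must remember between calls lives here, not in local variables. */
typedef struct {
    len_phase phase;
    int masked;            /* MASK bit taken from the length byte */
    int ext_remaining;     /* extended-length bytes still expected */
    uint64_t payload_len;
} len_state;

/* Consume bytes from block[0..size) and return how many were used.
 * May be called again with the next block until phase == LEN_DONE. */
static size_t feed_length(len_state *st, const unsigned char *block,
                          size_t size)
{
    size_t off = 0;

    assert(st != NULL && (block != NULL || size == 0));
    if (st->phase == LEN_START && off < size) {
        unsigned char b = block[off++];

        st->masked = (b & 0x80) != 0;
        st->payload_len = b & 0x7F;
        st->ext_remaining = 0;
        if (st->payload_len == 126) {
            st->payload_len = 0;
            st->ext_remaining = 2;
        }
        else if (st->payload_len == 127) {
            st->payload_len = 0;
            st->ext_remaining = 8;
        }
        st->phase = LEN_EXT;
    }
    if (st->phase == LEN_EXT) {
        /* Big-endian accumulation; resumes wherever the last block ended. */
        while (st->ext_remaining > 0 && off < size) {
            st->payload_len = (st->payload_len << 8) | block[off++];
            st->ext_remaining--;
        }
        if (st->ext_remaining == 0) {
            st->phase = LEN_DONE;
        }
    }
    return off;
}
```

Feeding the bytes 0xFE, 0x01, 0x00 one block at a time should end with a masked frame of length 256, the same result as feeding them in a single block.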
---
 mod_websocket.c | 652 +++++++++++++++++++++++++++++---------------------------
 1 file changed, 342 insertions(+), 310 deletions(-)

diff --git a/mod_websocket.c b/mod_websocket.c
index 62be765..1e88ef6 100644
--- a/mod_websocket.c
+++ b/mod_websocket.c
@@ -476,6 +476,327 @@ typedef struct _WebSocketFrameData
     unsigned int utf8_state;
 } WebSocketFrameData;
 
+/* Variables that need to persist across calls to mod_websocket_handle_incoming */
+typedef struct
+{
+    int framing_state;
+    unsigned short status_code;
+    /* XXX fin and opcode appear to be duplicated with frame; can they be removed? */
+    unsigned char fin;
+    unsigned char opcode;
+    WebSocketFrameData control_frame;
+    WebSocketFrameData message_frame;
+    WebSocketFrameData *frame;
+    apr_int64_t payload_length;
+    apr_int64_t mask_offset;
+    apr_int64_t extension_bytes_remaining;
+    int payload_length_bytes_remaining;
+    int masking;
+    int mask_index;
+    unsigned char mask[4];
+} WebSocketReadState;
+
+static void mod_websocket_handle_incoming(const WebSocketServer *server,
+                                          unsigned char *block,
+                                          apr_int64_t block_size,
+                                          WebSocketReadState *state,
+                                          websocket_config_rec *conf,
+                                          void *plugin_private)
+{
+    apr_int64_t block_offset = 0;
+
+    while (block_offset < block_size) {
+        switch (state->framing_state) {
+        case DATA_FRAMING_START:
+            /*
+             * Since we don't currently support any extensions,
+             * the reserve bits must be 0
+             */
+            if ((FRAME_GET_RSV1(block[block_offset]) != 0) ||
+                (FRAME_GET_RSV2(block[block_offset]) != 0) ||
+                (FRAME_GET_RSV3(block[block_offset]) != 0)) {
+                state->framing_state = DATA_FRAMING_CLOSE;
+                state->status_code = STATUS_CODE_PROTOCOL_ERROR;
+                break;
+            }
+            state->fin = FRAME_GET_FIN(block[block_offset]);
+            state->opcode = FRAME_GET_OPCODE(block[block_offset++]);
+
+            state->framing_state = DATA_FRAMING_PAYLOAD_LENGTH;
+
+            if (state->opcode >= 0x8) { /* Control frame */
+                if (state->fin) {
+                    state->frame = &state->control_frame;
+                    state->frame->opcode = state->opcode;
+                    state->frame->utf8_state = UTF8_VALID;
+                }
+                else {
+                    state->framing_state = DATA_FRAMING_CLOSE;
+                    state->status_code = STATUS_CODE_PROTOCOL_ERROR;
+                    break;
+                }
+            }
+            else { /* Message frame */
+                state->frame = &state->message_frame;
+                if (state->opcode) {
+                    if (state->frame->fin) {
+                        state->frame->opcode = state->opcode;
+                        state->frame->utf8_state = UTF8_VALID;
+                    }
+                    else {
+                        state->framing_state = DATA_FRAMING_CLOSE;
+                        state->status_code = STATUS_CODE_PROTOCOL_ERROR;
+                        break;
+                    }
+                }
+                else if (state->frame->fin ||
+                         ((state->opcode = state->frame->opcode) == 0)) {
+                    state->framing_state = DATA_FRAMING_CLOSE;
+                    state->status_code = STATUS_CODE_PROTOCOL_ERROR;
+                    break;
+                }
+                state->frame->fin = state->fin;
+            }
+            state->payload_length = 0;
+            state->payload_length_bytes_remaining = 0;
+
+            if (block_offset >= block_size) {
+                break; /* Only break if we need more data */
+            }
+        case DATA_FRAMING_PAYLOAD_LENGTH:
+            state->payload_length = (apr_int64_t)
+                FRAME_GET_PAYLOAD_LEN(block[block_offset]);
+            state->masking = FRAME_GET_MASK(block[block_offset++]);
+
+            if (state->payload_length == 126) {
+                state->payload_length = 0;
+                state->payload_length_bytes_remaining = 2;
+            }
+            else if (state->payload_length == 127) {
+                state->payload_length = 0;
+                state->payload_length_bytes_remaining = 8;
+            }
+            else {
+                state->payload_length_bytes_remaining = 0;
+            }
+            if ((state->masking == 0) ||   /* Client-side mask is required */
+                ((state->opcode >= 0x8) && /* Control opcodes cannot have a payload larger than 125 bytes */
+                 (state->payload_length_bytes_remaining != 0))) {
+                state->framing_state = DATA_FRAMING_CLOSE;
+                state->status_code = STATUS_CODE_PROTOCOL_ERROR;
+                break;
+            }
+            else {
+                state->framing_state = DATA_FRAMING_PAYLOAD_LENGTH_EXT;
+            }
+            if (block_offset >= block_size) {
+                break;  /* Only break if we need more data */
+            }
+        case DATA_FRAMING_PAYLOAD_LENGTH_EXT:
+            while ((state->payload_length_bytes_remaining > 0) &&
+                   (block_offset < block_size)) {
+                state->payload_length *= 256;
+                state->payload_length += block[block_offset++];
+                state->payload_length_bytes_remaining--;
+            }
+            if (state->payload_length_bytes_remaining == 0) {
+                if ((state->payload_length < 0) ||
+                    (state->payload_length > conf->payload_limit)) {
+                    /* Invalid payload length */
+                    state->framing_state = DATA_FRAMING_CLOSE;
+                    state->status_code = (server->state->protocol_version >= 13) ?
+                                          STATUS_CODE_MESSAGE_TOO_LARGE :
+                                          STATUS_CODE_RESERVED;
+                    break;
+                }
+                else if (state->masking != 0) {
+                    state->framing_state = DATA_FRAMING_MASK;
+                }
+                else {
+                    state->framing_state = DATA_FRAMING_EXTENSION_DATA;
+                    break;
+                }
+            }
+            if (block_offset >= block_size) {
+                break;  /* Only break if we need more data */
+            }
+        case DATA_FRAMING_MASK:
+            while ((state->mask_index < 4) && (block_offset < block_size)) {
+                state->mask[state->mask_index++] = block[block_offset++];
+            }
+            if (state->mask_index == 4) {
+                state->framing_state = DATA_FRAMING_EXTENSION_DATA;
+                state->mask_offset = 0;
+                state->mask_index = 0;
+                if ((state->mask[0] == 0) && (state->mask[1] == 0) &&
+                    (state->mask[2] == 0) && (state->mask[3] == 0)) {
+                    state->masking = 0;
+                }
+            }
+            else {
+                break;
+            }
+            /* Fall through */
+        case DATA_FRAMING_EXTENSION_DATA:
+            /* Deal with extension data when we support them -- FIXME */
+            if (state->extension_bytes_remaining == 0) {
+                if (state->payload_length > 0) {
+                    state->frame->application_data = (unsigned char *)
+                        realloc(state->frame->application_data,
+                                state->frame->application_data_offset +
+                                state->payload_length);
+                    if (state->frame->application_data == NULL) {
+                        state->framing_state = DATA_FRAMING_CLOSE;
+                        state->status_code = (server->state->protocol_version >= 13) ?
+                                              STATUS_CODE_INTERNAL_ERROR :
+                                              STATUS_CODE_GOING_AWAY;
+                        break;
+                    }
+                }
+                state->framing_state = DATA_FRAMING_APPLICATION_DATA;
+            }
+            /* Fall through */
+        case DATA_FRAMING_APPLICATION_DATA:
+            {
+                apr_int64_t block_data_length;
+                apr_int64_t block_length = 0;
+                apr_uint64_t application_data_offset =
+                    state->frame->application_data_offset;
+                unsigned char *application_data =
+                    state->frame->application_data;
+
+                block_length = block_size - block_offset;
+                block_data_length =
+                    (state->payload_length >
+                     block_length) ? block_length : state->payload_length;
+
+                if (state->masking) {
+                    apr_int64_t i;
+
+                    if (state->opcode == OPCODE_TEXT) {
+                        unsigned int utf8_state = state->frame->utf8_state;
+                        unsigned char c;
+
+                        for (i = 0; i < block_data_length; i++) {
+                            c = block[block_offset++] ^
+                                state->mask[state->mask_offset++ & 3];
+                            utf8_state =
+                                validate_utf8[utf8_state + c];
+                            if (utf8_state == UTF8_INVALID) {
+                                state->payload_length = block_data_length;
+                                break;
+                            }
+                            application_data
+                                [application_data_offset++] = c;
+                        }
+                        state->frame->utf8_state = utf8_state;
+                    }
+                    else {
+                        /* Need to optimize the unmasking -- FIXME */
+                        for (i = 0; i < block_data_length; i++) {
+                            application_data
+                                [application_data_offset++] =
+                                block[block_offset++] ^
+                                state->mask[state->mask_offset++ & 3];
+                        }
+                    }
+                }
+                else if (block_data_length > 0) {
+                    memcpy(&application_data[application_data_offset],
+                           &block[block_offset], block_data_length);
+                    if (state->opcode == OPCODE_TEXT) {
+                        apr_int64_t i, application_data_end =
+                            application_data_offset +
+                            block_data_length;
+                        unsigned int utf8_state = state->frame->utf8_state;
+
+                        for (i = application_data_offset;
+                             i < application_data_end; i++) {
+                            utf8_state =
+                                validate_utf8[utf8_state +
+                                              application_data[i]];
+                            if (utf8_state == UTF8_INVALID) {
+                                state->payload_length = block_data_length;
+                                break;
+                            }
+                        }
+                        state->frame->utf8_state = utf8_state;
+                    }
+                    application_data_offset += block_data_length;
+                    block_offset += block_data_length;
+                }
+                state->payload_length -= block_data_length;
+
+                if (state->payload_length == 0) {
+                    int message_type = MESSAGE_TYPE_INVALID;
+
+                    switch (state->opcode) {
+                    case OPCODE_TEXT:
+                        if ((state->fin &&
+                            (state->frame->utf8_state != UTF8_VALID)) ||
+                            (state->frame->utf8_state == UTF8_INVALID)) {
+                            state->framing_state = DATA_FRAMING_CLOSE;
+                            state->status_code = STATUS_CODE_INVALID_UTF8;
+                        }
+                        else {
+                            message_type = MESSAGE_TYPE_TEXT;
+                        }
+                        break;
+                    case OPCODE_BINARY:
+                        message_type = MESSAGE_TYPE_BINARY;
+                        break;
+                    case OPCODE_CLOSE:
+                        state->framing_state = DATA_FRAMING_CLOSE;
+                        state->status_code = STATUS_CODE_OK;
+                        break;
+                    case OPCODE_PING:
+                        apr_thread_mutex_lock(server->state->mutex);
+                        mod_websocket_send_internal(server->state,
+                                                    MESSAGE_TYPE_PONG,
+                                                    application_data,
+                                                    application_data_offset);
+                        apr_thread_mutex_unlock(server->state->mutex);
+                        break;
+                    case OPCODE_PONG:
+                        break;
+                    default:
+                        state->framing_state = DATA_FRAMING_CLOSE;
+                        state->status_code = STATUS_CODE_PROTOCOL_ERROR;
+                        break;
+                    }
+                    if (state->fin && (message_type != MESSAGE_TYPE_INVALID)) {
+                        conf->plugin->on_message(plugin_private,
+                                                 server, message_type,
+                                                 application_data,
+                                                 application_data_offset);
+                    }
+                    if (state->framing_state != DATA_FRAMING_CLOSE) {
+                        state->framing_state = DATA_FRAMING_START;
+
+                        if (state->fin) {
+                            if (state->frame->application_data != NULL) {
+                                free(state->frame->application_data);
+                                state->frame->application_data = NULL;
+                            }
+                            application_data_offset = 0;
+                        }
+                    }
+                }
+                state->frame->application_data_offset =
+                    application_data_offset;
+            }
+            break;
+        case DATA_FRAMING_CLOSE:
+            block_offset = block_size;
+            break;
+        default:
+            state->framing_state = DATA_FRAMING_CLOSE;
+            state->status_code = STATUS_CODE_PROTOCOL_ERROR;
+            break;
+        }
+    }
+}
+
 /*
  * The data framing handler requires that the server state mutex is locked by
  * the caller upon entering this function. It will be locked when leaving too.
@@ -509,19 +830,19 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
         (apr_pollset_create(&pollset, 1, pool, 0) == APR_SUCCESS)) {
         unsigned char block[BLOCK_DATA_SIZE];
         apr_int64_t block_size;
-        apr_int64_t extension_bytes_remaining = 0;
-        apr_int64_t payload_length = 0;
-        apr_int64_t mask_offset = 0;
-        int framing_state = DATA_FRAMING_START;
-        int payload_length_bytes_remaining = 0;
-        int mask_index = 0, masking = 0;
-        unsigned char mask[4] = { 0, 0, 0, 0 };
-        unsigned char fin = 0, opcode = 0xFF;
-        WebSocketFrameData control_frame = { 0, NULL, 1, 8, UTF8_VALID };
-        WebSocketFrameData message_frame = { 0, NULL, 1, 0, UTF8_VALID };
-        WebSocketFrameData *frame = &control_frame;
-        unsigned short status_code = STATUS_CODE_OK;
         unsigned char status_code_buffer[2];
+        WebSocketReadState read_state = { 0 };
+
+        read_state.framing_state = DATA_FRAMING_START;
+        read_state.status_code = STATUS_CODE_OK;
+        read_state.control_frame.fin = 1;
+        read_state.control_frame.opcode = 8;
+        read_state.control_frame.utf8_state = UTF8_VALID;
+        read_state.message_frame.fin = 1;
+        read_state.message_frame.opcode = 0;
+        read_state.message_frame.utf8_state = UTF8_VALID;
+        read_state.frame = &read_state.control_frame;
+        read_state.opcode = 0xFF;
 
         /* Initialize the pollset */
         pollfd.p = pool;
@@ -534,8 +855,7 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
         state->obb = obb;
         apr_thread_mutex_unlock(state->mutex);
 
-        while ((framing_state != DATA_FRAMING_CLOSE)) {
-            apr_int64_t block_offset = 0;
+        while ((read_state.framing_state != DATA_FRAMING_CLOSE)) {
             apr_status_t rv;
 
             do {
@@ -548,307 +868,19 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
                 break;
             }
 
-            while (block_offset < block_size) {
-                switch (framing_state) {
-                case DATA_FRAMING_START:
-                    /*
-                     * Since we don't currently support any extensions,
-                     * the reserve bits must be 0
-                     */
-                    if ((FRAME_GET_RSV1(block[block_offset]) != 0) ||
-                        (FRAME_GET_RSV2(block[block_offset]) != 0) ||
-                        (FRAME_GET_RSV3(block[block_offset]) != 0)) {
-                        framing_state = DATA_FRAMING_CLOSE;
-                        status_code = STATUS_CODE_PROTOCOL_ERROR;
-                        break;
-                    }
-                    fin = FRAME_GET_FIN(block[block_offset]);
-                    opcode = FRAME_GET_OPCODE(block[block_offset++]);
-
-                    framing_state = DATA_FRAMING_PAYLOAD_LENGTH;
-
-                    if (opcode >= 0x8) { /* Control frame */
-                        if (fin) {
-                            frame = &control_frame;
-                            frame->opcode = opcode;
-                            frame->utf8_state = UTF8_VALID;
-                        }
-                        else {
-                            framing_state = DATA_FRAMING_CLOSE;
-                            status_code = STATUS_CODE_PROTOCOL_ERROR;
-                            break;
-                        }
-                    }
-                    else { /* Message frame */
-                        frame = &message_frame;
-                        if (opcode) {
-                            if (frame->fin) {
-                                frame->opcode = opcode;
-                                frame->utf8_state = UTF8_VALID;
-                            }
-                            else {
-                                framing_state = DATA_FRAMING_CLOSE;
-                                status_code = STATUS_CODE_PROTOCOL_ERROR;
-                                break;
-                            }
-                        }
-                        else if (frame->fin ||
-                                 ((opcode = frame->opcode) == 0)) {
-                            framing_state = DATA_FRAMING_CLOSE;
-                            status_code = STATUS_CODE_PROTOCOL_ERROR;
-                            break;
-                        }
-                        frame->fin = fin;
-                    }
-                    payload_length = 0;
-                    payload_length_bytes_remaining = 0;
-
-                    if (block_offset >= block_size) {
-                        break; /* Only break if we need more data */
-                    }
-                case DATA_FRAMING_PAYLOAD_LENGTH:
-                    payload_length = (apr_int64_t)
-                        FRAME_GET_PAYLOAD_LEN(block[block_offset]);
-                    masking = FRAME_GET_MASK(block[block_offset++]);
-
-                    if (payload_length == 126) {
-                        payload_length = 0;
-                        payload_length_bytes_remaining = 2;
-                    }
-                    else if (payload_length == 127) {
-                        payload_length = 0;
-                        payload_length_bytes_remaining = 8;
-                    }
-                    else {
-                        payload_length_bytes_remaining = 0;
-                    }
-                    if ((masking == 0) ||   /* Client-side mask is required */
-                        ((opcode >= 0x8) && /* Control opcodes cannot have a payload larger than 125 bytes */
-                         (payload_length_bytes_remaining != 0))) {
-                        framing_state = DATA_FRAMING_CLOSE;
-                        status_code = STATUS_CODE_PROTOCOL_ERROR;
-                        break;
-                    }
-                    else {
-                        framing_state = DATA_FRAMING_PAYLOAD_LENGTH_EXT;
-                    }
-                    if (block_offset >= block_size) {
-                        break;  /* Only break if we need more data */
-                    }
-                case DATA_FRAMING_PAYLOAD_LENGTH_EXT:
-                    while ((payload_length_bytes_remaining > 0) &&
-                           (block_offset < block_size)) {
-                        payload_length *= 256;
-                        payload_length += block[block_offset++];
-                        payload_length_bytes_remaining--;
-                    }
-                    if (payload_length_bytes_remaining == 0) {
-                        if ((payload_length < 0) ||
-                            (payload_length > conf->payload_limit)) {
-                            /* Invalid payload length */
-                            framing_state = DATA_FRAMING_CLOSE;
-                            status_code = (state->protocol_version >= 13) ?
-                                           STATUS_CODE_MESSAGE_TOO_LARGE :
-                                           STATUS_CODE_RESERVED;
-                            break;
-                        }
-                        else if (masking != 0) {
-                            framing_state = DATA_FRAMING_MASK;
-                        }
-                        else {
-                            framing_state = DATA_FRAMING_EXTENSION_DATA;
-                            break;
-                        }
-                    }
-                    if (block_offset >= block_size) {
-                        break;  /* Only break if we need more data */
-                    }
-                case DATA_FRAMING_MASK:
-                    while ((mask_index < 4) && (block_offset < block_size)) {
-                        mask[mask_index++] = block[block_offset++];
-                    }
-                    if (mask_index == 4) {
-                        framing_state = DATA_FRAMING_EXTENSION_DATA;
-                        mask_offset = 0;
-                        mask_index = 0;
-                        if ((mask[0] == 0) && (mask[1] == 0) &&
-                            (mask[2] == 0) && (mask[3] == 0)) {
-                            masking = 0;
-                        }
-                    }
-                    else {
-                        break;
-                    }
-                    /* Fall through */
-                case DATA_FRAMING_EXTENSION_DATA:
-                    /* Deal with extension data when we support them -- FIXME */
-                    if (extension_bytes_remaining == 0) {
-                        if (payload_length > 0) {
-                            frame->application_data = (unsigned char *)
-                                realloc(frame->application_data,
-                                        frame->application_data_offset +
-                                        payload_length);
-                            if (frame->application_data == NULL) {
-                                framing_state = DATA_FRAMING_CLOSE;
-                                status_code = (state->protocol_version >= 13) ?
-                                               STATUS_CODE_INTERNAL_ERROR :
-                                               STATUS_CODE_GOING_AWAY;
-                                break;
-                            }
-                        }
-                        framing_state = DATA_FRAMING_APPLICATION_DATA;
-                    }
-                    /* Fall through */
-                case DATA_FRAMING_APPLICATION_DATA:
-                    {
-                        apr_int64_t block_data_length;
-                        apr_int64_t block_length = 0;
-                        apr_uint64_t application_data_offset =
-                            frame->application_data_offset;
-                        unsigned char *application_data =
-                            frame->application_data;
-
-                        block_length = block_size - block_offset;
-                        block_data_length =
-                            (payload_length >
-                             block_length) ? block_length : payload_length;
-
-                        if (masking) {
-                            apr_int64_t i;
-
-                            if (opcode == OPCODE_TEXT) {
-                                unsigned int utf8_state = frame->utf8_state;
-                                unsigned char c;
-
-                                for (i = 0; i < block_data_length; i++) {
-                                    c = block[block_offset++] ^
-                                        mask[mask_offset++ & 3];
-                                    utf8_state =
-                                        validate_utf8[utf8_state + c];
-                                    if (utf8_state == UTF8_INVALID) {
-                                        payload_length = block_data_length;
-                                        break;
-                                    }
-                                    application_data
-                                        [application_data_offset++] = c;
-                                }
-                                frame->utf8_state = utf8_state;
-                            }
-                            else {
-                                /* Need to optimize the unmasking -- FIXME */
-                                for (i = 0; i < block_data_length; i++) {
-                                    application_data
-                                        [application_data_offset++] =
-                                        block[block_offset++] ^
-                                        mask[mask_offset++ & 3];
-                                }
-                            }
-                        }
-                        else if (block_data_length > 0) {
-                            memcpy(&application_data[application_data_offset],
-                                   &block[block_offset], block_data_length);
-                            if (opcode == OPCODE_TEXT) {
-                                apr_int64_t i, application_data_end =
-                                    application_data_offset +
-                                    block_data_length;
-                                unsigned int utf8_state = frame->utf8_state;
-
-                                for (i = application_data_offset;
-                                     i < application_data_end; i++) {
-                                    utf8_state =
-                                        validate_utf8[utf8_state +
-                                                      application_data[i]];
-                                    if (utf8_state == UTF8_INVALID) {
-                                        payload_length = block_data_length;
-                                        break;
-                                    }
-                                }
-                                frame->utf8_state = utf8_state;
-                            }
-                            application_data_offset += block_data_length;
-                            block_offset += block_data_length;
-                        }
-                        payload_length -= block_data_length;
-
-                        if (payload_length == 0) {
-                            int message_type = MESSAGE_TYPE_INVALID;
-
-                            switch (opcode) {
-                            case OPCODE_TEXT:
-                                if ((fin &&
-                                    (frame->utf8_state != UTF8_VALID)) ||
-                                    (frame->utf8_state == UTF8_INVALID)) {
-                                    framing_state = DATA_FRAMING_CLOSE;
-                                    status_code = STATUS_CODE_INVALID_UTF8;
-                                }
-                                else {
-                                    message_type = MESSAGE_TYPE_TEXT;
-                                }
-                                break;
-                            case OPCODE_BINARY:
-                                message_type = MESSAGE_TYPE_BINARY;
-                                break;
-                            case OPCODE_CLOSE:
-                                framing_state = DATA_FRAMING_CLOSE;
-                                status_code = STATUS_CODE_OK;
-                                break;
-                            case OPCODE_PING:
-                                apr_thread_mutex_lock(state->mutex);
-                                mod_websocket_send_internal(state,
-                                                            MESSAGE_TYPE_PONG,
-                                                            application_data,
-                                                            application_data_offset);
-                                apr_thread_mutex_unlock(state->mutex);
-                                break;
-                            case OPCODE_PONG:
-                                break;
-                            default:
-                                framing_state = DATA_FRAMING_CLOSE;
-                                status_code = STATUS_CODE_PROTOCOL_ERROR;
-                                break;
-                            }
-                            if (fin && (message_type != MESSAGE_TYPE_INVALID)) {
-                                conf->plugin->on_message(plugin_private,
-                                                         server, message_type,
-                                                         application_data,
-                                                         application_data_offset);
-                            }
-                            if (framing_state != DATA_FRAMING_CLOSE) {
-                                framing_state = DATA_FRAMING_START;
-
-                                if (fin) {
-                                    if (frame->application_data != NULL) {
-                                        free(frame->application_data);
-                                        frame->application_data = NULL;
-                                    }
-                                    application_data_offset = 0;
-                                }
-                            }
-                        }
-                        frame->application_data_offset =
-                            application_data_offset;
-                    }
-                    break;
-                case DATA_FRAMING_CLOSE:
-                    block_offset = block_size;
-                    break;
-                default:
-                    framing_state = DATA_FRAMING_CLOSE;
-                    status_code = STATUS_CODE_PROTOCOL_ERROR;
-                    break;
-                }
-            }
+            mod_websocket_handle_incoming(server, block, block_size,
+                                          &read_state, conf, plugin_private);
         }
-        if (message_frame.application_data != NULL) {
-            free(message_frame.application_data);
+        if (read_state.message_frame.application_data != NULL) {
+            free(read_state.message_frame.application_data);
         }
-        if (control_frame.application_data != NULL) {
-            free(control_frame.application_data);
+        if (read_state.control_frame.application_data != NULL) {
+            free(read_state.control_frame.application_data);
         }
 
         /* Send server-side closing handshake */
-        status_code_buffer[0] = (status_code >> 8) & 0xFF;
-        status_code_buffer[1] = status_code & 0xFF;
+        status_code_buffer[0] = (read_state.status_code >> 8) & 0xFF;
+        status_code_buffer[1] = read_state.status_code & 0xFF;
 
         apr_thread_mutex_lock(state->mutex);
         mod_websocket_send_internal(state, MESSAGE_TYPE_CLOSE,
-- 
2.1.1
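
A note for readers of the patch above: the reason the parser state has to live in a struct is that a header field can straddle two reads. Here is a minimal standalone sketch of that idea for the extended payload length only. LengthState and feed_length are hypothetical names for illustration, not part of mod_websocket; the real WebSocketReadState carries many more fields.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, trimmed-down stand-in for the patch's WebSocketReadState. */
typedef struct {
    uint64_t payload_length;             /* accumulated big-endian value */
    int payload_length_bytes_remaining;  /* 2 or 8 at start, 0 when done */
} LengthState;

/*
 * Consumes bytes of an extended payload length from `block` until either the
 * length is complete or the block runs out. Returns the number of bytes
 * consumed. Because progress is stored in `st`, the caller can simply call
 * this again with the next block.
 */
static size_t feed_length(LengthState *st, const unsigned char *block,
                          size_t block_size)
{
    size_t offset = 0;

    assert(st != NULL && block != NULL);
    while ((st->payload_length_bytes_remaining > 0) &&
           (offset < block_size)) {
        st->payload_length = (st->payload_length << 8) | block[offset++];
        st->payload_length_bytes_remaining--;
    }
    return offset;
}
```

Feeding an 8-byte length as a 3-byte block followed by a longer block should leave payload_length_bytes_remaining at 5 after the first call and at 0 after the second, with any trailing bytes in the second block left unconsumed for the next parsing state.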


[PATCH 2/5] Separate plugin_send from the send implementation

Posted by Jacob Champion <ja...@ni.com>.
The public-facing API will eventually communicate cross-thread instead
of writing directly to the brigade. In preparation, pull the send logic
into an internal function, and call that from the read loop instead of
mod_websocket_plugin_send.
---
 mod_websocket.c | 161 ++++++++++++++++++++++++++++++++------------------------
 1 file changed, 91 insertions(+), 70 deletions(-)
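
As a reading aid for the send logic moved by this patch, the header layout it produces can be sketched on its own. This is an illustration under assumptions, not the module's code: encode_frame_header is a hypothetical helper, and the bit layout (FIN in the top bit, a 7-bit length with 126 and 127 as escapes for 16-bit and 64-bit big-endian lengths, MASK bit clear for server-to-client frames) follows RFC 6455 rather than the FRAME_SET_* macros, whose definitions are not shown here.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Hypothetical helper (not in mod_websocket): writes an unmasked, FIN=1
 * frame header into `header`, which must hold at least 10 bytes.
 * Returns the number of header bytes written.
 */
static size_t encode_frame_header(unsigned char *header,
                                  unsigned char opcode,
                                  uint64_t payload_length)
{
    size_t pos = 0;
    int shift;

    assert(header != NULL);
    header[pos++] = (unsigned char)(0x80 | (opcode & 0x0F)); /* FIN + opcode */
    if (payload_length < 126) {
        /* Length fits in the 7-bit field; MASK bit stays clear */
        header[pos++] = (unsigned char)payload_length;
    }
    else if (payload_length < 65536) {
        /* 126 announces a 16-bit big-endian length */
        header[pos++] = 126;
        header[pos++] = (unsigned char)((payload_length >> 8) & 0xFF);
        header[pos++] = (unsigned char)(payload_length & 0xFF);
    }
    else {
        /* 127 announces a 64-bit big-endian length */
        header[pos++] = 127;
        for (shift = 56; shift >= 0; shift -= 8) {
            header[pos++] =
                (unsigned char)((payload_length >> shift) & 0xFF);
        }
    }
    return pos;
}
```

The sketch only builds the header bytes. In the module itself the header and payload are then written to the output brigade while the state mutex is held, which is the part this patch separates from the public entry point.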

diff --git a/mod_websocket.c b/mod_websocket.c
index 5a04936..62be765 100644
--- a/mod_websocket.c
+++ b/mod_websocket.c
@@ -290,85 +290,103 @@ static void CALLBACK mod_websocket_protocol_set(const WebSocketServer *server,
     }
 }
 
+/*
+ * Sends data to the WebSocket connection using the given server state. The
+ * server state must be locked upon entering this function. buffer_size is
+ * assumed to be within the limits defined by the WebSocket protocol (i.e. fits
+ * in 63 bits).
+ */
+static size_t mod_websocket_send_internal(WebSocketState *state,
+                                          const int type,
+                                          const unsigned char *buffer,
+                                          const size_t buffer_size)
+{
+    apr_uint64_t payload_length =
+        (apr_uint64_t) ((buffer != NULL) ? buffer_size : 0);
+    size_t written = 0;
+
+    if ((state->r != NULL) && (state->obb != NULL) && !state->closing) {
+        unsigned char header[32];
+        ap_filter_t *of = state->r->connection->output_filters;
+        apr_size_t pos = 0;
+        unsigned char opcode;
+
+        switch (type) {
+        case MESSAGE_TYPE_TEXT:
+            opcode = OPCODE_TEXT;
+            break;
+        case MESSAGE_TYPE_BINARY:
+            opcode = OPCODE_BINARY;
+            break;
+        case MESSAGE_TYPE_PING:
+            opcode = OPCODE_PING;
+            break;
+        case MESSAGE_TYPE_PONG:
+            opcode = OPCODE_PONG;
+            break;
+        case MESSAGE_TYPE_CLOSE:
+        default:
+            state->closing = 1;
+            opcode = OPCODE_CLOSE;
+            break;
+        }
+        header[pos++] = FRAME_SET_FIN(1) | FRAME_SET_OPCODE(opcode);
+        if (payload_length < 126) {
+            header[pos++] =
+                FRAME_SET_MASK(0) | FRAME_SET_LENGTH(payload_length, 0);
+        }
+        else {
+            if (payload_length < 65536) {
+                header[pos++] = FRAME_SET_MASK(0) | 126;
+            }
+            else {
+                header[pos++] = FRAME_SET_MASK(0) | 127;
+                header[pos++] = FRAME_SET_LENGTH(payload_length, 7);
+                header[pos++] = FRAME_SET_LENGTH(payload_length, 6);
+                header[pos++] = FRAME_SET_LENGTH(payload_length, 5);
+                header[pos++] = FRAME_SET_LENGTH(payload_length, 4);
+                header[pos++] = FRAME_SET_LENGTH(payload_length, 3);
+                header[pos++] = FRAME_SET_LENGTH(payload_length, 2);
+            }
+            header[pos++] = FRAME_SET_LENGTH(payload_length, 1);
+            header[pos++] = FRAME_SET_LENGTH(payload_length, 0);
+        }
+        ap_fwrite(of, state->obb, (const char *)header, pos); /* Header */
+        if (payload_length > 0) {
+            if (ap_fwrite(of, state->obb,
+                          (const char *)buffer,
+                          buffer_size) == APR_SUCCESS) { /* Payload Data */
+                written = buffer_size;
+            }
+        }
+        if (ap_fflush(of, state->obb) != APR_SUCCESS) {
+            written = 0;
+        }
+    }
+
+    return written;
+}
+
 static size_t CALLBACK mod_websocket_plugin_send(const WebSocketServer *server,
                                                  const int type,
                                                  const unsigned char *buffer,
                                                  const size_t buffer_size)
 {
-    apr_uint64_t payload_length =
-        (apr_uint64_t) ((buffer != NULL) ? buffer_size : 0);
     size_t written = 0;
 
     /* Deal with size more that 63 bits - FIXME */
-
     if ((server != NULL) && (server->state != NULL)) {
         WebSocketState *state = server->state;
 
         apr_thread_mutex_lock(state->mutex);
-
-        if ((state->r != NULL) && (state->obb != NULL) && !state->closing) {
-            unsigned char header[32];
-            ap_filter_t *of = state->r->connection->output_filters;
-            apr_size_t pos = 0;
-            unsigned char opcode;
-
-            switch (type) {
-            case MESSAGE_TYPE_TEXT:
-                opcode = OPCODE_TEXT;
-                break;
-            case MESSAGE_TYPE_BINARY:
-                opcode = OPCODE_BINARY;
-                break;
-            case MESSAGE_TYPE_PING:
-                opcode = OPCODE_PING;
-                break;
-            case MESSAGE_TYPE_PONG:
-                opcode = OPCODE_PONG;
-                break;
-            case MESSAGE_TYPE_CLOSE:
-            default:
-                state->closing = 1;
-                opcode = OPCODE_CLOSE;
-                break;
-            }
-            header[pos++] = FRAME_SET_FIN(1) | FRAME_SET_OPCODE(opcode);
-            if (payload_length < 126) {
-                header[pos++] =
-                    FRAME_SET_MASK(0) | FRAME_SET_LENGTH(payload_length, 0);
-            }
-            else {
-                if (payload_length < 65536) {
-                    header[pos++] = FRAME_SET_MASK(0) | 126;
-                }
-                else {
-                    header[pos++] = FRAME_SET_MASK(0) | 127;
-                    header[pos++] = FRAME_SET_LENGTH(payload_length, 7);
-                    header[pos++] = FRAME_SET_LENGTH(payload_length, 6);
-                    header[pos++] = FRAME_SET_LENGTH(payload_length, 5);
-                    header[pos++] = FRAME_SET_LENGTH(payload_length, 4);
-                    header[pos++] = FRAME_SET_LENGTH(payload_length, 3);
-                    header[pos++] = FRAME_SET_LENGTH(payload_length, 2);
-                }
-                header[pos++] = FRAME_SET_LENGTH(payload_length, 1);
-                header[pos++] = FRAME_SET_LENGTH(payload_length, 0);
-            }
-            ap_fwrite(of, state->obb, (const char *)header, pos); /* Header */
-            if (payload_length > 0) {
-                if (ap_fwrite(of, state->obb,
-                              (const char *)buffer,
-                              buffer_size) == APR_SUCCESS) { /* Payload Data */
-                    written = buffer_size;
-                }
-            }
-            if (ap_fflush(of, state->obb) != APR_SUCCESS) {
-                written = 0;
-            }
-        }
+        written = mod_websocket_send_internal(state, type, buffer, buffer_size);
         apr_thread_mutex_unlock(state->mutex);
     }
+
     return written;
 }
 
+
 static void CALLBACK mod_websocket_plugin_close(const WebSocketServer *
                                                 server)
 {
@@ -775,10 +793,12 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
                                 status_code = STATUS_CODE_OK;
                                 break;
                             case OPCODE_PING:
-                                mod_websocket_plugin_send(server,
-                                                          MESSAGE_TYPE_PONG,
-                                                          application_data,
-                                                          application_data_offset);
+                                apr_thread_mutex_lock(state->mutex);
+                                mod_websocket_send_internal(state,
+                                                            MESSAGE_TYPE_PONG,
+                                                            application_data,
+                                                            application_data_offset);
+                                apr_thread_mutex_unlock(state->mutex);
                                 break;
                             case OPCODE_PONG:
                                 break;
@@ -829,12 +849,13 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
         /* Send server-side closing handshake */
         status_code_buffer[0] = (status_code >> 8) & 0xFF;
         status_code_buffer[1] = status_code & 0xFF;
-        mod_websocket_plugin_send(server, MESSAGE_TYPE_CLOSE,
-                                  status_code_buffer,
-                                  sizeof(status_code_buffer));
+
+        apr_thread_mutex_lock(state->mutex);
+        mod_websocket_send_internal(state, MESSAGE_TYPE_CLOSE,
+                                    status_code_buffer,
+                                    sizeof(status_code_buffer));
 
         /* We are done with the bucket brigade */
-        apr_thread_mutex_lock(state->mutex);
         state->obb = NULL;
         apr_brigade_destroy(obb);
     }
-- 
2.1.1
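
The framing logic that this patch moves out of
mod_websocket_plugin_send() follows the RFC 6455 header layout for an
unmasked, unfragmented server frame. A minimal standalone sketch of
that layout is shown here. ws_encode_header() is a hypothetical helper
name used for illustration only; it is not a function in mod_websocket
and it does not use the module's FRAME_SET_* macros.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical helper (not part of mod_websocket): writes the header of an
 * unmasked frame with FIN=1 into `header`, which must hold at least 10
 * bytes, and returns the number of header bytes written. */
static size_t ws_encode_header(unsigned char *header, unsigned char opcode,
                               uint64_t payload_length)
{
    size_t pos = 0;
    int shift;

    assert(header != NULL);

    /* First byte: FIN bit plus the 4-bit opcode. */
    header[pos++] = (unsigned char)(0x80 | (opcode & 0x0F));

    if (payload_length < 126) {
        /* Length fits in the 7-bit field; the mask bit stays 0. */
        header[pos++] = (unsigned char)payload_length;
    }
    else if (payload_length < 65536) {
        /* 126 announces a 16-bit length in network byte order. */
        header[pos++] = 126;
        header[pos++] = (unsigned char)(payload_length >> 8);
        header[pos++] = (unsigned char)(payload_length & 0xFF);
    }
    else {
        /* 127 announces a 64-bit length in network byte order. */
        header[pos++] = 127;
        for (shift = 56; shift >= 0; shift -= 8) {
            header[pos++] = (unsigned char)((payload_length >> shift) & 0xFF);
        }
    }
    return pos;
}
```

With this layout a 5-byte text message gets a 2-byte header, a 300-byte
message a 4-byte header, and anything of 65536 bytes or more a 10-byte
header.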


[PATCH 5/5] Remove extra pool and clean up brigades

Posted by Jacob Champion <ja...@ni.com>.
Now that the input and output brigades are only operated on from a
single thread, the extra pool and bucket allocator added as part of
commit cfaef071 can be removed. Also, allocate a single long-lived input
brigade instead of allocating new ones for every socket read.
---
 mod_websocket.c | 49 ++++++++++++++++++-------------------------------
 1 file changed, 18 insertions(+), 31 deletions(-)

diff --git a/mod_websocket.c b/mod_websocket.c
index 8e3bf26..9c62ca5 100644
--- a/mod_websocket.c
+++ b/mod_websocket.c
@@ -465,21 +465,19 @@ static void CALLBACK mod_websocket_plugin_close(const WebSocketServer *
 /*
  * Read a buffer of data from the input stream.
  */
-static apr_status_t mod_websocket_read_nonblock(request_rec *r, char *buffer,
+static apr_status_t mod_websocket_read_nonblock(request_rec *r,
+                                                apr_bucket_brigade *bb,
+                                                char *buffer,
                                                 apr_size_t *bufsiz)
 {
-    apr_status_t rv = APR_ENOMEM;
-    apr_bucket_brigade *bb;
+    apr_status_t rv;
 
-    bb = apr_brigade_create(r->pool, r->connection->bucket_alloc);
-    if (bb != NULL) {
-        if ((rv =
-             ap_get_brigade(r->input_filters, bb, AP_MODE_READBYTES,
-                            APR_NONBLOCK_READ, *bufsiz)) == APR_SUCCESS) {
-            rv = apr_brigade_flatten(bb, buffer, bufsiz);
-        }
-        apr_brigade_destroy(bb);
+    if ((rv = ap_get_brigade(r->input_filters, bb, AP_MODE_READBYTES,
+                             APR_NONBLOCK_READ, *bufsiz)) == APR_SUCCESS) {
+        rv = apr_brigade_flatten(bb, buffer, bufsiz);
+        apr_brigade_cleanup(bb);
     }
+
     return rv;
 }
 
@@ -893,29 +891,17 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
 {
     WebSocketState *state = server->state;
     request_rec *r = state->r;
-    apr_pool_t *pool = NULL;
-    apr_bucket_alloc_t *bucket_alloc;
-    apr_bucket_brigade *obb;
+    apr_bucket_brigade *ibb, *obb;
     apr_pollset_t *pollset;
     apr_pollfd_t pollfd = { 0 };
     const apr_pollfd_t *signalled;
     apr_int32_t pollcnt;
     apr_queue_t * queue;
 
-    /* We cannot use the same bucket allocator for the ouput bucket brigade
-     * obb as the one associated with the connection (r->connection->bucket_alloc)
-     * because the same bucket allocator cannot be used in two different
-     * threads, and we use the connection bucket allocator in this
-     * thread - see docs on apr_bucket_alloc_create(). This results in
-     * occasional core dumps. So create our own bucket allocator and pool
-     * for output thread bucket brigade. (Thanks to Alex Bligh -- abligh)
-     */
-
-    if ((apr_pool_create(&pool, r->pool) == APR_SUCCESS) &&
-        ((bucket_alloc = apr_bucket_alloc_create(pool)) != NULL) &&
-        ((obb = apr_brigade_create(pool, bucket_alloc)) != NULL) &&
-        (apr_pollset_create(&pollset, 1, pool, APR_POLLSET_WAKEABLE) == APR_SUCCESS) &&
-        (apr_queue_create(&queue, QUEUE_CAPACITY, pool) == APR_SUCCESS)) {
+    if (((ibb = apr_brigade_create(r->pool, r->connection->bucket_alloc)) != NULL) &&
+        ((obb = apr_brigade_create(r->pool, r->connection->bucket_alloc)) != NULL) &&
+        (apr_pollset_create(&pollset, 1, r->pool, APR_POLLSET_WAKEABLE) == APR_SUCCESS) &&
+        (apr_queue_create(&queue, QUEUE_CAPACITY, r->pool) == APR_SUCCESS)) {
         unsigned char block[BLOCK_DATA_SIZE];
         apr_int64_t block_size;
         unsigned char status_code_buffer[2];
@@ -935,7 +921,7 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
         state->queue = queue;
 
         /* Initialize the pollset */
-        pollfd.p = pool;
+        pollfd.p = r->pool;
         pollfd.desc_type = APR_POLL_SOCKET;
         pollfd.reqevents = APR_POLLIN;
         pollfd.desc.s = ap_get_conn_socket(state->r->connection);
@@ -960,7 +946,7 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
 
             /* Check to see if there is any data to read. */
             block_size = sizeof(block);
-            rv = mod_websocket_read_nonblock(r, (char *)block, &block_size);
+            rv = mod_websocket_read_nonblock(r, ibb, (char *)block, &block_size);
 
             if (rv == APR_SUCCESS) {
                 mod_websocket_handle_incoming(server, block, block_size,
@@ -1026,8 +1012,9 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
                                     status_code_buffer,
                                     sizeof(status_code_buffer));
 
-        /* We are done with the bucket brigade */
+        /* We are done with the bucket brigades */
         state->obb = NULL;
+        apr_brigade_destroy(ibb);
         apr_brigade_destroy(obb);
 
         state->pollset = NULL;
-- 
2.1.1


[PATCH 4/5] Read and write to the brigade from only one thread

Posted by Jacob Champion <ja...@ni.com>.
The prior implementation allowed clients to write to a parallel bucket
brigade from separate threads, while the main thread read incoming
messages from the original brigade. This appears to cause thread safety
issues, especially when combined with TLS (OpenSSL has the ability to
read from the socket during a write, and vice-versa).

This patch, inspired by both mod_spdy and mod_proxy_wstunnel, causes
cross-thread invocations of mod_websocket_plugin_send() to place their
messages on a queue and signal the main thread that a write is pending.
The event loop is based on apr_pollset.

See http://mail-archives.apache.org/mod_mbox/httpd-modules-dev/201502.mbox/%3C54EE23EB.5040705@ni.com%3E
for history. Thanks to Alex Bligh and Eric Covener for their
suggestions.
---
 mod_websocket.c | 175 ++++++++++++++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 164 insertions(+), 11 deletions(-)

diff --git a/mod_websocket.c b/mod_websocket.c
index 1e88ef6..8e3bf26 100644
--- a/mod_websocket.c
+++ b/mod_websocket.c
@@ -25,8 +25,10 @@
  */
 
 #include "apr_base64.h"
+#include "apr_queue.h"
 #include "apr_sha1.h"
 #include "apr_strings.h"
+#include "apr_thread_cond.h"
 
 #include "httpd.h"
 #include "http_config.h"
@@ -58,6 +60,8 @@ typedef struct
 
 #define BLOCK_DATA_SIZE              4096
 
+#define QUEUE_CAPACITY                 16
+
 #define DATA_FRAMING_MASK               0
 #define DATA_FRAMING_START              1
 #define DATA_FRAMING_PAYLOAD_LENGTH     2
@@ -214,10 +218,14 @@ typedef struct _WebSocketState
 {
     request_rec *r;
     apr_bucket_brigade *obb;
+    apr_os_thread_t main_thread;
     apr_thread_mutex_t *mutex;
+    apr_thread_cond_t *cond;
     apr_array_header_t *protocols;
     int closing;
     apr_int64_t protocol_version;
+    apr_pollset_t *pollset;
+    apr_queue_t *queue;
 } WebSocketState;
 
 static request_rec *CALLBACK mod_websocket_request(const WebSocketServer *server)
@@ -367,6 +375,15 @@ static size_t mod_websocket_send_internal(WebSocketState *state,
     return written;
 }
 
+typedef struct
+{
+    int type;
+    const unsigned char * buffer;
+    size_t buffer_size;
+    int done;
+    size_t written;
+} WebSocketMessageData;
+
 static size_t CALLBACK mod_websocket_plugin_send(const WebSocketServer *server,
                                                  const int type,
                                                  const unsigned char *buffer,
@@ -375,11 +392,60 @@ static size_t CALLBACK mod_websocket_plugin_send(const WebSocketServer *server,
     size_t written = 0;
 
     /* Deal with size more that 63 bits - FIXME */
+    /* FIXME - if sending a zero-length message, the API cannot distinguish
+     * between success and failure */
     if ((server != NULL) && (server->state != NULL)) {
         WebSocketState *state = server->state;
 
         apr_thread_mutex_lock(state->mutex);
-        written = mod_websocket_send_internal(state, type, buffer, buffer_size);
+
+        if (apr_os_thread_equal(apr_os_thread_current(), state->main_thread)) {
+            /* This is the main thread. It's safe to write messages directly. */
+            written = mod_websocket_send_internal(state, type, buffer, buffer_size);
+        }
+        else if ((state->pollset != NULL) && (state->queue != NULL) &&
+                 !state->closing) {
+            /* Dispatch this message to the main thread. */
+            apr_status_t rv;
+            WebSocketMessageData msg = { 0 };
+
+            /* Populate the message data. */
+            msg.type = type;
+            msg.buffer = buffer;
+            msg.buffer_size = buffer_size;
+
+            /* Queue the message. */
+            do {
+                rv = apr_queue_push(state->queue, &msg);
+            } while (APR_STATUS_IS_EINTR(rv));
+
+            if (rv != APR_SUCCESS) {
+                /* Couldn't push the message onto the queue. */
+                goto send_unlock;
+            }
+
+            /* Interrupt the pollset. */
+            rv = apr_pollset_wakeup(state->pollset);
+
+            if (rv != APR_SUCCESS) {
+                /*
+                 * Couldn't wake up poll...? We can't return zero since we've
+                 * already pushed the message, and it might actually be sent...
+                 */
+                /* TODO: log. */
+            }
+
+            /* Wait for the message to be written. */
+            while (!msg.done && !state->closing) {
+                apr_thread_cond_wait(state->cond, state->mutex);
+            }
+
+            if (msg.done) {
+                written = msg.written;
+            }
+        }
+
+send_unlock:
         apr_thread_mutex_unlock(state->mutex);
     }
 
@@ -797,6 +863,26 @@ static void mod_websocket_handle_incoming(const WebSocketServer *server,
     }
 }
 
+static void mod_websocket_handle_outgoing(const WebSocketServer *server,
+                                          WebSocketMessageData *msg)
+{
+    apr_thread_mutex_lock(server->state->mutex);
+    msg->written = mod_websocket_send_internal(server->state, msg->type,
+                                               msg->buffer, msg->buffer_size);
+
+    /*
+     * Notify plugin_send() that the message has been sent.
+     *
+     * XXX Wake up _all_ the waiting threads, since we don't know which one owns
+     * this message. This is contentious if there are a lot of threads writing
+     * in parallel.
+     */
+    msg->done = 1;
+    apr_thread_cond_broadcast(server->state->cond);
+
+    apr_thread_mutex_unlock(server->state->mutex);
+}
+
 /*
  * The data framing handler requires that the server state mutex is locked by
  * the caller upon entering this function. It will be locked when leaving too.
@@ -814,6 +900,7 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
     apr_pollfd_t pollfd = { 0 };
     const apr_pollfd_t *signalled;
     apr_int32_t pollcnt;
+    apr_queue_t * queue;
 
     /* We cannot use the same bucket allocator for the ouput bucket brigade
      * obb as the one associated with the connection (r->connection->bucket_alloc)
@@ -827,7 +914,8 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
     if ((apr_pool_create(&pool, r->pool) == APR_SUCCESS) &&
         ((bucket_alloc = apr_bucket_alloc_create(pool)) != NULL) &&
         ((obb = apr_brigade_create(pool, bucket_alloc)) != NULL) &&
-        (apr_pollset_create(&pollset, 1, pool, 0) == APR_SUCCESS)) {
+        (apr_pollset_create(&pollset, 1, pool, APR_POLLSET_WAKEABLE) == APR_SUCCESS) &&
+        (apr_queue_create(&queue, QUEUE_CAPACITY, pool) == APR_SUCCESS)) {
         unsigned char block[BLOCK_DATA_SIZE];
         apr_int64_t block_size;
         unsigned char status_code_buffer[2];
@@ -844,6 +932,8 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
         read_state.frame = &read_state.control_frame;
         read_state.opcode = 0xFF;
 
+        state->queue = queue;
+
         /* Initialize the pollset */
         pollfd.p = pool;
         pollfd.desc_type = APR_POLL_SOCKET;
@@ -851,25 +941,74 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
         pollfd.desc.s = ap_get_conn_socket(state->r->connection);
         apr_pollset_add(pollset, &pollfd);
 
+        state->pollset = pollset;
+
         /* Allow the plugin to now write to the client */
         state->obb = obb;
         apr_thread_mutex_unlock(state->mutex);
 
+        /*
+         * Main loop, inspired by mod_spdy. Alternate between data coming from
+         * the client and data coming from the server. Only block in poll() if
+         * there is no work to be done for either side.
+         */
         while ((read_state.framing_state != DATA_FRAMING_CLOSE)) {
             apr_status_t rv;
+            apr_interval_time_t timeout;
+            WebSocketMessageData *msg;
+            int work_done = 0;
 
+            /* Check to see if there is any data to read. */
+            block_size = sizeof(block);
+            rv = mod_websocket_read_nonblock(r, (char *)block, &block_size);
+
+            if (rv == APR_SUCCESS) {
+                mod_websocket_handle_incoming(server, block, block_size,
+                                              &read_state, conf, plugin_private);
+                work_done = 1;
+            }
+            else if (!APR_STATUS_IS_EAGAIN(rv)) {
+                read_state.status_code = STATUS_CODE_INTERNAL_ERROR;
+                break;
+            }
+
+            /* Check to see if there is any data to write. */
             do {
-                block_size = sizeof(block);
-                rv = mod_websocket_read_nonblock(r, (char *)block, &block_size);
-            } while (APR_STATUS_IS_EAGAIN(rv) &&
-                     apr_pollset_poll(pollset, -1, &pollcnt, &signalled) == APR_SUCCESS);
+                rv = apr_queue_trypop(state->queue, &msg);
+            } while (APR_STATUS_IS_EINTR(rv));
 
-            if (rv != APR_SUCCESS) {
+            if (rv == APR_SUCCESS) {
+                mod_websocket_handle_outgoing(server, msg);
+                work_done = 1;
+            }
+            else if (!APR_STATUS_IS_EAGAIN(rv)) {
+                read_state.status_code = STATUS_CODE_INTERNAL_ERROR;
                 break;
             }
 
-            mod_websocket_handle_incoming(server, block, block_size,
-                                          &read_state, conf, plugin_private);
+            /*
+             * If there's nothing to do, wait for new work to come in.
+             *
+             * Because Windows cannot poll on both a file pipe and a socket,
+             * plugin_send() uses apr_pollset_wakeup() to signal that new data
+             * is available to write. This is lossy (multiple threads calling
+             * wakeup() will result in only one wakeup here) so it's important
+             * that we do not block until state->queue has emptied. Otherwise
+             * it's possible to lose messages in the queue.
+             *
+             * NOTE: The wakeup pipe is drained only during apr_pollset_poll(),
+             * so we call it each iteration to avoid filling it up. We only
+             * block in poll() (negative timeout) if there was no work done
+             * during the current iteration.
+             */
+            timeout = work_done ? 0 : -1;
+            rv = apr_pollset_poll(state->pollset, timeout, &pollcnt, &signalled);
+
+            if ((rv != APR_SUCCESS) && !APR_STATUS_IS_EINTR(rv) &&
+                    !APR_STATUS_IS_TIMEUP(rv)) {
+                read_state.status_code = STATUS_CODE_INTERNAL_ERROR;
+                break;
+            }
         }
         if (read_state.message_frame.application_data != NULL) {
             free(read_state.message_frame.application_data);
@@ -890,6 +1029,12 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
         /* We are done with the bucket brigade */
         state->obb = NULL;
         apr_brigade_destroy(obb);
+
+        state->pollset = NULL;
+        apr_pollset_destroy(pollset);
+
+        state->queue = NULL;
+        apr_queue_term(queue);
     }
 }
 
@@ -957,8 +1102,10 @@ static int mod_websocket_method_handler(request_rec *r)
                                          &websocket_module);
 
                 if ((conf != NULL) && (conf->plugin != NULL)) {
-                    WebSocketState state =
-                        { r, NULL, NULL, NULL, 0, protocol_version };
+                    WebSocketState state = {
+                        r, NULL, apr_os_thread_current(), NULL, NULL, NULL, 0,
+                        protocol_version, NULL, NULL
+                    };
                     WebSocketServer server = {
                         sizeof(WebSocketServer), 1, &state,
                         mod_websocket_request, mod_websocket_header_get,
@@ -1015,6 +1162,8 @@ static int mod_websocket_method_handler(request_rec *r)
                     apr_thread_mutex_create(&state.mutex,
                                             APR_THREAD_MUTEX_DEFAULT,
                                             r->pool);
+                    apr_thread_cond_create(&state.cond, r->pool);
+
                     apr_thread_mutex_lock(state.mutex);
 
                     /*
@@ -1043,6 +1192,9 @@ static int mod_websocket_method_handler(request_rec *r)
                         mod_websocket_data_framing(&server, conf,
                                                    plugin_private);
 
+                        /* Wake up any waiting plugin_sends before closing */
+                        apr_thread_cond_broadcast(state.cond);
+
                         apr_thread_mutex_unlock(state.mutex);
 
                         /* Tell the plugin that we are disconnecting */
@@ -1068,6 +1220,7 @@ static int mod_websocket_method_handler(request_rec *r)
                     /* Close the connection */
                     ap_lingering_close(r->connection);
 
+                    apr_thread_cond_destroy(state.cond);
                     apr_thread_mutex_destroy(state.mutex);
 
                     return OK;
-- 
2.1.1
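
The handoff in this patch (a sender publishes a message, then sleeps on
a condition variable until the owning thread marks it done) can be
sketched outside of Apache. The sketch below is an illustration under
several assumptions, not the patch's code: it uses POSIX threads rather
than APR, a one-slot "queue" instead of apr_queue_t, and it wakes the
owner through the same condition variable instead of
apr_pollset_wakeup(). All type and function names are hypothetical.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical stand-in for WebSocketMessageData. */
typedef struct {
    size_t size;     /* bytes the sender wants written */
    int done;        /* set by the owner thread once handled */
    size_t written;  /* result reported back to the sender */
} message;

typedef struct {
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    message *pending;  /* one-slot queue, to keep the sketch short */
    int closing;
} channel;

/* Sender side: publish the message, then wait until it is done or the
 * channel is closing. Returns the number of bytes reported as written. */
static size_t channel_send(channel *ch, message *msg)
{
    size_t written = 0;

    assert(ch != NULL && msg != NULL);
    pthread_mutex_lock(&ch->mutex);
    while (ch->pending != NULL && !ch->closing)   /* wait for a free slot */
        pthread_cond_wait(&ch->cond, &ch->mutex);
    if (!ch->closing) {
        ch->pending = msg;
        pthread_cond_broadcast(&ch->cond);        /* tell the owner */
        while (!msg->done && !ch->closing)
            pthread_cond_wait(&ch->cond, &ch->mutex);
        if (msg->done)
            written = msg->written;
        else if (ch->pending == msg)
            ch->pending = NULL;  /* msg lives on our stack; unpublish it */
    }
    pthread_mutex_unlock(&ch->mutex);
    return written;
}

/* Owner side: wait for one message, "write" it, and wake every waiter,
 * since the owner cannot tell which waiter owns the message. */
static void channel_service_one(channel *ch)
{
    message *msg;

    pthread_mutex_lock(&ch->mutex);
    while (ch->pending == NULL)
        pthread_cond_wait(&ch->cond, &ch->mutex);
    msg = ch->pending;
    ch->pending = NULL;
    msg->written = msg->size;  /* pretend the whole payload went out */
    msg->done = 1;
    pthread_cond_broadcast(&ch->cond);
    pthread_mutex_unlock(&ch->mutex);
}

/* Thread entry point for a sender, so the sketch can be exercised. */
typedef struct { channel *ch; size_t size; size_t result; } sender_args;

static void *sender_thread(void *arg)
{
    sender_args *a = arg;
    message msg = { a->size, 0, 0 };

    a->result = channel_send(a->ch, &msg);
    return NULL;
}
```

As in the patch, the message lives on the sender's stack, so the sender
must not return while the owner can still reach it. The sketch
unpublishes the message when the sender gives up because of closing.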


[PATCH 0/5] Fix mod_websocket segfaults under load

Posted by Jacob Champion <ja...@ni.com>.
The first patch changes to nonblocking reads and the use of
apr_pollset_poll(). The next two patches are meant to be mostly
refactoring and code motion, in order to make the fourth patch (which
contains the actual architecture changes) more straightforward. The
fifth patch is cleanup of now unnecessary code.

Comments and suggestions are welcome!

The code in this patchset is Copyright (c) National Instruments and
Apache Software Foundation, licensed to the public under the Apache
License 2.0.

Jacob Champion (5):
  Change reads to nonblocking/poll
  Separate plugin_send from the send implementation
  Pull framing logic into a separate function
  Read and write to the brigade from only one thread
  Remove extra pool and clean up brigades

 mod_websocket.c | 1005 +++++++++++++++++++++++++++++++++----------------------
 1 file changed, 607 insertions(+), 398 deletions(-)

-- 
2.1.1


Re: Debugging mod_websocket -- any others out there?

Posted by Jacob Champion <ja...@ni.com>.
On 3/3/2015 3:42 PM, Yann Ylavic wrote:

> Please post here so that people reaching this thread have a solution.

Sounds good. Approval could be quick, or it may take a while; I haven't
abandoned the thread. :)

Jacob Champion
LabVIEW R&D
National Instruments

Re: Debugging mod_websocket -- any others out there?

Posted by Yann Ylavic <yl...@gmail.com>.
On Tue, Mar 3, 2015 at 10:00 PM, Jacob Champion <ja...@ni.com> wrote:
>
> I'm waiting on internal approval to post the five-patch set here (unless
> of course there is any objection from the mailing list; I don't see a
> lot of patch mail in the archives).

Please post here so that people reaching this thread have a solution.

Regards,
Yann.

Re: Debugging mod_websocket -- any others out there?

Posted by Jacob Champion <ja...@ni.com>.
On 3/3/2015 3:20 PM, Alex Bligh wrote:

> I for one would be interested in this. Feel free to fork my repo on
> github.

Alex,

I decided to base my patchset on the master branch of the original repo,
both in the hopes that it would be useful to a wider set of people, and
in the assumption that your additional thread-safety changes would no
longer be needed with my patches.

Hopefully the mailbox patch format is easy enough to apply to your own
repo; if not, let me know what I can do to make it easier. (Of course,
the patchset I sent in got thoroughly jumbled by my own client.
Hopefully it's in order for the rest of you...)

Jacob Champion
LabVIEW R&D
National Instruments

Re: Debugging mod_websocket -- any others out there?

Posted by Alex Bligh <al...@alex.org.uk>.
On 3 Mar 2015, at 21:00, Jacob Champion <ja...@ni.com> wrote:

> It turns out this works pretty well. The key complication is that the
> wakeup is lossy (multiple simultaneous wakeups are treated as one) so
> it's important to drain the entire "message queue" before polling again.
> I borrowed mod_spdy's sort-of-busy-loop approach to do that, and I used
> mod_proxy_wstunnel's poll-on-the-input-socket approach to block.
> 
> The result is a mod_websocket that hasn't crashed yet during overnight
> TLS stress tests (it used to crash regularly within a couple of seconds
> of starting). I was also able to remove the extra pool and bucket
> allocator that Alex added, since reading and writing are now done using
> only a single thread.
> 
> I'm waiting on internal approval to post the five-patch set here (unless
> of course there is any objection from the mailing list; I don't see a
> lot of patch mail in the archives). It would make more sense to
> contribute directly to the original author, if said author were still
> around...

I for one would be interested in this. Feel free to fork my repo on
github.

-- 
Alex Bligh





Re: Debugging mod_websocket -- any others out there?

Posted by Jacob Champion <ja...@ni.com>.
On 2/26/2015 1:00 PM, Jacob Champion wrote:

> In this case, if I can get the input socket, I might be able to
> construct an APR_POLLSET_WAKEABLE pollset around it, and use
> apr_pollset_wakeup() to break out of the poll whenever I want to write
> output.

It turns out this works pretty well. The key complication is that the
wakeup is lossy (multiple simultaneous wakeups are treated as one) so
it's important to drain the entire "message queue" before polling again.
I borrowed mod_spdy's sort-of-busy-loop approach to do that, and I used
mod_proxy_wstunnel's poll-on-the-input-socket approach to block.
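
The reason the queue has to be drained can be shown with a small
single-threaded model. This is only a sketch: work_queue and its
functions are hypothetical names, and the wake_pending flag stands in
for the coalescing behaviour of apr_pollset_wakeup() described above.

```c
#include <assert.h>
#include <stddef.h>

#define QUEUE_CAPACITY 16

typedef struct {
    int items[QUEUE_CAPACITY];
    size_t head, count;
    int wake_pending;  /* many wakeups collapse into this single flag */
} work_queue;

/* Producer: enqueue an item and request a wakeup. Returns 0 if full. */
static int queue_push(work_queue *q, int item)
{
    if (q->count == QUEUE_CAPACITY)
        return 0;
    q->items[(q->head + q->count) % QUEUE_CAPACITY] = item;
    q->count++;
    q->wake_pending = 1;  /* a second wakeup before the poll is lost */
    return 1;
}

/* Consumer: non-blocking pop. Returns 0 if the queue is empty. */
static int queue_trypop(work_queue *q, int *item)
{
    if (q->count == 0)
        return 0;
    *item = q->items[q->head];
    q->head = (q->head + 1) % QUEUE_CAPACITY;
    q->count--;
    return 1;
}

/* One wakeup: consume the flag, then drain until the queue is empty.
 * Returns the number of items handled and adds them to *sum. */
static size_t handle_wakeup(work_queue *q, int *sum)
{
    size_t handled = 0;
    int item;

    assert(q != NULL && sum != NULL);
    if (!q->wake_pending)
        return 0;
    q->wake_pending = 0;
    while (queue_trypop(q, &item)) {
        *sum += item;
        handled++;
    }
    return handled;
}
```

Three pushes produce a single pending wakeup here, so a consumer that
handled only one item per wakeup would leave two items stranded until
some later, unrelated wakeup.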

The result is a mod_websocket that hasn't crashed yet during overnight
TLS stress tests (it used to crash regularly within a couple of seconds
of starting). I was also able to remove the extra pool and bucket
allocator that Alex added, since reading and writing are now done using
only a single thread.

I'm waiting on internal approval to post the five-patch set here (unless
of course there is any objection from the mailing list; I don't see a
lot of patch mail in the archives). It would make more sense to
contribute directly to the original author, if said author were still
around...

Thanks, Alex and Eric, for your suggestions!

Jacob Champion
LabVIEW R&D
National Instruments

Re: Debugging mod_websocket -- any others out there?

Posted by Jacob Champion <ja...@ni.com>.
On 2/26/2015 7:23 AM, Eric Covener wrote:

> You might check out how mod_spdy shares a connection between threads.

Hi Eric, thanks for the suggestion! mod_spdy is interesting; IIUC, the
logic is basically

    output_block_time = 1 ms
    while (not stopped) {
        // Input cycle
        if no output is possible:
            block to read data
        else:
            read as much data as we have without blocking

        if we got data:
            process the data
            reset output_block_time to 1 ms

        // Output cycle
        if output is possible:
            wait up to <output_block_time> for output

        if we have output:
            write it to socket (blocking)
        else:
            output_block_time *= 2, max out at 30 ms
    }

I'm assuming this approach was taken because SPDY is still (I think)
request-response based. For WebSocket, output is _always_ possible at
any time, which means we would never get to perform the "block to read
data" step. We'd have to introduce a maximum blocking time for input
too, so an idle-but-connected WebSocket would continually bounce between
waiting on input and waiting on output. The exponential backoff helps,
but the somewhat random spikes in latency mean it's definitely not my
first choice.
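
The backoff in the pseudocode above can be written down as a short
sketch. The constants come from that pseudocode (1 ms start, 30 ms
cap); the function names are hypothetical and this is not mod_spdy's
actual code.

```c
#include <assert.h>

#define OUTPUT_BLOCK_MIN_MS 1
#define OUTPUT_BLOCK_MAX_MS 30

/* Double the output wait after an idle cycle, capped at the maximum. */
static int backoff_next(int current_ms)
{
    int next;

    assert(current_ms >= OUTPUT_BLOCK_MIN_MS);
    next = current_ms * 2;
    return next > OUTPUT_BLOCK_MAX_MS ? OUTPUT_BLOCK_MAX_MS : next;
}

/* Count the idle cycles needed to go from the minimum to the cap. */
static int cycles_until_cap(void)
{
    int ms = OUTPUT_BLOCK_MIN_MS;
    int cycles = 0;

    while (ms < OUTPUT_BLOCK_MAX_MS) {
        ms = backoff_next(ms);
        cycles++;
    }
    return cycles;
}
```

The wait grows 1, 2, 4, 8, 16, 30 ms, so after five idle cycles the
loop sits at the cap. If the loop only looks at input between output
waits, as the pseudocode suggests, input arriving during a wait could
be delayed by up to the current wait time.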

I'm interested in the TODO at the bottom of that block though:

mod_spdy/common/spdy_session.cc:191:
> TODO(mdsteele): What we really want to be able to do is to block
> until *either* more input or more output is available. Unfortunely,
> there's no good way to query the input side (in Apache). One
> possibility would be to grab the input socket object (which we can
> do), and then arrange to block until either the socket is ready to 
> read OR our output queue is nonempty (obviously we would abstract 
> that away in SpdySessionIO), but there's not even a nice way to do
> that (that I know of).

In this case, if I can get the input socket, I might be able to
construct an APR_POLLSET_WAKEABLE pollset around it, and use
apr_pollset_wakeup() to break out of the poll whenever I want to write
output. I'm currently failing to find how to get a socket from a
request_rec, though. I'm guessing it's not supposed to be easy...

Jacob Champion
LabVIEW R&D
National Instruments

Re: Debugging mod_websocket -- any others out there?

Posted by Eric Covener <co...@gmail.com>.
On Tue, Feb 24, 2015 at 7:16 PM, Jacob Champion <ja...@ni.com> wrote:
> - Is it even possible/safe to do an asynchronous "one thread reads, one
> thread writes" operation in a production-quality module? (In other
> words, is mod_websocket just unfixably broken?)

You might check out how mod_spdy shares a connection between threads.

-- 
Eric Covener
covener@gmail.com

Re: Debugging mod_websocket -- any others out there?

Posted by Jacob Champion <ja...@ni.com>.
On 2/26/2015 1:04 PM, Jacob Champion wrote:

> On 2/25/2015 6:35 PM, Alex Bligh wrote:
> 
>> 2 x apr_file_pipe_create ?
> 
> I also found APR_POLLSET_WAKEABLE and apr_pollset_wakeup(), which appear
> to encapsulate the self-pipe into the pollset itself.

It turns out that pipe file descriptors cannot be polled on Windows
(only sockets can), so apr_file_pipe_create() is out. APR has a
Windows-only function called apr_file_socket_pipe_create(), but it's
private to the implementation of apr_pollset_wakeup(). So
apr_pollset_wakeup() it is.

(I would prefer to use apr_file_socket_pipe_create() myself -- my
current solution has the theoretical problem that the wakeup pipe could
fill up, if clients send messages fast enough that poll() is never called.)
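
For comparison, this is how a full wakeup pipe is commonly handled when
the caller owns the pipe. The sketch is POSIX-only and does not use
APR, so it does not apply to Windows as written; the function names are
hypothetical. The idea is that a full pipe already guarantees that the
poller will wake, so EAGAIN on the wakeup write can be treated as
success.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/* Create a pipe with both ends non-blocking. Returns 0 on success. */
static int wakeup_pipe_create(int fds[2])
{
    int i;

    assert(fds != NULL);
    if (pipe(fds) != 0)
        return -1;
    for (i = 0; i < 2; i++) {
        int flags = fcntl(fds[i], F_GETFL);
        if (flags == -1 || fcntl(fds[i], F_SETFL, flags | O_NONBLOCK) == -1) {
            close(fds[0]);
            close(fds[1]);
            return -1;
        }
    }
    return 0;
}

/* Request a wakeup. Returns 0 if a wakeup is pending after the call
 * (byte written, or pipe already full), -1 on any other error. */
static int wakeup_signal(int write_fd)
{
    const char byte = 1;
    ssize_t n;

    do {
        n = write(write_fd, &byte, 1);
    } while (n == -1 && errno == EINTR);

    if (n == 1)
        return 0;
    if (n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
        return 0;  /* pipe is full, so the reader will wake anyway */
    return -1;
}

/* Discard all pending wakeup bytes. Returns the number discarded. */
static long wakeup_drain(int read_fd)
{
    char buf[256];
    long total = 0;
    ssize_t n;

    for (;;) {
        n = read(read_fd, buf, sizeof(buf));
        if (n > 0) {
            total += n;
            continue;
        }
        if (n == -1 && errno == EINTR)
            continue;
        break;  /* EAGAIN (empty), EOF, or a real error */
    }
    return total;
}
```

With this approach the senders never block on the pipe, no matter how
rarely the poller gets around to draining it.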

Jacob Champion
LabVIEW R&D
National Instruments

Re: Debugging mod_websocket -- any others out there?

Posted by Jacob Champion <ja...@ni.com>.
On 2/25/2015 6:35 PM, Alex Bligh wrote:

> 2 x apr_file_pipe_create ?

I also found APR_POLLSET_WAKEABLE and apr_pollset_wakeup(), which appear
to encapsulate the self-pipe into the pollset itself. Here's hoping it's
implemented for _most_ platforms...?

Jacob Champion
LabVIEW R&D
National Instruments

Re: Debugging mod_websocket -- any others out there?

Posted by Alex Bligh <al...@alex.org.uk>.
On 25 Feb 2015, at 22:40, Jacob Champion <ja...@ni.com> wrote:

> Maybe APR has an abstraction
> for socketpair()?

2 x apr_file_pipe_create ?

-- 
Alex Bligh





Re: Debugging mod_websocket -- any others out there?

Posted by Jacob Champion <ja...@ni.com>.
Alex,

>> My suspicion is that the two-brigade approach clashes with the fact that
>> OpenSSL can read from the socket during its writes and vice-versa. But
>> that's only a suspicion -- for all I know, mod_ssl and/or Apache might
>> have synchronization techniques that make parallel brigades safe.
> 
> My guess was that it worked without SSL as the brigades have separate
> allocators. However, SSL can write when it's reading and vice versa
> (to implement the SSL protocol), and this caused problems. You want
> as multi-core a machine as possible to demonstrate it (my single core
> VM was not a good plan).

Okay, good to see you had the same hunch.

> I tried libwebsockets originally (years ago) and ran into that issue, or
> something similar. I then tried again, and it's no longer an issue as far
> as I can tell. I'm using the external poll stuff, and getting the flow
> control right was a little hairy. On the specific issue (large writes
> of data towards the client), I think I limit the amount in each ws
> packet (for streaming, that's irrelevant); perhaps that just hides the
> issue.

Makes sense.

> The other thing I was planning to look at was how modproxy-wstunnel
> and friends work - they must face a similar problem.

It looks like mod_proxy_wstunnel creates a pollset on the underlying
sockets. Then it waits for incoming data on both, and writes/flushes
(blocking, I think) any bytes received from one to the other.
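
For anyone following along, the single-threaded relay described above can be sketched roughly as follows. This is Python rather than C/APR, purely for brevity; `relay`, `a`, and `b` are made-up names, and nothing here is taken from the mod_proxy_wstunnel source. It only shows the shape: poll both sockets, then do a blocking write of whatever arrives on one to the other.

```python
import selectors
import socket


def relay(a: socket.socket, b: socket.socket, bufsize: int = 4096) -> None:
    """Copy bytes in both directions until either side closes."""
    peer = {a: b, b: a}
    with selectors.DefaultSelector() as sel:
        sel.register(a, selectors.EVENT_READ)
        sel.register(b, selectors.EVENT_READ)
        while True:
            for key, _ in sel.select():
                data = key.fileobj.recv(bufsize)
                if not data:
                    # EOF on one side ends the tunnel.
                    return
                # Blocking write to the other side, then back to polling.
                peer[key.fileobj].sendall(data)
```

Only one thread ever touches either socket, which is the property that matters here.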

It makes me a little nervous that the module is polling on the socket
while reading through the brigade, but I think at this point I'm going
to try to retrofit that logic onto mod_websocket and see where it gets
me. The key advantage is that there's only one thread reading and
writing, which makes me much more comfortable...

Polling has the issue that I need to wake up the brigade-owner-thread
whenever I want to send a message from server to client. On *nix I would
use a self-pipe; on Windows I'm not sure. Maybe APR has an abstraction
for socketpair()?
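
A rough sketch of that wake-up idea, in Python rather than APR and with hypothetical names (`ConnectionOwner`, `send_from_any_thread`). It assumes a socketpair can stand in for the self-pipe, which Python at least provides on Windows as well; it says nothing about how an APR equivalent would behave.

```python
import queue
import selectors
import socket


class ConnectionOwner:
    """One thread owns the client socket; others only enqueue and wake."""

    def __init__(self, client: socket.socket) -> None:
        self.client = client
        self.outbox: "queue.Queue[bytes]" = queue.Queue()
        # The socketpair plays the role of the self-pipe.
        self._wake_r, self._wake_w = socket.socketpair()
        self.received = []

    def send_from_any_thread(self, msg: bytes) -> None:
        self.outbox.put(msg)
        self._wake_w.send(b"\0")  # wake the owner out of select()

    def run(self) -> None:
        with selectors.DefaultSelector() as sel:
            sel.register(self.client, selectors.EVENT_READ)
            sel.register(self._wake_r, selectors.EVENT_READ)
            while True:
                for key, _ in sel.select():
                    if key.fileobj is self._wake_r:
                        self._wake_r.recv(64)  # drain pending wake-up bytes
                        while not self.outbox.empty():
                            self.client.sendall(self.outbox.get_nowait())
                    else:
                        data = self.client.recv(4096)
                        if not data:  # peer closed the connection
                            return
                        self.received.append(data)
```

The message is queued before the wake-up byte is written, so the owner never wakes to an empty queue and then misses the message.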

Jacob Champion
LabVIEW R&D
National Instruments

Re: Debugging mod_websocket -- any others out there?

Posted by Alex Bligh <al...@alex.org.uk>.
Jacob,

> It looks like half your fixes were pulled upstream; the remainder
> involves a new allocator and mutex [2]. Anything else I'm missing?

Not that I recall.

> This is a Windows 64-bit build. I can reproduce this easily, usually
> within seconds of running my tests. I can try to work up a minimal
> reproduction case if anyone else turns up and is interested.

Well, that's an improvement on 20 hours of youtube video over VNC
over websockets ...

> My suspicion is that the two-brigade approach clashes with the fact that
> OpenSSL can read from the socket during its writes and vice-versa. But
> that's only a suspicion -- for all I know, mod_ssl and/or Apache might
> have synchronization techniques that make parallel brigades safe.

My guess was that it worked without SSL as the brigades have separate
allocators. However, SSL can write when it's reading and vice versa
(to implement the SSL protocol), and this caused problems. You want
as multi-core a machine as possible to demonstrate it (my single core
VM was not a good plan).

>> I spent many many hours on this, ultimately unsuccessfully (I've
>> moved to mod_proxy_wstunnel plus a libwebsockets C based thing).
> 
> Tangent: I tried libwebsockets about six months back. I had trouble with
> heavy load with it too -- the architecture didn't seem to handle the
> case where the network stack could only accept a partial write, which
> led to a lot of spurious disconnects when streaming massive amounts of
> data. Have you run into that as well?

I tried libwebsockets originally (years ago) and ran into that issue, or
something similar. I then tried again, and it's no longer an issue as far
as I can tell. I'm using the external poll stuff, and getting the flow
control right was a little hairy. On the specific issue (large writes
of data towards the client), I think I limit the amount in each ws
packet (for streaming, that's irrelevant); perhaps that just hides the
issue.

FWIW I'm now using mod_proxy_wstunnel in apache to unwrap the SSL,
then an external program to unwrap the websockets and (in essence)
forward as a TCP session. It would be really nice if that could be
done in mod_proxy_wstunnel (i.e. say 'unwrap the websocket too').

>> FWIW if I hadn't made the above move, my plan was to eliminate
>> doing most of the work in the apache thread by using a bucket
>> brigade that ended in a socketpair (I can't remember the correct
>> apache terminology here, but the point was to have apache
>> do the read/write from one end of the socketpair), then set
>> up two new threads to read and write from the other end
>> of the socketpair, encoding/decoding as we go. This means that
>> once the connection is live the module code itself wouldn't
>> actually touch any apache memory-managed data.
>> 
>> IE:
>> 
>> Apache <==> Socket+Socket <===> Decode/Encode <===> Whatever
> 
> Interesting. So, if I've got this right, you'd try to artificially
> terminate the chain in a pair of file descriptors, and then handle those
> directly using select/poll/whatever from more threads?

Yes

> Are there any existing modules in Apache that take a similar approach?

Not to my knowledge. Which is why I backed off doing it!

I couldn't even find any real documentation for the thing that
ended a brigade in a pipe/socketpair; I found the code by accident
while hunting the above bug.

The other thing I was planning to look at was how mod_proxy_wstunnel
and friends work - they must face a similar problem.

-- 
Alex Bligh





Re: Debugging mod_websocket -- any others out there?

Posted by Jacob Champion <ja...@ni.com>.
On 2/25/2015 2:31 AM, Alex Bligh wrote:

> Yes. I've seen this.
> 
> There were once some issues with the brigade allocators. You may want
> to look at my fork here:
> 
>    https://github.com/abligh/apache-websocket

Hi, Alex. Your blog post [1] -- or rather its comments section -- was
actually one of the only other places I could find a reference to the
crash I'm seeing! Glad to see you're watching this list.

It looks like half your fixes were pulled upstream; the remainder
involves a new allocator and mutex [2]. Anything else I'm missing?

> Despite my fixes, it still dies occasionally, normally because one
> of the bucket brigades becomes corrupt.

Like you mention in your post, I'm primarily seeing a crash when using
TLS. For posterity's sake, here's the full stack I see (the commenter on
your post had what appeared to be the same trace, but it was missing
symbols):

    <SEGV from what appears to be the apr_bucket_destroy macro?>
    libhttpd!writev_nonblocking
    libhttpd!send_brigade_nonblocking
    libhttpd!ap_random_parent_after_fork
    libhttpd!ap_pass_brigade
    mod_ssl!bio_filter_out_pass
    mod_ssl!bio_filter_out_write
    <libeay32>!BIO_write
    ...

This is a Windows 64-bit build. I can reproduce this easily, usually
within seconds of running my tests. I can try to work up a minimal
reproduction case if anyone else turns up and is interested.

My suspicion is that the two-brigade approach clashes with the fact that
OpenSSL can read from the socket during its writes and vice-versa. But
that's only a suspicion -- for all I know, mod_ssl and/or Apache might
have synchronization techniques that make parallel brigades safe.

> I spent many many hours on this, ultimately unsuccessfully (I've
> moved to mod_proxy_wstunnel plus a libwebsockets C based thing).

Tangent: I tried libwebsockets about six months back. I had trouble with
heavy load with it too -- the architecture didn't seem to handle the
case where the network stack could only accept a partial write, which
led to a lot of spurious disconnects when streaming massive amounts of
data. Have you run into that as well?
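
To make the partial-write case concrete, here is a small sketch (Python, with a hypothetical function name; it is not libwebsockets code). A non-blocking send() may accept only part of a buffer, so the caller has to keep the unsent tail and retry once the socket is writable again, rather than treating the short write as a failure.

```python
import selectors
import socket


def send_all_nonblocking(sock: socket.socket, payload: bytes,
                         timeout: float = 5.0) -> int:
    """Send payload on a non-blocking socket; return the number of send() calls."""
    view = memoryview(payload)
    calls = 0
    with selectors.DefaultSelector() as sel:
        sel.register(sock, selectors.EVENT_WRITE)
        while view:
            if not sel.select(timeout):
                raise TimeoutError("peer stopped reading")
            try:
                sent = sock.send(view)  # may accept only part of the data
            except BlockingIOError:
                continue  # buffer filled up again; go back to polling
            calls += 1
            view = view[sent:]  # keep the unsent tail for the next round
    return calls
```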

> FWIW if I hadn't made the above move, my plan was to eliminate
> doing most of the work in the apache thread by using a bucket
> brigade that ended in a socketpair (I can't remember the correct
> apache terminology here, but the point was to have apache
> do the read/write from one end of the socketpair), then set
> up two new threads to read and write from the other end
> of the socketpair, encoding/decoding as we go. This means that
> once the connection is live the module code itself wouldn't
> actually touch any apache memory-managed data.
> 
> IE:
> 
> Apache <==> Socket+Socket <===> Decode/Encode <===> Whatever

Interesting. So, if I've got this right, you'd try to artificially
terminate the chain in a pair of file descriptors, and then handle those
directly using select/poll/whatever from more threads? Are there any
existing modules in Apache that take a similar approach?

Thanks!

Jacob Champion
LabVIEW R&D
National Instruments

[1] http://blog.alex.org.uk/2012/09/11/apache-websockets-and-tcp-vnc-proxy/
[2]
https://github.com/abligh/apache-websocket/commit/2d824f989aac196f42ad5127a290df04720bc2da

Re: Debugging mod_websocket -- any others out there?

Posted by Alex Bligh <al...@alex.org.uk>.
On 25 Feb 2015, at 00:16, Jacob Champion <ja...@ni.com> wrote:

> I've been working with mod_websocket [1] for a few weeks now. Things
> were going very well until we ran into crashes under heavy load. I
> _think_ they are related to the threading model used by mod_websocket --
> it spins up a new thread and a new bucket brigade to write to the
> connection, while the original thread and brigade block to read
> messages. Unfortunately I don't know enough about the brigade system to
> know whether that's kosher or even fixable.

Yes. I've seen this.

There were once some issues with the brigade allocators. You may want
to look at my fork here:

   https://github.com/abligh/apache-websocket

In there you'll find 'vncproxy' which is actually a generic tcp proxy.

Despite my fixes, it still dies occasionally, normally because one
of the bucket brigades becomes corrupt. I spent many many hours on
this, ultimately unsuccessfully (I've moved to mod_proxy_wstunnel
plus a libwebsockets C based thing).

FWIW if I hadn't made the above move, my plan was to eliminate
doing most of the work in the apache thread by using a bucket
brigade that ended in a socketpair (I can't remember the correct
apache terminology here, but the point was to have apache
do the read/write from one end of the socketpair), then set
up two new threads to read and write from the other end
of the socketpair, encoding/decoding as we go. This means that
once the connection is live the module code itself wouldn't
actually touch any apache memory-managed data.

IE:

Apache <==> Socket+Socket <===> Decode/Encode <===> Whatever

I'm sure that's not particularly efficient, but it would
work.
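
A minimal sketch of that layout, in Python rather than as an Apache module. The 4-byte length prefix is only a placeholder for the real WebSocket encode/decode step, and all names (`start_workers`, `reader`, `writer`, `apache_end`, `worker_end`) are invented for illustration. The two worker threads only ever see their end of the socketpair, never any Apache-managed memory.

```python
import queue
import socket
import struct
import threading


def encode(payload: bytes) -> bytes:
    # Placeholder framing (4-byte length prefix), NOT the WebSocket format.
    return struct.pack("!I", len(payload)) + payload


def read_exact(sock: socket.socket, n: int) -> bytes:
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise EOFError
        buf += chunk
    return buf


def reader(sock, inbox):
    """Decode frames arriving from the Apache side of the socketpair."""
    try:
        while True:
            (size,) = struct.unpack("!I", read_exact(sock, 4))
            inbox.put(read_exact(sock, size))
    except EOFError:
        inbox.put(None)  # signal end of stream to the application


def writer(sock, outbox):
    """Encode application messages and push them toward Apache."""
    while (msg := outbox.get()) is not None:
        sock.sendall(encode(msg))


def start_workers(worker_end):
    inbox, outbox = queue.Queue(), queue.Queue()
    for fn, q in ((reader, inbox), (writer, outbox)):
        threading.Thread(target=fn, args=(worker_end, q), daemon=True).start()
    return inbox, outbox
```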

I was idly thinking about swapping the encode/decode function
for libwebsockets, but the "LGPL2+ static link exception"
bit may create an issue. TBH I never had any issues with
the actual encode/decode code.

-- 
Alex Bligh