You are viewing a plain text version of this content. The canonical link for it is here.
Posted to modules-dev@httpd.apache.org by Jacob Champion <ja...@ni.com> on 2015/03/09 19:43:51 UTC

[PATCH 0/5] Fix mod_websocket segfaults under load

The first patch changes to nonblocking reads and the use of
apr_pollset_poll(). The next two patches are meant to be mostly
refactoring and code motion, in order to make the fourth patch (which
contains the actual architecture changes) more straightforward. The
fifth patch is cleanup of now unnecessary code.

Comments and suggestions are welcome!

The code in this patchset is Copyright (c) National Instruments and
Apache Software Foundation, licensed to the public under the Apache
License 2.0.

Jacob Champion (5):
  Change reads to nonblocking/poll
  Separate plugin_send from the send implementation
  Pull framing logic into a separate function
  Read and write to the brigade from only one thread
  Remove extra pool and clean up brigades

 mod_websocket.c | 1005 +++++++++++++++++++++++++++++++++----------------------
 1 file changed, 607 insertions(+), 398 deletions(-)

-- 
2.1.1


Re: mod_websocket ownership (and fixes)

Posted by Alex Bligh <al...@alex.org.uk>.
On 11 Sep 2015, at 23:49, Jacob Champion <ch...@gmail.com> wrote:

> Hi Alex,
> 
> On 09/11/2015 10:02 AM, Alex Bligh wrote:
>> mod_proxy_wstunnel forwards the websocket connection without
>> interpreting the protocol (i.e. needs to be directed at a websocket
>> server); my module (which just plugs into mod_websocket) forwards it
>> as a TCP port. EG for VNC over Websockets you'd just need to point my
>> module at port 5900, whereas with mod_websocket you'd need something
>> further to decode it.
> 
> Oh, neat! I assume there are specific protocol designs that can't be proxied over WebSocket? (For example, anything that requires the application-level use of a TCP half-close would presumably be iffy?) What are the sorts of protocols that work best with this setup?

There are some that cannot be proxied directly, but not many due to the prevalence of UNIX 'everything is a file' paradigm.

One such example is the guacamole protocol (a bit like VNC) which requires that a guacamole frame travelling toward the client is not split across websockets frames. You will find in the example I sent a processor for that which interprets client-bound data and splits them up into specific frames rather than treating them as a stream.

I've not come across any issues with half close, but I suppose that might be an issue.

> At this point I want to avoid biting off more than I can chew -- I want to get the current code in order before absorbing more functionality  -- but this is a really neat idea. I'll need to think about security corner cases with a generic TCP proxy setup too... more research needed.

Sure - I was more thinking to add it as an example. As I say, it's a *user* of your module (well, disconnect's module), not a code change to it. There's probably piles of stuff you can strip out.

Oh, and allegedly it doesn't compile on Windows (I wouldn't be able to test that).

-- 
Alex Bligh





Re: mod_websocket ownership (and fixes)

Posted by Jacob Champion <ch...@gmail.com>.
Hi Alex,

On 09/11/2015 10:02 AM, Alex Bligh wrote:
> mod_proxy_wstunnel forwards the websocket connection without
> interpreting the protocol (i.e. needs to be directed at a websocket
> server); my module (which just plugs into mod_websocket) forwards it
> as a TCP port. EG for VNC over Websockets you'd just need to point my
> module at port 5900, whereas with mod_websocket you'd need something
> further to decode it.

Oh, neat! I assume there are specific protocol designs that can't be 
proxied over WebSocket? (For example, anything that requires the 
application-level use of a TCP half-close would presumably be iffy?) 
What are the sorts of protocols that work best with this setup?

At this point I want to avoid biting off more than I can chew -- I want 
to get the current code in order before absorbing more functionality  -- 
but this is a really neat idea. I'll need to think about security corner 
cases with a generic TCP proxy setup too... more research needed.

>> Hopefully that makes sense -- any specific parts that still need
>> clarification?
>
> Thanks. Yes that makes sense I think. I sort of more meant that it
> would be useful in a README or something in the module itself, as I
> had difficulty grasping it when I was writing the vncproxy stuff.

Gotcha. My latest commit adds a few explanatory comments to the two 
areas I mentioned; thanks for the feedback!

--Jacob

Re: mod_websocket ownership (and fixes)

Posted by Alex Bligh <al...@alex.org.uk>.
On 10 Sep 2015, at 23:26, Jacob Champion <ch...@gmail.com> wrote:

> On 09/10/2015 02:50 PM, Alex Bligh wrote:
>> Here: https://github.com/abligh/apache-websocket
>> 
>> in the vncproxy directory you will find a vncproxy (which is actually
>> a generic tcpproxy as well, though it could be hugely simplified to
>> do that alone) which you are welcome to have if you've not changed
>> the API to the websocket modules.
> 
> I'll take a look, thanks. Do you know how your implementation relates to mod_proxy_wstunnel?

mod_proxy_wstunnel forwards the websocket connection without interpreting the protocol (i.e. needs to be directed at a websocket server); my module (which just plugs into mod_websocket) forwards it as a TCP port. EG for VNC over Websockets you'd just need to point my module at port 5900, whereas with mod_websocket you'd need something further to decode it.

(FWIW I switched from using mod_websocket + my code to mod_proxy_wstunnel and some C with libwebsockets at least in part because I couldn't fix what you seem to have just fixed)

>> It would be really helpful if you could document how the threading /
>> FD processing / bucket brigade runs now. Crucially you've put all the
>> bucket brigade handling into a single thread (I think) which I take
>> it is the same thread as apache itself uses - that's what I reckoned
>> I needed to do to get it to work reliably with SSL.
> 
> Right, the single thread is the key. The Git commit messages might help with the understanding somewhat, since that's where I put the patch motivations (and it's what I'm reading to remember what I did, heh).
> 
> The main loop in mod_websocket_data_framing() is where the real work is done. This is called on the original request thread from Apache. The loop alternately checks for incoming data (with a nonblocking read) and outgoing data (from any messages on the outgoing queue). Reading and writing to socket is never done by anything but the framing loop; mod_websocket_plugin_send() queues any cross-thread messages for later processing by the loop. This is similar to a UI event dispatcher, if you've ever worked with a framework that used one.
> 
> The only "tricky" part in the architecture is that we don't want to busy-wait if we have nothing to do, so there's an apr_pollset_t that is waited on when we run out of work.
> 
> Hopefully that makes sense -- any specific parts that still need clarification?

Thanks. Yes that makes sense I think. I sort of more meant that it would be useful in a README or something in the module itself, as I had difficulty grasping it when I was writing the vncproxy stuff.

-- 
Alex Bligh





Re: mod_websocket ownership (and fixes)

Posted by Jacob Champion <ch...@gmail.com>.
On 09/10/2015 02:50 PM, Alex Bligh wrote:
> Here: https://github.com/abligh/apache-websocket
>
> in the vncproxy directory you will find a vncproxy (which is actually
> a generic tcpproxy as well, though it could be hugely simplified to
> do that alone) which you are welcome to have if you've not changed
> the API to the websocket modules.

I'll take a look, thanks. Do you know how your implementation relates to 
mod_proxy_wstunnel?

> It would be really helpful if you could document how the threading /
> FD processing / bucket brigade runs now. Crucially you've put all the
> bucket brigade handling into a single thread (I think) which I take
> it is the same thread as apache itself uses - that's what I reckoned
> I needed to do to get it to work reliably with SSL.

Right, the single thread is the key. The Git commit messages might help 
with the understanding somewhat, since that's where I put the patch 
motivations (and it's what I'm reading to remember what I did, heh).

The main loop in mod_websocket_data_framing() is where the real work is 
done. This is called on the original request thread from Apache. The 
loop alternately checks for incoming data (with a nonblocking read) and 
outgoing data (from any messages on the outgoing queue). Reading and 
writing to socket is never done by anything but the framing loop; 
mod_websocket_plugin_send() queues any cross-thread messages for later 
processing by the loop. This is similar to a UI event dispatcher, if 
you've ever worked with a framework that used one.

The only "tricky" part in the architecture is that we don't want to 
busy-wait if we have nothing to do, so there's an apr_pollset_t that is 
waited on when we run out of work.

Hopefully that makes sense -- any specific parts that still need 
clarification?

--Jacob

Re: mod_websocket ownership (and fixes)

Posted by Alex Bligh <al...@alex.org.uk>.
On 10 Sep 2015, at 22:05, Jacob Champion <ch...@gmail.com> wrote:

> I've submitted a pull request to the original author, but I'm assuming he will not suddenly appear out of the blue after three years, so I plan to actively own and maintain my fork on GitHub. If you're also interested in WebSockets with Apache, and/or you've submitted a pull request to the original project and it hasn't gone anywhere, please give me a buzz.

Interested, but have very limited time.

Thanks for this.

Here:
  https://github.com/abligh/apache-websocket

in the vncproxy directory you will find a vncproxy (which is actually a generic tcpproxy as well, though it could be hugely simplified to do that alone) which you are welcome to have if you've not changed the API to the websocket modules.

It would be really helpful if you could document how the threading / FD processing / bucket brigade runs now. Crucially you've put all the bucket brigade handling into a single thread (I think) which I take it is the same thread as apache itself uses - that's what I reckoned I needed to do to get it to work
reliably with SSL.

-- 
Alex Bligh





Re: mod_websocket ownership (and fixes)

Posted by Jacob Champion <ch...@gmail.com>.
On 09/11/2015 02:52 AM, Stefan Eissing wrote:
> Hi Jacob,
>
> good to see that someone adopts this! As to advise...
>
> While I am no expert on WebSockets and its different flavors, I know
> that mod_websocket and my cuddly mod_h2 have several things in
> common. For example they both have to manage the "Upgrade:" dance.
> And WebSocket might want to participate in the TLS+ALPN protocol
> handling also.

Hello Stefan! I saw the Upgrade/ALPN discussion a little while ago on 
the dev list and thought of this as well.

> For these things, there are some new features in trunk and hopefully
> in the next 2.4.x release. In case you're interested, we can chat
> about this a bit.

I am absolutely interested. I'm not quite sure where to begin though... 
perhaps migrating the module to use an "official" Upgrade 
implementation? Did you have some entry points or pieces of code in 
mod_h2 or trunk that you'd like me to take a look at?

> As to other topics, such as moving buckets across threads or
> not-busy-waiting: these areas mod_websocket and mod_h2 have in common
> to find a good solution for. Might also be worth an exchange.

If you haven't already, take a look at the initial discussion on 
modules-dev that prompted my patchset -- in particular, the problem of 
how to implement bidirectional asynchronous communication using only one 
thread. The current solution seems to work, but it's less than ideal... 
personally, I'd like it very much if an idle WebSocket connection didn't 
monopolize one of Apache's precious threads. Perhaps mod_h2 has similar 
problems?

I will start taking a look at mod_h2 and familiarizing myself with it; 
I've been meaning to do that for a while now anyway. If you had any 
immediate discussion topics, please by all means bring them up. :) In 
the meantime, my initial focus for this module (besides getting up to 
speed on the codebase itself) will be on RFC compliance...

Thanks!
--Jacob

Re: mod_websocket ownership (and fixes)

Posted by Stefan Eissing <st...@eissing.org>.
Hi Jacob,

good to see that someone adopts this! As to advise...

While I am no expert on WebSockets and its different flavors, I know that mod_websocket and my cuddly mod_h2 have several things in common. For example they both have to manage the "Upgrade:" dance. And WebSocket might want to participate in the TLS+ALPN protocol handling also.

For these things, there are some new features in trunk and hopefully in the next 2.4.x release. In case you're interested, we can chat about this a bit.

As to other topics, such as moving buckets across threads or not-busy-waiting: these areas mod_websocket and mod_h2 have in common to find a good solution for. Might also be worth an exchange.

I was (for no particular reason) not subscribed to modules-dev, but am now. So you'll also find me there.

cheers,

  Stefan


> Am 10.09.2015 um 23:05 schrieb Jacob Champion <ch...@gmail.com>:
> 
> To hopefully put this thing to bed, six months later -- I have taken the mod_websocket patchset started by the long conversation in [1] and pushed it to a GitHub fork [2].
> 
> I've submitted a pull request to the original author, but I'm assuming he will not suddenly appear out of the blue after three years, so I plan to actively own and maintain my fork on GitHub. If you're also interested in WebSockets with Apache, and/or you've submitted a pull request to the original project and it hasn't gone anywhere, please give me a buzz.
> 
> I still need to familiarize myself with the code base and get it into a state where I can sustainably maintain it (tests would be good). After that I'm primarily interested in picking up the conversation left off in Bug 47485 [3]. Any general advice is welcome!
> 
> [Administrative side note: I am no longer employed at National Instruments, so emails to my prior @ni.com address will bounce. Use GitHub to talk to me about the module.]
> 
> Thanks again to modules-dev for helping and humoring me!
> 
> --Jacob
> 
> [1] http://mail-archives.apache.org/mod_mbox/httpd-modules-dev/201502.mbox/%3C54ED1473.1060604%40ni.com%3E
> [2] https://github.com/jchampio/apache-websocket/tree/dev/parallel-tls-fix
> [3] https://bz.apache.org/bugzilla/show_bug.cgi?id=47485


mod_websocket ownership (and fixes)

Posted by Jacob Champion <ch...@gmail.com>.
To hopefully put this thing to bed, six months later -- I have taken the 
mod_websocket patchset started by the long conversation in [1] and 
pushed it to a GitHub fork [2].

I've submitted a pull request to the original author, but I'm assuming 
he will not suddenly appear out of the blue after three years, so I plan 
to actively own and maintain my fork on GitHub. If you're also 
interested in WebSockets with Apache, and/or you've submitted a pull 
request to the original project and it hasn't gone anywhere, please give 
me a buzz.

I still need to familiarize myself with the code base and get it into a 
state where I can sustainably maintain it (tests would be good). After 
that I'm primarily interested in picking up the conversation left off in 
Bug 47485 [3]. Any general advice is welcome!

[Administrative side note: I am no longer employed at National 
Instruments, so emails to my prior @ni.com address will bounce. Use 
GitHub to talk to me about the module.]

Thanks again to modules-dev for helping and humoring me!

--Jacob

[1] 
http://mail-archives.apache.org/mod_mbox/httpd-modules-dev/201502.mbox/%3C54ED1473.1060604%40ni.com%3E
[2] https://github.com/jchampio/apache-websocket/tree/dev/parallel-tls-fix
[3] https://bz.apache.org/bugzilla/show_bug.cgi?id=47485

Re: [PATCH 0/5] Fix mod_websocket segfaults under load

Posted by Jacob Champion <ja...@ni.com>.
Alex,

On 3/9/2015 2:11 PM, Alex Bligh wrote:

> Note mod_websocket is not (currently) in the list of apache supported
> modules, though for one would love it to be. So for most people they
> can't see what you are patching against.

I didn't even think about that -- for future readers of this list, this
patchset is based on commit cfaef071 of

    https://github.com/disconnect/apache-websocket/

> Do you have this in git somewhere (e.g. github?)

I don't currently. I used to have a github account for my company; if it
makes things much easier for you, I can look into having it reactivated.

Thanks again for your input on this fix!

Jacob Champion
LabVIEW R&D
National Instruments

Re: [PATCH 0/5] Fix mod_websocket segfaults under load

Posted by Alex Bligh <al...@alex.org.uk>.
Jacob,

On 9 Mar 2015, at 18:43, Jacob Champion <ja...@ni.com> wrote:

> The first patch changes to nonblocking reads and the use of
> apr_pollset_poll(). The next two patches are meant to be mostly
> refactoring and code motion, in order to make the fourth patch (which
> contains the actual architecture changes) more straightforward. The
> fifth patch is cleanup of now unnecessary code.
> 
> Comments and suggestions are welcome!
> 
> The code in this patchset is Copyright (c) National Instruments and
> Apache Software Foundation, licensed to the public under the Apache
> License 2.0.

Thanks for these. I'll try to find some time to look through them.
Note mod_websocket is not (currently) in the list of apache supported
modules, though for one would love it to be. So for most people they
can't see what you are patching against.

Do you have this in git somewhere (e.g. github?)

-- 
Alex Bligh





Re: [PATCH 0/5] Fix mod_websocket segfaults under load

Posted by Alex Bligh <al...@alex.org.uk>.
On 30 Apr 2015, at 19:19, Jacob Champion <ja...@ni.com> wrote:

> Quick bump to this thread. Have any interested parties (Alex?) been able
> to give these patches a try?

Not yet - sorry.

-- 
Alex Bligh





Re: [PATCH 0/5] Fix mod_websocket segfaults under load

Posted by Jacob Champion <ja...@ni.com>.
On 3/9/2015 1:43 PM, Jacob Champion wrote:
>  mod_websocket.c | 1005 +++++++++++++++++++++++++++++++++----------------------
>  1 file changed, 607 insertions(+), 398 deletions(-)

Quick bump to this thread. Have any interested parties (Alex?) been able
to give these patches a try? Any comments on coding style, architecture,
etc.?

And now for a really ambitious question: with enough cleanup and TLC,
could this module ever be considered for support as an official module?
If so, what would we need to put into it to get it there? If not, what
are the objections?

I'm particularly curious about thoughts on the threading architecture
(one thread monopolized per WebSocket), the authorship concern (IIUC,
ASF requires some trademark and/or other IP transfer, which may not be
possible if the author is gone), and just the general question of
"should Apache support WebSockets out of the box?" Does the answer to
the last question change depending on the version of Apache (2.2, 2.4, 2.6)?

Thanks!

Jacob Champion
LabVIEW R&D
National Instruments

[PATCH 1/5] Change reads to nonblocking/poll

Posted by Jacob Champion <ja...@ni.com>.
In preparation for an architecture where reads and writes to the brigade
happen on the same thread, perform nonblocking reads and poll on the
socket to determine when more data is available.
---
 mod_websocket.c | 46 +++++++++++++++++++++++++++++++---------------
 1 file changed, 31 insertions(+), 15 deletions(-)

diff --git a/mod_websocket.c b/mod_websocket.c
index b5e73be..5a04936 100644
--- a/mod_websocket.c
+++ b/mod_websocket.c
@@ -381,26 +381,22 @@ static void CALLBACK mod_websocket_plugin_close(const WebSocketServer *
 /*
  * Read a buffer of data from the input stream.
  */
-static apr_size_t mod_websocket_read_block(request_rec *r, char *buffer,
-                                           apr_size_t bufsiz)
+static apr_status_t mod_websocket_read_nonblock(request_rec *r, char *buffer,
+                                                apr_size_t *bufsiz)
 {
-    apr_status_t rv;
+    apr_status_t rv = APR_ENOMEM;
     apr_bucket_brigade *bb;
-    apr_size_t readbufsiz = 0;
 
     bb = apr_brigade_create(r->pool, r->connection->bucket_alloc);
     if (bb != NULL) {
         if ((rv =
              ap_get_brigade(r->input_filters, bb, AP_MODE_READBYTES,
-                            APR_BLOCK_READ, bufsiz)) == APR_SUCCESS) {
-            if ((rv =
-                 apr_brigade_flatten(bb, buffer, &bufsiz)) == APR_SUCCESS) {
-                readbufsiz = bufsiz;
-            }
+                            APR_NONBLOCK_READ, *bufsiz)) == APR_SUCCESS) {
+            rv = apr_brigade_flatten(bb, buffer, bufsiz);
         }
         apr_brigade_destroy(bb);
     }
-    return readbufsiz;
+    return rv;
 }
 
 /*
@@ -475,6 +471,10 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
     apr_pool_t *pool = NULL;
     apr_bucket_alloc_t *bucket_alloc;
     apr_bucket_brigade *obb;
+    apr_pollset_t *pollset;
+    apr_pollfd_t pollfd = { 0 };
+    const apr_pollfd_t *signalled;
+    apr_int32_t pollcnt;
 
     /* We cannot use the same bucket allocator for the ouput bucket brigade
      * obb as the one associated with the connection (r->connection->bucket_alloc)
@@ -487,7 +487,8 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
 
     if ((apr_pool_create(&pool, r->pool) == APR_SUCCESS) &&
         ((bucket_alloc = apr_bucket_alloc_create(pool)) != NULL) &&
-        ((obb = apr_brigade_create(pool, bucket_alloc)) != NULL)) {
+        ((obb = apr_brigade_create(pool, bucket_alloc)) != NULL) &&
+        (apr_pollset_create(&pollset, 1, pool, 0) == APR_SUCCESS)) {
         unsigned char block[BLOCK_DATA_SIZE];
         apr_int64_t block_size;
         apr_int64_t extension_bytes_remaining = 0;
@@ -504,15 +505,30 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
         unsigned short status_code = STATUS_CODE_OK;
         unsigned char status_code_buffer[2];
 
+        /* Initialize the pollset */
+        pollfd.p = pool;
+        pollfd.desc_type = APR_POLL_SOCKET;
+        pollfd.reqevents = APR_POLLIN;
+        pollfd.desc.s = ap_get_conn_socket(state->r->connection);
+        apr_pollset_add(pollset, &pollfd);
+
         /* Allow the plugin to now write to the client */
         state->obb = obb;
         apr_thread_mutex_unlock(state->mutex);
 
-        while ((framing_state != DATA_FRAMING_CLOSE) &&
-               ((block_size =
-                 mod_websocket_read_block(r, (char *)block,
-                                          sizeof(block))) > 0)) {
+        while ((framing_state != DATA_FRAMING_CLOSE)) {
             apr_int64_t block_offset = 0;
+            apr_status_t rv;
+
+            do {
+                block_size = sizeof(block);
+                rv = mod_websocket_read_nonblock(r, (char *)block, &block_size);
+            } while (APR_STATUS_IS_EAGAIN(rv) &&
+                     apr_pollset_poll(pollset, -1, &pollcnt, &signalled) == APR_SUCCESS);
+
+            if (rv != APR_SUCCESS) {
+                break;
+            }
 
             while (block_offset < block_size) {
                 switch (framing_state) {
-- 
2.1.1


[PATCH 3/5] Pull framing logic into a separate function

Posted by Jacob Champion <ja...@ni.com>.
mod_websocket_handle_incoming now performs the core logic for
mod_websocket_data_framing. This will let us turn the processing loop
into a combination read/write loop more easily.

The state variables have all been moved into a separate
WebSocketReadState struct.
---
 mod_websocket.c | 652 +++++++++++++++++++++++++++++---------------------------
 1 file changed, 342 insertions(+), 310 deletions(-)

diff --git a/mod_websocket.c b/mod_websocket.c
index 62be765..1e88ef6 100644
--- a/mod_websocket.c
+++ b/mod_websocket.c
@@ -476,6 +476,327 @@ typedef struct _WebSocketFrameData
     unsigned int utf8_state;
 } WebSocketFrameData;
 
+/* Variables that need to persist across calls to mod_websocket_handle_incoming */
+typedef struct
+{
+    int framing_state;
+    unsigned short status_code;
+    /* XXX fin and opcode appear to be duplicated with frame; can they be removed? */
+    unsigned char fin;
+    unsigned char opcode;
+    WebSocketFrameData control_frame;
+    WebSocketFrameData message_frame;
+    WebSocketFrameData *frame;
+    apr_int64_t payload_length;
+    apr_int64_t mask_offset;
+    apr_int64_t extension_bytes_remaining;
+    int payload_length_bytes_remaining;
+    int masking;
+    int mask_index;
+    unsigned char mask[4];
+} WebSocketReadState;
+
+static void mod_websocket_handle_incoming(const WebSocketServer *server,
+                                          unsigned char *block,
+                                          apr_int64_t block_size,
+                                          WebSocketReadState *state,
+                                          websocket_config_rec *conf,
+                                          void *plugin_private)
+{
+    apr_int64_t block_offset = 0;
+
+    while (block_offset < block_size) {
+        switch (state->framing_state) {
+        case DATA_FRAMING_START:
+            /*
+             * Since we don't currently support any extensions,
+             * the reserve bits must be 0
+             */
+            if ((FRAME_GET_RSV1(block[block_offset]) != 0) ||
+                (FRAME_GET_RSV2(block[block_offset]) != 0) ||
+                (FRAME_GET_RSV3(block[block_offset]) != 0)) {
+                state->framing_state = DATA_FRAMING_CLOSE;
+                state->status_code = STATUS_CODE_PROTOCOL_ERROR;
+                break;
+            }
+            state->fin = FRAME_GET_FIN(block[block_offset]);
+            state->opcode = FRAME_GET_OPCODE(block[block_offset++]);
+
+            state->framing_state = DATA_FRAMING_PAYLOAD_LENGTH;
+
+            if (state->opcode >= 0x8) { /* Control frame */
+                if (state->fin) {
+                    state->frame = &state->control_frame;
+                    state->frame->opcode = state->opcode;
+                    state->frame->utf8_state = UTF8_VALID;
+                }
+                else {
+                    state->framing_state = DATA_FRAMING_CLOSE;
+                    state->status_code = STATUS_CODE_PROTOCOL_ERROR;
+                    break;
+                }
+            }
+            else { /* Message frame */
+                state->frame = &state->message_frame;
+                if (state->opcode) {
+                    if (state->frame->fin) {
+                        state->frame->opcode = state->opcode;
+                        state->frame->utf8_state = UTF8_VALID;
+                    }
+                    else {
+                        state->framing_state = DATA_FRAMING_CLOSE;
+                        state->status_code = STATUS_CODE_PROTOCOL_ERROR;
+                        break;
+                    }
+                }
+                else if (state->frame->fin ||
+                         ((state->opcode = state->frame->opcode) == 0)) {
+                    state->framing_state = DATA_FRAMING_CLOSE;
+                    state->status_code = STATUS_CODE_PROTOCOL_ERROR;
+                    break;
+                }
+                state->frame->fin = state->fin;
+            }
+            state->payload_length = 0;
+            state->payload_length_bytes_remaining = 0;
+
+            if (block_offset >= block_size) {
+                break; /* Only break if we need more data */
+            }
+        case DATA_FRAMING_PAYLOAD_LENGTH:
+            state->payload_length = (apr_int64_t)
+                FRAME_GET_PAYLOAD_LEN(block[block_offset]);
+            state->masking = FRAME_GET_MASK(block[block_offset++]);
+
+            if (state->payload_length == 126) {
+                state->payload_length = 0;
+                state->payload_length_bytes_remaining = 2;
+            }
+            else if (state->payload_length == 127) {
+                state->payload_length = 0;
+                state->payload_length_bytes_remaining = 8;
+            }
+            else {
+                state->payload_length_bytes_remaining = 0;
+            }
+            if ((state->masking == 0) ||   /* Client-side mask is required */
+                ((state->opcode >= 0x8) && /* Control opcodes cannot have a payload larger than 125 bytes */
+                 (state->payload_length_bytes_remaining != 0))) {
+                state->framing_state = DATA_FRAMING_CLOSE;
+                state->status_code = STATUS_CODE_PROTOCOL_ERROR;
+                break;
+            }
+            else {
+                state->framing_state = DATA_FRAMING_PAYLOAD_LENGTH_EXT;
+            }
+            if (block_offset >= block_size) {
+                break;  /* Only break if we need more data */
+            }
+        case DATA_FRAMING_PAYLOAD_LENGTH_EXT:
+            while ((state->payload_length_bytes_remaining > 0) &&
+                   (block_offset < block_size)) {
+                state->payload_length *= 256;
+                state->payload_length += block[block_offset++];
+                state->payload_length_bytes_remaining--;
+            }
+            if (state->payload_length_bytes_remaining == 0) {
+                if ((state->payload_length < 0) ||
+                    (state->payload_length > conf->payload_limit)) {
+                    /* Invalid payload length */
+                    state->framing_state = DATA_FRAMING_CLOSE;
+                    state->status_code = (server->state->protocol_version >= 13) ?
+                                          STATUS_CODE_MESSAGE_TOO_LARGE :
+                                          STATUS_CODE_RESERVED;
+                    break;
+                }
+                else if (state->masking != 0) {
+                    state->framing_state = DATA_FRAMING_MASK;
+                }
+                else {
+                    state->framing_state = DATA_FRAMING_EXTENSION_DATA;
+                    break;
+                }
+            }
+            if (block_offset >= block_size) {
+                break;  /* Only break if we need more data */
+            }
+        case DATA_FRAMING_MASK:
+            while ((state->mask_index < 4) && (block_offset < block_size)) {
+                state->mask[state->mask_index++] = block[block_offset++];
+            }
+            if (state->mask_index == 4) {
+                state->framing_state = DATA_FRAMING_EXTENSION_DATA;
+                state->mask_offset = 0;
+                state->mask_index = 0;
+                if ((state->mask[0] == 0) && (state->mask[1] == 0) &&
+                    (state->mask[2] == 0) && (state->mask[3] == 0)) {
+                    state->masking = 0;
+                }
+            }
+            else {
+                break;
+            }
+            /* Fall through */
+        case DATA_FRAMING_EXTENSION_DATA:
+            /* Deal with extension data when we support them -- FIXME */
+            if (state->extension_bytes_remaining == 0) {
+                if (state->payload_length > 0) {
+                    state->frame->application_data = (unsigned char *)
+                        realloc(state->frame->application_data,
+                                state->frame->application_data_offset +
+                                state->payload_length);
+                    if (state->frame->application_data == NULL) {
+                        state->framing_state = DATA_FRAMING_CLOSE;
+                        state->status_code = (server->state->protocol_version >= 13) ?
+                                              STATUS_CODE_INTERNAL_ERROR :
+                                              STATUS_CODE_GOING_AWAY;
+                        break;
+                    }
+                }
+                state->framing_state = DATA_FRAMING_APPLICATION_DATA;
+            }
+            /* Fall through */
+        case DATA_FRAMING_APPLICATION_DATA:
+            {
+                apr_int64_t block_data_length;
+                apr_int64_t block_length = 0;
+                apr_uint64_t application_data_offset =
+                    state->frame->application_data_offset;
+                unsigned char *application_data =
+                    state->frame->application_data;
+
+                block_length = block_size - block_offset;
+                block_data_length =
+                    (state->payload_length >
+                     block_length) ? block_length : state->payload_length;
+
+                if (state->masking) {
+                    apr_int64_t i;
+
+                    if (state->opcode == OPCODE_TEXT) {
+                        unsigned int utf8_state = state->frame->utf8_state;
+                        unsigned char c;
+
+                        for (i = 0; i < block_data_length; i++) {
+                            c = block[block_offset++] ^
+                                state->mask[state->mask_offset++ & 3];
+                            utf8_state =
+                                validate_utf8[utf8_state + c];
+                            if (utf8_state == UTF8_INVALID) {
+                                state->payload_length = block_data_length;
+                                break;
+                            }
+                            application_data
+                                [application_data_offset++] = c;
+                        }
+                        state->frame->utf8_state = utf8_state;
+                    }
+                    else {
+                        /* Need to optimize the unmasking -- FIXME */
+                        for (i = 0; i < block_data_length; i++) {
+                            application_data
+                                [application_data_offset++] =
+                                block[block_offset++] ^
+                                state->mask[state->mask_offset++ & 3];
+                        }
+                    }
+                }
+                else if (block_data_length > 0) {
+                    memcpy(&application_data[application_data_offset],
+                           &block[block_offset], block_data_length);
+                    if (state->opcode == OPCODE_TEXT) {
+                        apr_int64_t i, application_data_end =
+                            application_data_offset +
+                            block_data_length;
+                        unsigned int utf8_state = state->frame->utf8_state;
+
+                        for (i = application_data_offset;
+                             i < application_data_end; i++) {
+                            utf8_state =
+                                validate_utf8[utf8_state +
+                                              application_data[i]];
+                            if (utf8_state == UTF8_INVALID) {
+                                state->payload_length = block_data_length;
+                                break;
+                            }
+                        }
+                        state->frame->utf8_state = utf8_state;
+                    }
+                    application_data_offset += block_data_length;
+                    block_offset += block_data_length;
+                }
+                state->payload_length -= block_data_length;
+
+                if (state->payload_length == 0) {
+                    int message_type = MESSAGE_TYPE_INVALID;
+
+                    switch (state->opcode) {
+                    case OPCODE_TEXT:
+                        if ((state->fin &&
+                            (state->frame->utf8_state != UTF8_VALID)) ||
+                            (state->frame->utf8_state == UTF8_INVALID)) {
+                            state->framing_state = DATA_FRAMING_CLOSE;
+                            state->status_code = STATUS_CODE_INVALID_UTF8;
+                        }
+                        else {
+                            message_type = MESSAGE_TYPE_TEXT;
+                        }
+                        break;
+                    case OPCODE_BINARY:
+                        message_type = MESSAGE_TYPE_BINARY;
+                        break;
+                    case OPCODE_CLOSE:
+                        state->framing_state = DATA_FRAMING_CLOSE;
+                        state->status_code = STATUS_CODE_OK;
+                        break;
+                    case OPCODE_PING:
+                        apr_thread_mutex_lock(server->state->mutex);
+                        mod_websocket_send_internal(server->state,
+                                                    MESSAGE_TYPE_PONG,
+                                                    application_data,
+                                                    application_data_offset);
+                        apr_thread_mutex_unlock(server->state->mutex);
+                        break;
+                    case OPCODE_PONG:
+                        break;
+                    default:
+                        state->framing_state = DATA_FRAMING_CLOSE;
+                        state->status_code = STATUS_CODE_PROTOCOL_ERROR;
+                        break;
+                    }
+                    if (state->fin && (message_type != MESSAGE_TYPE_INVALID)) {
+                        conf->plugin->on_message(plugin_private,
+                                                 server, message_type,
+                                                 application_data,
+                                                 application_data_offset);
+                    }
+                    if (state->framing_state != DATA_FRAMING_CLOSE) {
+                        state->framing_state = DATA_FRAMING_START;
+
+                        if (state->fin) {
+                            if (state->frame->application_data != NULL) {
+                                free(state->frame->application_data);
+                                state->frame->application_data = NULL;
+                            }
+                            application_data_offset = 0;
+                        }
+                    }
+                }
+                state->frame->application_data_offset =
+                    application_data_offset;
+            }
+            break;
+        case DATA_FRAMING_CLOSE:
+            block_offset = block_size;
+            break;
+        default:
+            state->framing_state = DATA_FRAMING_CLOSE;
+            state->status_code = STATUS_CODE_PROTOCOL_ERROR;
+            break;
+        }
+    }
+}
+
 /*
  * The data framing handler requires that the server state mutex is locked by
  * the caller upon entering this function. It will be locked when leaving too.
@@ -509,19 +830,19 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
         (apr_pollset_create(&pollset, 1, pool, 0) == APR_SUCCESS)) {
         unsigned char block[BLOCK_DATA_SIZE];
         apr_int64_t block_size;
-        apr_int64_t extension_bytes_remaining = 0;
-        apr_int64_t payload_length = 0;
-        apr_int64_t mask_offset = 0;
-        int framing_state = DATA_FRAMING_START;
-        int payload_length_bytes_remaining = 0;
-        int mask_index = 0, masking = 0;
-        unsigned char mask[4] = { 0, 0, 0, 0 };
-        unsigned char fin = 0, opcode = 0xFF;
-        WebSocketFrameData control_frame = { 0, NULL, 1, 8, UTF8_VALID };
-        WebSocketFrameData message_frame = { 0, NULL, 1, 0, UTF8_VALID };
-        WebSocketFrameData *frame = &control_frame;
-        unsigned short status_code = STATUS_CODE_OK;
         unsigned char status_code_buffer[2];
+        WebSocketReadState read_state = { 0 };
+
+        read_state.framing_state = DATA_FRAMING_START;
+        read_state.status_code = STATUS_CODE_OK;
+        read_state.control_frame.fin = 1;
+        read_state.control_frame.opcode = 8;
+        read_state.control_frame.utf8_state = UTF8_VALID;
+        read_state.message_frame.fin = 1;
+        read_state.message_frame.opcode = 0;
+        read_state.message_frame.utf8_state = UTF8_VALID;
+        read_state.frame = &read_state.control_frame;
+        read_state.opcode = 0xFF;
 
         /* Initialize the pollset */
         pollfd.p = pool;
@@ -534,8 +855,7 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
         state->obb = obb;
         apr_thread_mutex_unlock(state->mutex);
 
-        while ((framing_state != DATA_FRAMING_CLOSE)) {
-            apr_int64_t block_offset = 0;
+        while ((read_state.framing_state != DATA_FRAMING_CLOSE)) {
             apr_status_t rv;
 
             do {
@@ -548,307 +868,19 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
                 break;
             }
 
-            while (block_offset < block_size) {
-                switch (framing_state) {
-                case DATA_FRAMING_START:
-                    /*
-                     * Since we don't currently support any extensions,
-                     * the reserve bits must be 0
-                     */
-                    if ((FRAME_GET_RSV1(block[block_offset]) != 0) ||
-                        (FRAME_GET_RSV2(block[block_offset]) != 0) ||
-                        (FRAME_GET_RSV3(block[block_offset]) != 0)) {
-                        framing_state = DATA_FRAMING_CLOSE;
-                        status_code = STATUS_CODE_PROTOCOL_ERROR;
-                        break;
-                    }
-                    fin = FRAME_GET_FIN(block[block_offset]);
-                    opcode = FRAME_GET_OPCODE(block[block_offset++]);
-
-                    framing_state = DATA_FRAMING_PAYLOAD_LENGTH;
-
-                    if (opcode >= 0x8) { /* Control frame */
-                        if (fin) {
-                            frame = &control_frame;
-                            frame->opcode = opcode;
-                            frame->utf8_state = UTF8_VALID;
-                        }
-                        else {
-                            framing_state = DATA_FRAMING_CLOSE;
-                            status_code = STATUS_CODE_PROTOCOL_ERROR;
-                            break;
-                        }
-                    }
-                    else { /* Message frame */
-                        frame = &message_frame;
-                        if (opcode) {
-                            if (frame->fin) {
-                                frame->opcode = opcode;
-                                frame->utf8_state = UTF8_VALID;
-                            }
-                            else {
-                                framing_state = DATA_FRAMING_CLOSE;
-                                status_code = STATUS_CODE_PROTOCOL_ERROR;
-                                break;
-                            }
-                        }
-                        else if (frame->fin ||
-                                 ((opcode = frame->opcode) == 0)) {
-                            framing_state = DATA_FRAMING_CLOSE;
-                            status_code = STATUS_CODE_PROTOCOL_ERROR;
-                            break;
-                        }
-                        frame->fin = fin;
-                    }
-                    payload_length = 0;
-                    payload_length_bytes_remaining = 0;
-
-                    if (block_offset >= block_size) {
-                        break; /* Only break if we need more data */
-                    }
-                case DATA_FRAMING_PAYLOAD_LENGTH:
-                    payload_length = (apr_int64_t)
-                        FRAME_GET_PAYLOAD_LEN(block[block_offset]);
-                    masking = FRAME_GET_MASK(block[block_offset++]);
-
-                    if (payload_length == 126) {
-                        payload_length = 0;
-                        payload_length_bytes_remaining = 2;
-                    }
-                    else if (payload_length == 127) {
-                        payload_length = 0;
-                        payload_length_bytes_remaining = 8;
-                    }
-                    else {
-                        payload_length_bytes_remaining = 0;
-                    }
-                    if ((masking == 0) ||   /* Client-side mask is required */
-                        ((opcode >= 0x8) && /* Control opcodes cannot have a payload larger than 125 bytes */
-                         (payload_length_bytes_remaining != 0))) {
-                        framing_state = DATA_FRAMING_CLOSE;
-                        status_code = STATUS_CODE_PROTOCOL_ERROR;
-                        break;
-                    }
-                    else {
-                        framing_state = DATA_FRAMING_PAYLOAD_LENGTH_EXT;
-                    }
-                    if (block_offset >= block_size) {
-                        break;  /* Only break if we need more data */
-                    }
-                case DATA_FRAMING_PAYLOAD_LENGTH_EXT:
-                    while ((payload_length_bytes_remaining > 0) &&
-                           (block_offset < block_size)) {
-                        payload_length *= 256;
-                        payload_length += block[block_offset++];
-                        payload_length_bytes_remaining--;
-                    }
-                    if (payload_length_bytes_remaining == 0) {
-                        if ((payload_length < 0) ||
-                            (payload_length > conf->payload_limit)) {
-                            /* Invalid payload length */
-                            framing_state = DATA_FRAMING_CLOSE;
-                            status_code = (state->protocol_version >= 13) ?
-                                           STATUS_CODE_MESSAGE_TOO_LARGE :
-                                           STATUS_CODE_RESERVED;
-                            break;
-                        }
-                        else if (masking != 0) {
-                            framing_state = DATA_FRAMING_MASK;
-                        }
-                        else {
-                            framing_state = DATA_FRAMING_EXTENSION_DATA;
-                            break;
-                        }
-                    }
-                    if (block_offset >= block_size) {
-                        break;  /* Only break if we need more data */
-                    }
-                case DATA_FRAMING_MASK:
-                    while ((mask_index < 4) && (block_offset < block_size)) {
-                        mask[mask_index++] = block[block_offset++];
-                    }
-                    if (mask_index == 4) {
-                        framing_state = DATA_FRAMING_EXTENSION_DATA;
-                        mask_offset = 0;
-                        mask_index = 0;
-                        if ((mask[0] == 0) && (mask[1] == 0) &&
-                            (mask[2] == 0) && (mask[3] == 0)) {
-                            masking = 0;
-                        }
-                    }
-                    else {
-                        break;
-                    }
-                    /* Fall through */
-                case DATA_FRAMING_EXTENSION_DATA:
-                    /* Deal with extension data when we support them -- FIXME */
-                    if (extension_bytes_remaining == 0) {
-                        if (payload_length > 0) {
-                            frame->application_data = (unsigned char *)
-                                realloc(frame->application_data,
-                                        frame->application_data_offset +
-                                        payload_length);
-                            if (frame->application_data == NULL) {
-                                framing_state = DATA_FRAMING_CLOSE;
-                                status_code = (state->protocol_version >= 13) ?
-                                               STATUS_CODE_INTERNAL_ERROR :
-                                               STATUS_CODE_GOING_AWAY;
-                                break;
-                            }
-                        }
-                        framing_state = DATA_FRAMING_APPLICATION_DATA;
-                    }
-                    /* Fall through */
-                case DATA_FRAMING_APPLICATION_DATA:
-                    {
-                        apr_int64_t block_data_length;
-                        apr_int64_t block_length = 0;
-                        apr_uint64_t application_data_offset =
-                            frame->application_data_offset;
-                        unsigned char *application_data =
-                            frame->application_data;
-
-                        block_length = block_size - block_offset;
-                        block_data_length =
-                            (payload_length >
-                             block_length) ? block_length : payload_length;
-
-                        if (masking) {
-                            apr_int64_t i;
-
-                            if (opcode == OPCODE_TEXT) {
-                                unsigned int utf8_state = frame->utf8_state;
-                                unsigned char c;
-
-                                for (i = 0; i < block_data_length; i++) {
-                                    c = block[block_offset++] ^
-                                        mask[mask_offset++ & 3];
-                                    utf8_state =
-                                        validate_utf8[utf8_state + c];
-                                    if (utf8_state == UTF8_INVALID) {
-                                        payload_length = block_data_length;
-                                        break;
-                                    }
-                                    application_data
-                                        [application_data_offset++] = c;
-                                }
-                                frame->utf8_state = utf8_state;
-                            }
-                            else {
-                                /* Need to optimize the unmasking -- FIXME */
-                                for (i = 0; i < block_data_length; i++) {
-                                    application_data
-                                        [application_data_offset++] =
-                                        block[block_offset++] ^
-                                        mask[mask_offset++ & 3];
-                                }
-                            }
-                        }
-                        else if (block_data_length > 0) {
-                            memcpy(&application_data[application_data_offset],
-                                   &block[block_offset], block_data_length);
-                            if (opcode == OPCODE_TEXT) {
-                                apr_int64_t i, application_data_end =
-                                    application_data_offset +
-                                    block_data_length;
-                                unsigned int utf8_state = frame->utf8_state;
-
-                                for (i = application_data_offset;
-                                     i < application_data_end; i++) {
-                                    utf8_state =
-                                        validate_utf8[utf8_state +
-                                                      application_data[i]];
-                                    if (utf8_state == UTF8_INVALID) {
-                                        payload_length = block_data_length;
-                                        break;
-                                    }
-                                }
-                                frame->utf8_state = utf8_state;
-                            }
-                            application_data_offset += block_data_length;
-                            block_offset += block_data_length;
-                        }
-                        payload_length -= block_data_length;
-
-                        if (payload_length == 0) {
-                            int message_type = MESSAGE_TYPE_INVALID;
-
-                            switch (opcode) {
-                            case OPCODE_TEXT:
-                                if ((fin &&
-                                    (frame->utf8_state != UTF8_VALID)) ||
-                                    (frame->utf8_state == UTF8_INVALID)) {
-                                    framing_state = DATA_FRAMING_CLOSE;
-                                    status_code = STATUS_CODE_INVALID_UTF8;
-                                }
-                                else {
-                                    message_type = MESSAGE_TYPE_TEXT;
-                                }
-                                break;
-                            case OPCODE_BINARY:
-                                message_type = MESSAGE_TYPE_BINARY;
-                                break;
-                            case OPCODE_CLOSE:
-                                framing_state = DATA_FRAMING_CLOSE;
-                                status_code = STATUS_CODE_OK;
-                                break;
-                            case OPCODE_PING:
-                                apr_thread_mutex_lock(state->mutex);
-                                mod_websocket_send_internal(state,
-                                                            MESSAGE_TYPE_PONG,
-                                                            application_data,
-                                                            application_data_offset);
-                                apr_thread_mutex_unlock(state->mutex);
-                                break;
-                            case OPCODE_PONG:
-                                break;
-                            default:
-                                framing_state = DATA_FRAMING_CLOSE;
-                                status_code = STATUS_CODE_PROTOCOL_ERROR;
-                                break;
-                            }
-                            if (fin && (message_type != MESSAGE_TYPE_INVALID)) {
-                                conf->plugin->on_message(plugin_private,
-                                                         server, message_type,
-                                                         application_data,
-                                                         application_data_offset);
-                            }
-                            if (framing_state != DATA_FRAMING_CLOSE) {
-                                framing_state = DATA_FRAMING_START;
-
-                                if (fin) {
-                                    if (frame->application_data != NULL) {
-                                        free(frame->application_data);
-                                        frame->application_data = NULL;
-                                    }
-                                    application_data_offset = 0;
-                                }
-                            }
-                        }
-                        frame->application_data_offset =
-                            application_data_offset;
-                    }
-                    break;
-                case DATA_FRAMING_CLOSE:
-                    block_offset = block_size;
-                    break;
-                default:
-                    framing_state = DATA_FRAMING_CLOSE;
-                    status_code = STATUS_CODE_PROTOCOL_ERROR;
-                    break;
-                }
-            }
+            mod_websocket_handle_incoming(server, block, block_size,
+                                          &read_state, conf, plugin_private);
         }
-        if (message_frame.application_data != NULL) {
-            free(message_frame.application_data);
+        if (read_state.message_frame.application_data != NULL) {
+            free(read_state.message_frame.application_data);
         }
-        if (control_frame.application_data != NULL) {
-            free(control_frame.application_data);
+        if (read_state.control_frame.application_data != NULL) {
+            free(read_state.control_frame.application_data);
         }
 
         /* Send server-side closing handshake */
-        status_code_buffer[0] = (status_code >> 8) & 0xFF;
-        status_code_buffer[1] = status_code & 0xFF;
+        status_code_buffer[0] = (read_state.status_code >> 8) & 0xFF;
+        status_code_buffer[1] = read_state.status_code & 0xFF;
 
         apr_thread_mutex_lock(state->mutex);
         mod_websocket_send_internal(state, MESSAGE_TYPE_CLOSE,
-- 
2.1.1


[PATCH 2/5] Separate plugin_send from the send implementation

Posted by Jacob Champion <ja...@ni.com>.
The public-facing API will eventually communicate cross-thread instead
of writing directly to the brigade. In preparation, pull the send logic
into an internal function, and call that from the read loop instead of
mod_websocket_plugin_send.
---
 mod_websocket.c | 161 ++++++++++++++++++++++++++++++++------------------------
 1 file changed, 91 insertions(+), 70 deletions(-)

diff --git a/mod_websocket.c b/mod_websocket.c
index 5a04936..62be765 100644
--- a/mod_websocket.c
+++ b/mod_websocket.c
@@ -290,85 +290,103 @@ static void CALLBACK mod_websocket_protocol_set(const WebSocketServer *server,
     }
 }
 
+/*
+ * Sends data to the WebSocket connection using the given server state. The
+ * server state must be locked upon entering this function. buffer_size is
+ * assumed to be within the limits defined by the WebSocket protocol (i.e. fits
+ * in 63 bits).
+ */
+static size_t mod_websocket_send_internal(WebSocketState *state,
+                                          const int type,
+                                          const unsigned char *buffer,
+                                          const size_t buffer_size)
+{
+    apr_uint64_t payload_length =
+        (apr_uint64_t) ((buffer != NULL) ? buffer_size : 0);
+    size_t written = 0;
+
+    if ((state->r != NULL) && (state->obb != NULL) && !state->closing) {
+        unsigned char header[32];
+        ap_filter_t *of = state->r->connection->output_filters;
+        apr_size_t pos = 0;
+        unsigned char opcode;
+
+        switch (type) {
+        case MESSAGE_TYPE_TEXT:
+            opcode = OPCODE_TEXT;
+            break;
+        case MESSAGE_TYPE_BINARY:
+            opcode = OPCODE_BINARY;
+            break;
+        case MESSAGE_TYPE_PING:
+            opcode = OPCODE_PING;
+            break;
+        case MESSAGE_TYPE_PONG:
+            opcode = OPCODE_PONG;
+            break;
+        case MESSAGE_TYPE_CLOSE:
+        default:
+            state->closing = 1;
+            opcode = OPCODE_CLOSE;
+            break;
+        }
+        header[pos++] = FRAME_SET_FIN(1) | FRAME_SET_OPCODE(opcode);
+        if (payload_length < 126) {
+            header[pos++] =
+                FRAME_SET_MASK(0) | FRAME_SET_LENGTH(payload_length, 0);
+        }
+        else {
+            if (payload_length < 65536) {
+                header[pos++] = FRAME_SET_MASK(0) | 126;
+            }
+            else {
+                header[pos++] = FRAME_SET_MASK(0) | 127;
+                header[pos++] = FRAME_SET_LENGTH(payload_length, 7);
+                header[pos++] = FRAME_SET_LENGTH(payload_length, 6);
+                header[pos++] = FRAME_SET_LENGTH(payload_length, 5);
+                header[pos++] = FRAME_SET_LENGTH(payload_length, 4);
+                header[pos++] = FRAME_SET_LENGTH(payload_length, 3);
+                header[pos++] = FRAME_SET_LENGTH(payload_length, 2);
+            }
+            header[pos++] = FRAME_SET_LENGTH(payload_length, 1);
+            header[pos++] = FRAME_SET_LENGTH(payload_length, 0);
+        }
+        ap_fwrite(of, state->obb, (const char *)header, pos); /* Header */
+        if (payload_length > 0) {
+            if (ap_fwrite(of, state->obb,
+                          (const char *)buffer,
+                          buffer_size) == APR_SUCCESS) { /* Payload Data */
+                written = buffer_size;
+            }
+        }
+        if (ap_fflush(of, state->obb) != APR_SUCCESS) {
+            written = 0;
+        }
+    }
+
+    return written;
+}
+
 static size_t CALLBACK mod_websocket_plugin_send(const WebSocketServer *server,
                                                  const int type,
                                                  const unsigned char *buffer,
                                                  const size_t buffer_size)
 {
-    apr_uint64_t payload_length =
-        (apr_uint64_t) ((buffer != NULL) ? buffer_size : 0);
     size_t written = 0;
 
     /* Deal with size more that 63 bits - FIXME */
-
     if ((server != NULL) && (server->state != NULL)) {
         WebSocketState *state = server->state;
 
         apr_thread_mutex_lock(state->mutex);
-
-        if ((state->r != NULL) && (state->obb != NULL) && !state->closing) {
-            unsigned char header[32];
-            ap_filter_t *of = state->r->connection->output_filters;
-            apr_size_t pos = 0;
-            unsigned char opcode;
-
-            switch (type) {
-            case MESSAGE_TYPE_TEXT:
-                opcode = OPCODE_TEXT;
-                break;
-            case MESSAGE_TYPE_BINARY:
-                opcode = OPCODE_BINARY;
-                break;
-            case MESSAGE_TYPE_PING:
-                opcode = OPCODE_PING;
-                break;
-            case MESSAGE_TYPE_PONG:
-                opcode = OPCODE_PONG;
-                break;
-            case MESSAGE_TYPE_CLOSE:
-            default:
-                state->closing = 1;
-                opcode = OPCODE_CLOSE;
-                break;
-            }
-            header[pos++] = FRAME_SET_FIN(1) | FRAME_SET_OPCODE(opcode);
-            if (payload_length < 126) {
-                header[pos++] =
-                    FRAME_SET_MASK(0) | FRAME_SET_LENGTH(payload_length, 0);
-            }
-            else {
-                if (payload_length < 65536) {
-                    header[pos++] = FRAME_SET_MASK(0) | 126;
-                }
-                else {
-                    header[pos++] = FRAME_SET_MASK(0) | 127;
-                    header[pos++] = FRAME_SET_LENGTH(payload_length, 7);
-                    header[pos++] = FRAME_SET_LENGTH(payload_length, 6);
-                    header[pos++] = FRAME_SET_LENGTH(payload_length, 5);
-                    header[pos++] = FRAME_SET_LENGTH(payload_length, 4);
-                    header[pos++] = FRAME_SET_LENGTH(payload_length, 3);
-                    header[pos++] = FRAME_SET_LENGTH(payload_length, 2);
-                }
-                header[pos++] = FRAME_SET_LENGTH(payload_length, 1);
-                header[pos++] = FRAME_SET_LENGTH(payload_length, 0);
-            }
-            ap_fwrite(of, state->obb, (const char *)header, pos); /* Header */
-            if (payload_length > 0) {
-                if (ap_fwrite(of, state->obb,
-                              (const char *)buffer,
-                              buffer_size) == APR_SUCCESS) { /* Payload Data */
-                    written = buffer_size;
-                }
-            }
-            if (ap_fflush(of, state->obb) != APR_SUCCESS) {
-                written = 0;
-            }
-        }
+        written = mod_websocket_send_internal(state, type, buffer, buffer_size);
         apr_thread_mutex_unlock(state->mutex);
     }
+
     return written;
 }
 
+
 static void CALLBACK mod_websocket_plugin_close(const WebSocketServer *
                                                 server)
 {
@@ -775,10 +793,12 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
                                 status_code = STATUS_CODE_OK;
                                 break;
                             case OPCODE_PING:
-                                mod_websocket_plugin_send(server,
-                                                          MESSAGE_TYPE_PONG,
-                                                          application_data,
-                                                          application_data_offset);
+                                apr_thread_mutex_lock(state->mutex);
+                                mod_websocket_send_internal(state,
+                                                            MESSAGE_TYPE_PONG,
+                                                            application_data,
+                                                            application_data_offset);
+                                apr_thread_mutex_unlock(state->mutex);
                                 break;
                             case OPCODE_PONG:
                                 break;
@@ -829,12 +849,13 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
         /* Send server-side closing handshake */
         status_code_buffer[0] = (status_code >> 8) & 0xFF;
         status_code_buffer[1] = status_code & 0xFF;
-        mod_websocket_plugin_send(server, MESSAGE_TYPE_CLOSE,
-                                  status_code_buffer,
-                                  sizeof(status_code_buffer));
+
+        apr_thread_mutex_lock(state->mutex);
+        mod_websocket_send_internal(state, MESSAGE_TYPE_CLOSE,
+                                    status_code_buffer,
+                                    sizeof(status_code_buffer));
 
         /* We are done with the bucket brigade */
-        apr_thread_mutex_lock(state->mutex);
         state->obb = NULL;
         apr_brigade_destroy(obb);
     }
-- 
2.1.1


[PATCH 5/5] Remove extra pool and clean up brigades

Posted by Jacob Champion <ja...@ni.com>.
Now that the input and output brigades are only operated on from a
single thread, the extra pool and bucket allocator added as part of
commit cfaef071 can be removed. Also, allocate a single long-lived input
brigade instead of allocating new ones for every socket read.
---
 mod_websocket.c | 49 ++++++++++++++++++-------------------------------
 1 file changed, 18 insertions(+), 31 deletions(-)

diff --git a/mod_websocket.c b/mod_websocket.c
index 8e3bf26..9c62ca5 100644
--- a/mod_websocket.c
+++ b/mod_websocket.c
@@ -465,21 +465,19 @@ static void CALLBACK mod_websocket_plugin_close(const WebSocketServer *
 /*
  * Read a buffer of data from the input stream.
  */
-static apr_status_t mod_websocket_read_nonblock(request_rec *r, char *buffer,
+static apr_status_t mod_websocket_read_nonblock(request_rec *r,
+                                                apr_bucket_brigade *bb,
+                                                char *buffer,
                                                 apr_size_t *bufsiz)
 {
-    apr_status_t rv = APR_ENOMEM;
-    apr_bucket_brigade *bb;
+    apr_status_t rv;
 
-    bb = apr_brigade_create(r->pool, r->connection->bucket_alloc);
-    if (bb != NULL) {
-        if ((rv =
-             ap_get_brigade(r->input_filters, bb, AP_MODE_READBYTES,
-                            APR_NONBLOCK_READ, *bufsiz)) == APR_SUCCESS) {
-            rv = apr_brigade_flatten(bb, buffer, bufsiz);
-        }
-        apr_brigade_destroy(bb);
+    if ((rv = ap_get_brigade(r->input_filters, bb, AP_MODE_READBYTES,
+                             APR_NONBLOCK_READ, *bufsiz)) == APR_SUCCESS) {
+        rv = apr_brigade_flatten(bb, buffer, bufsiz);
+        apr_brigade_cleanup(bb);
     }
+
     return rv;
 }
 
@@ -893,29 +891,17 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
 {
     WebSocketState *state = server->state;
     request_rec *r = state->r;
-    apr_pool_t *pool = NULL;
-    apr_bucket_alloc_t *bucket_alloc;
-    apr_bucket_brigade *obb;
+    apr_bucket_brigade *ibb, *obb;
     apr_pollset_t *pollset;
     apr_pollfd_t pollfd = { 0 };
     const apr_pollfd_t *signalled;
     apr_int32_t pollcnt;
     apr_queue_t * queue;
 
-    /* We cannot use the same bucket allocator for the ouput bucket brigade
-     * obb as the one associated with the connection (r->connection->bucket_alloc)
-     * because the same bucket allocator cannot be used in two different
-     * threads, and we use the connection bucket allocator in this
-     * thread - see docs on apr_bucket_alloc_create(). This results in
-     * occasional core dumps. So create our own bucket allocator and pool
-     * for output thread bucket brigade. (Thanks to Alex Bligh -- abligh)
-     */
-
-    if ((apr_pool_create(&pool, r->pool) == APR_SUCCESS) &&
-        ((bucket_alloc = apr_bucket_alloc_create(pool)) != NULL) &&
-        ((obb = apr_brigade_create(pool, bucket_alloc)) != NULL) &&
-        (apr_pollset_create(&pollset, 1, pool, APR_POLLSET_WAKEABLE) == APR_SUCCESS) &&
-        (apr_queue_create(&queue, QUEUE_CAPACITY, pool) == APR_SUCCESS)) {
+    if (((ibb = apr_brigade_create(r->pool, r->connection->bucket_alloc)) != NULL) &&
+        ((obb = apr_brigade_create(r->pool, r->connection->bucket_alloc)) != NULL) &&
+        (apr_pollset_create(&pollset, 1, r->pool, APR_POLLSET_WAKEABLE) == APR_SUCCESS) &&
+        (apr_queue_create(&queue, QUEUE_CAPACITY, r->pool) == APR_SUCCESS)) {
         unsigned char block[BLOCK_DATA_SIZE];
         apr_int64_t block_size;
         unsigned char status_code_buffer[2];
@@ -935,7 +921,7 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
         state->queue = queue;
 
         /* Initialize the pollset */
-        pollfd.p = pool;
+        pollfd.p = r->pool;
         pollfd.desc_type = APR_POLL_SOCKET;
         pollfd.reqevents = APR_POLLIN;
         pollfd.desc.s = ap_get_conn_socket(state->r->connection);
@@ -960,7 +946,7 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
 
             /* Check to see if there is any data to read. */
             block_size = sizeof(block);
-            rv = mod_websocket_read_nonblock(r, (char *)block, &block_size);
+            rv = mod_websocket_read_nonblock(r, ibb, (char *)block, &block_size);
 
             if (rv == APR_SUCCESS) {
                 mod_websocket_handle_incoming(server, block, block_size,
@@ -1026,8 +1012,9 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
                                     status_code_buffer,
                                     sizeof(status_code_buffer));
 
-        /* We are done with the bucket brigade */
+        /* We are done with the bucket brigades */
         state->obb = NULL;
+        apr_brigade_destroy(ibb);
         apr_brigade_destroy(obb);
 
         state->pollset = NULL;
-- 
2.1.1


[PATCH 4/5] Read and write to the brigade from only one thread

Posted by Jacob Champion <ja...@ni.com>.
The prior implementation allowed clients to write to a parallel bucket
brigade from separate threads, while the main thread read incoming
messages from the original brigade. This appears to cause thread safety
issues, especially when combined with TLS (OpenSSL has the ability to
read from the socket during a write, and vice-versa).

This patch, inspired by both mod_spdy and mod_proxy_wstunnel, causes
cross-thread invocations of mod_websocket_plugin_send() to place their
messages on a queue and signal the main thread that a write is pending.
The event loop is based on apr_pollset.

See http://mail-archives.apache.org/mod_mbox/httpd-modules-dev/201502.mbox/%3C54EE23EB.5040705@ni.com%3E
for history. Thanks to Alex Bligh and Eric Covener for their
suggestions.
---
 mod_websocket.c | 175 ++++++++++++++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 164 insertions(+), 11 deletions(-)

diff --git a/mod_websocket.c b/mod_websocket.c
index 1e88ef6..8e3bf26 100644
--- a/mod_websocket.c
+++ b/mod_websocket.c
@@ -25,8 +25,10 @@
  */
 
 #include "apr_base64.h"
+#include "apr_queue.h"
 #include "apr_sha1.h"
 #include "apr_strings.h"
+#include "apr_thread_cond.h"
 
 #include "httpd.h"
 #include "http_config.h"
@@ -58,6 +60,8 @@ typedef struct
 
 #define BLOCK_DATA_SIZE              4096
 
+#define QUEUE_CAPACITY                 16
+
 #define DATA_FRAMING_MASK               0
 #define DATA_FRAMING_START              1
 #define DATA_FRAMING_PAYLOAD_LENGTH     2
@@ -214,10 +218,14 @@ typedef struct _WebSocketState
 {
     request_rec *r;
     apr_bucket_brigade *obb;
+    apr_os_thread_t main_thread;
     apr_thread_mutex_t *mutex;
+    apr_thread_cond_t *cond;
     apr_array_header_t *protocols;
     int closing;
     apr_int64_t protocol_version;
+    apr_pollset_t *pollset;
+    apr_queue_t *queue;
 } WebSocketState;
 
 static request_rec *CALLBACK mod_websocket_request(const WebSocketServer *server)
@@ -367,6 +375,15 @@ static size_t mod_websocket_send_internal(WebSocketState *state,
     return written;
 }
 
+typedef struct
+{
+    int type;
+    const unsigned char * buffer;
+    size_t buffer_size;
+    int done;
+    size_t written;
+} WebSocketMessageData;
+
 static size_t CALLBACK mod_websocket_plugin_send(const WebSocketServer *server,
                                                  const int type,
                                                  const unsigned char *buffer,
@@ -375,11 +392,60 @@ static size_t CALLBACK mod_websocket_plugin_send(const WebSocketServer *server,
     size_t written = 0;
 
     /* Deal with size more that 63 bits - FIXME */
+    /* FIXME - if sending a zero-length message, the API cannot distinguish
+     * between success and failure */
     if ((server != NULL) && (server->state != NULL)) {
         WebSocketState *state = server->state;
 
         apr_thread_mutex_lock(state->mutex);
-        written = mod_websocket_send_internal(state, type, buffer, buffer_size);
+
+        if (apr_os_thread_equal(apr_os_thread_current(), state->main_thread)) {
+            /* This is the main thread. It's safe to write messages directly. */
+            written = mod_websocket_send_internal(state, type, buffer, buffer_size);
+        }
+        else if ((state->pollset != NULL) && (state->queue != NULL) &&
+                 !state->closing) {
+            /* Dispatch this message to the main thread. */
+            apr_status_t rv;
+            WebSocketMessageData msg = { 0 };
+
+            /* Populate the message data. */
+            msg.type = type;
+            msg.buffer = buffer;
+            msg.buffer_size = buffer_size;
+
+            /* Queue the message. */
+            do {
+                rv = apr_queue_push(state->queue, &msg);
+            } while (APR_STATUS_IS_EINTR(rv));
+
+            if (rv != APR_SUCCESS) {
+                /* Couldn't push the message onto the queue. */
+                goto send_unlock;
+            }
+
+            /* Interrupt the pollset. */
+            rv = apr_pollset_wakeup(state->pollset);
+
+            if (rv != APR_SUCCESS) {
+                /*
+                 * Couldn't wake up poll...? We can't return zero since we've
+                 * already pushed the message, and it might actually be sent...
+                 */
+                /* TODO: log. */
+            }
+
+            /* Wait for the message to be written. */
+            while (!msg.done && !state->closing) {
+                apr_thread_cond_wait(state->cond, state->mutex);
+            }
+
+            if (msg.done) {
+                written = msg.written;
+            }
+        }
+
+send_unlock:
         apr_thread_mutex_unlock(state->mutex);
     }
 
@@ -797,6 +863,26 @@ static void mod_websocket_handle_incoming(const WebSocketServer *server,
     }
 }
 
+static void mod_websocket_handle_outgoing(const WebSocketServer *server,
+                                          WebSocketMessageData *msg)
+{
+    apr_thread_mutex_lock(server->state->mutex);
+    msg->written = mod_websocket_send_internal(server->state, msg->type,
+                                               msg->buffer, msg->buffer_size);
+
+    /*
+     * Notify plugin_send() that the message has been sent.
+     *
+     * XXX Wake up _all_ the waiting threads, since we don't know which one owns
+     * this message. This is contentious if there are a lot of threads writing
+     * in parallel.
+     */
+    msg->done = 1;
+    apr_thread_cond_broadcast(server->state->cond);
+
+    apr_thread_mutex_unlock(server->state->mutex);
+}
+
 /*
  * The data framing handler requires that the server state mutex is locked by
  * the caller upon entering this function. It will be locked when leaving too.
@@ -814,6 +900,7 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
     apr_pollfd_t pollfd = { 0 };
     const apr_pollfd_t *signalled;
     apr_int32_t pollcnt;
+    apr_queue_t * queue;
 
     /* We cannot use the same bucket allocator for the ouput bucket brigade
      * obb as the one associated with the connection (r->connection->bucket_alloc)
@@ -827,7 +914,8 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
     if ((apr_pool_create(&pool, r->pool) == APR_SUCCESS) &&
         ((bucket_alloc = apr_bucket_alloc_create(pool)) != NULL) &&
         ((obb = apr_brigade_create(pool, bucket_alloc)) != NULL) &&
-        (apr_pollset_create(&pollset, 1, pool, 0) == APR_SUCCESS)) {
+        (apr_pollset_create(&pollset, 1, pool, APR_POLLSET_WAKEABLE) == APR_SUCCESS) &&
+        (apr_queue_create(&queue, QUEUE_CAPACITY, pool) == APR_SUCCESS)) {
         unsigned char block[BLOCK_DATA_SIZE];
         apr_int64_t block_size;
         unsigned char status_code_buffer[2];
@@ -844,6 +932,8 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
         read_state.frame = &read_state.control_frame;
         read_state.opcode = 0xFF;
 
+        state->queue = queue;
+
         /* Initialize the pollset */
         pollfd.p = pool;
         pollfd.desc_type = APR_POLL_SOCKET;
@@ -851,25 +941,74 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
         pollfd.desc.s = ap_get_conn_socket(state->r->connection);
         apr_pollset_add(pollset, &pollfd);
 
+        state->pollset = pollset;
+
         /* Allow the plugin to now write to the client */
         state->obb = obb;
         apr_thread_mutex_unlock(state->mutex);
 
+        /*
+         * Main loop, inspired by mod_spdy. Alternate between data coming from
+         * the client and data coming from the server. Only block in poll() if
+         * there is no work to be done for either side.
+         */
         while ((read_state.framing_state != DATA_FRAMING_CLOSE)) {
             apr_status_t rv;
+            apr_interval_time_t timeout;
+            WebSocketMessageData *msg;
+            int work_done = 0;
 
+            /* Check to see if there is any data to read. */
+            block_size = sizeof(block);
+            rv = mod_websocket_read_nonblock(r, (char *)block, &block_size);
+
+            if (rv == APR_SUCCESS) {
+                mod_websocket_handle_incoming(server, block, block_size,
+                                              &read_state, conf, plugin_private);
+                work_done = 1;
+            }
+            else if (!APR_STATUS_IS_EAGAIN(rv)) {
+                read_state.status_code = STATUS_CODE_INTERNAL_ERROR;
+                break;
+            }
+
+            /* Check to see if there is any data to write. */
             do {
-                block_size = sizeof(block);
-                rv = mod_websocket_read_nonblock(r, (char *)block, &block_size);
-            } while (APR_STATUS_IS_EAGAIN(rv) &&
-                     apr_pollset_poll(pollset, -1, &pollcnt, &signalled) == APR_SUCCESS);
+                rv = apr_queue_trypop(state->queue, &msg);
+            } while (APR_STATUS_IS_EINTR(rv));
 
-            if (rv != APR_SUCCESS) {
+            if (rv == APR_SUCCESS) {
+                mod_websocket_handle_outgoing(server, msg);
+                work_done = 1;
+            }
+            else if (!APR_STATUS_IS_EAGAIN(rv)) {
+                read_state.status_code = STATUS_CODE_INTERNAL_ERROR;
                 break;
             }
 
-            mod_websocket_handle_incoming(server, block, block_size,
-                                          &read_state, conf, plugin_private);
+            /*
+             * If there's nothing to do, wait for new work to come in.
+             *
+             * Because Windows cannot poll on both a file pipe and a socket,
+             * plugin_send() uses apr_pollset_wakeup() to signal that new data
+             * is available to write. This is lossy (multiple threads calling
+             * wakeup() will result in only one wakeup here) so it's important
+             * that we do not block until state->queue has emptied. Otherwise
+             * it's possible to lose messages in the queue.
+             *
+             * NOTE: The wakeup pipe is drained only during apr_pollset_poll(),
+             * so we call it each iteration to avoid filling it up. We only
+             * block in poll() (negative timeout) if there was no work done
+             * during the current iteration.
+             */
+            timeout = work_done ? 0 : -1;
+            rv = apr_pollset_poll(state->pollset, timeout, &pollcnt, &signalled);
+
+            if ((rv != APR_SUCCESS) && !APR_STATUS_IS_EINTR(rv) &&
+                    !APR_STATUS_IS_TIMEUP(rv)) {
+                read_state.status_code = STATUS_CODE_INTERNAL_ERROR;
+                break;
+            }
         }
         if (read_state.message_frame.application_data != NULL) {
             free(read_state.message_frame.application_data);
@@ -890,6 +1029,12 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
         /* We are done with the bucket brigade */
         state->obb = NULL;
         apr_brigade_destroy(obb);
+
+        state->pollset = NULL;
+        apr_pollset_destroy(pollset);
+
+        state->queue = NULL;
+        apr_queue_term(queue);
     }
 }
 
@@ -957,8 +1102,10 @@ static int mod_websocket_method_handler(request_rec *r)
                                          &websocket_module);
 
                 if ((conf != NULL) && (conf->plugin != NULL)) {
-                    WebSocketState state =
-                        { r, NULL, NULL, NULL, 0, protocol_version };
+                    WebSocketState state = {
+                        r, NULL, apr_os_thread_current(), NULL, NULL, NULL, 0,
+                        protocol_version, NULL, NULL
+                    };
                     WebSocketServer server = {
                         sizeof(WebSocketServer), 1, &state,
                         mod_websocket_request, mod_websocket_header_get,
@@ -1015,6 +1162,8 @@ static int mod_websocket_method_handler(request_rec *r)
                     apr_thread_mutex_create(&state.mutex,
                                             APR_THREAD_MUTEX_DEFAULT,
                                             r->pool);
+                    apr_thread_cond_create(&state.cond, r->pool);
+
                     apr_thread_mutex_lock(state.mutex);
 
                     /*
@@ -1043,6 +1192,9 @@ static int mod_websocket_method_handler(request_rec *r)
                         mod_websocket_data_framing(&server, conf,
                                                    plugin_private);
 
+                        /* Wake up any waiting plugin_sends before closing */
+                        apr_thread_cond_broadcast(state.cond);
+
                         apr_thread_mutex_unlock(state.mutex);
 
                         /* Tell the plugin that we are disconnecting */
@@ -1068,6 +1220,7 @@ static int mod_websocket_method_handler(request_rec *r)
                     /* Close the connection */
                     ap_lingering_close(r->connection);
 
+                    apr_thread_cond_destroy(state.cond);
                     apr_thread_mutex_destroy(state.mutex);
 
                     return OK;
-- 
2.1.1