You are viewing a plain text version of this content. The canonical link for it is here.
Posted to modules-dev@httpd.apache.org by Jacob Champion <ja...@ni.com> on 2015/03/09 19:43:53 UTC

[PATCH 2/5] Separate plugin_send from the send implementation

The public-facing API will eventually communicate cross-thread instead
of writing directly to the brigade. In preparation, pull the send logic
into an internal function that expects the caller to already hold the
state mutex, and have the read loop lock the mutex and call that function
directly instead of going through mod_websocket_plugin_send.
---
 mod_websocket.c | 161 ++++++++++++++++++++++++++++++++------------------------
 1 file changed, 91 insertions(+), 70 deletions(-)

diff --git a/mod_websocket.c b/mod_websocket.c
index 5a04936..62be765 100644
--- a/mod_websocket.c
+++ b/mod_websocket.c
@@ -290,85 +290,103 @@ static void CALLBACK mod_websocket_protocol_set(const WebSocketServer *server,
     }
 }
 
+/*
+ * Sends one unfragmented frame of the given message type on the WebSocket
+ * connection described by state. The caller must hold state->mutex for the
+ * duration of the call. buffer_size is assumed to be within the limits
+ * defined by the WebSocket protocol (i.e. fits in 63 bits).
+ */
+static size_t mod_websocket_send_internal(WebSocketState *state,
+                                          const int type,
+                                          const unsigned char *buffer,
+                                          const size_t buffer_size)
+{
+    apr_uint64_t payload_length =
+        (apr_uint64_t) ((buffer != NULL) ? buffer_size : 0);
+    size_t written = 0;
+
+    if ((state->r != NULL) && (state->obb != NULL) && !state->closing) {
+        unsigned char header[32];
+        ap_filter_t *of = state->r->connection->output_filters;
+        apr_size_t pos = 0;
+        unsigned char opcode;
+
+        switch (type) {
+        case MESSAGE_TYPE_TEXT:
+            opcode = OPCODE_TEXT;
+            break;
+        case MESSAGE_TYPE_BINARY:
+            opcode = OPCODE_BINARY;
+            break;
+        case MESSAGE_TYPE_PING:
+            opcode = OPCODE_PING;
+            break;
+        case MESSAGE_TYPE_PONG:
+            opcode = OPCODE_PONG;
+            break;
+        case MESSAGE_TYPE_CLOSE:
+        default:
+            state->closing = 1;
+            opcode = OPCODE_CLOSE;
+            break;
+        }
+        header[pos++] = FRAME_SET_FIN(1) | FRAME_SET_OPCODE(opcode);
+        if (payload_length < 126) {
+            header[pos++] =
+                FRAME_SET_MASK(0) | FRAME_SET_LENGTH(payload_length, 0);
+        }
+        else {
+            if (payload_length < 65536) {
+                header[pos++] = FRAME_SET_MASK(0) | 126;
+            }
+            else {
+                header[pos++] = FRAME_SET_MASK(0) | 127;
+                header[pos++] = FRAME_SET_LENGTH(payload_length, 7);
+                header[pos++] = FRAME_SET_LENGTH(payload_length, 6);
+                header[pos++] = FRAME_SET_LENGTH(payload_length, 5);
+                header[pos++] = FRAME_SET_LENGTH(payload_length, 4);
+                header[pos++] = FRAME_SET_LENGTH(payload_length, 3);
+                header[pos++] = FRAME_SET_LENGTH(payload_length, 2);
+            }
+            header[pos++] = FRAME_SET_LENGTH(payload_length, 1);
+            header[pos++] = FRAME_SET_LENGTH(payload_length, 0);
+        }
+        ap_fwrite(of, state->obb, (const char *)header, pos); /* Header */
+        if (payload_length > 0) {
+            if (ap_fwrite(of, state->obb,
+                          (const char *)buffer,
+                          buffer_size) == APR_SUCCESS) { /* Payload Data */
+                written = buffer_size;
+            }
+        }
+        if (ap_fflush(of, state->obb) != APR_SUCCESS) {
+            written = 0;
+        }
+    }
+
+    return written;
+}
+
 static size_t CALLBACK mod_websocket_plugin_send(const WebSocketServer *server,
                                                  const int type,
                                                  const unsigned char *buffer,
                                                  const size_t buffer_size)
 {
-    apr_uint64_t payload_length =
-        (apr_uint64_t) ((buffer != NULL) ? buffer_size : 0);
     size_t written = 0;
 
     /* Deal with size more that 63 bits - FIXME */
-
     if ((server != NULL) && (server->state != NULL)) {
         WebSocketState *state = server->state;
 
         apr_thread_mutex_lock(state->mutex);
-
-        if ((state->r != NULL) && (state->obb != NULL) && !state->closing) {
-            unsigned char header[32];
-            ap_filter_t *of = state->r->connection->output_filters;
-            apr_size_t pos = 0;
-            unsigned char opcode;
-
-            switch (type) {
-            case MESSAGE_TYPE_TEXT:
-                opcode = OPCODE_TEXT;
-                break;
-            case MESSAGE_TYPE_BINARY:
-                opcode = OPCODE_BINARY;
-                break;
-            case MESSAGE_TYPE_PING:
-                opcode = OPCODE_PING;
-                break;
-            case MESSAGE_TYPE_PONG:
-                opcode = OPCODE_PONG;
-                break;
-            case MESSAGE_TYPE_CLOSE:
-            default:
-                state->closing = 1;
-                opcode = OPCODE_CLOSE;
-                break;
-            }
-            header[pos++] = FRAME_SET_FIN(1) | FRAME_SET_OPCODE(opcode);
-            if (payload_length < 126) {
-                header[pos++] =
-                    FRAME_SET_MASK(0) | FRAME_SET_LENGTH(payload_length, 0);
-            }
-            else {
-                if (payload_length < 65536) {
-                    header[pos++] = FRAME_SET_MASK(0) | 126;
-                }
-                else {
-                    header[pos++] = FRAME_SET_MASK(0) | 127;
-                    header[pos++] = FRAME_SET_LENGTH(payload_length, 7);
-                    header[pos++] = FRAME_SET_LENGTH(payload_length, 6);
-                    header[pos++] = FRAME_SET_LENGTH(payload_length, 5);
-                    header[pos++] = FRAME_SET_LENGTH(payload_length, 4);
-                    header[pos++] = FRAME_SET_LENGTH(payload_length, 3);
-                    header[pos++] = FRAME_SET_LENGTH(payload_length, 2);
-                }
-                header[pos++] = FRAME_SET_LENGTH(payload_length, 1);
-                header[pos++] = FRAME_SET_LENGTH(payload_length, 0);
-            }
-            ap_fwrite(of, state->obb, (const char *)header, pos); /* Header */
-            if (payload_length > 0) {
-                if (ap_fwrite(of, state->obb,
-                              (const char *)buffer,
-                              buffer_size) == APR_SUCCESS) { /* Payload Data */
-                    written = buffer_size;
-                }
-            }
-            if (ap_fflush(of, state->obb) != APR_SUCCESS) {
-                written = 0;
-            }
-        }
+        written = mod_websocket_send_internal(state, type, buffer, buffer_size);
         apr_thread_mutex_unlock(state->mutex);
     }
+
     return written;
 }
 
+
 static void CALLBACK mod_websocket_plugin_close(const WebSocketServer *
                                                 server)
 {
@@ -775,10 +793,12 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
                                 status_code = STATUS_CODE_OK;
                                 break;
                             case OPCODE_PING:
-                                mod_websocket_plugin_send(server,
-                                                          MESSAGE_TYPE_PONG,
-                                                          application_data,
-                                                          application_data_offset);
+                                apr_thread_mutex_lock(state->mutex);
+                                mod_websocket_send_internal(state,
+                                                            MESSAGE_TYPE_PONG,
+                                                            application_data,
+                                                            application_data_offset);
+                                apr_thread_mutex_unlock(state->mutex);
                                 break;
                             case OPCODE_PONG:
                                 break;
@@ -829,12 +849,13 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
         /* Send server-side closing handshake */
         status_code_buffer[0] = (status_code >> 8) & 0xFF;
         status_code_buffer[1] = status_code & 0xFF;
-        mod_websocket_plugin_send(server, MESSAGE_TYPE_CLOSE,
-                                  status_code_buffer,
-                                  sizeof(status_code_buffer));
+
+        apr_thread_mutex_lock(state->mutex);
+        mod_websocket_send_internal(state, MESSAGE_TYPE_CLOSE,
+                                    status_code_buffer,
+                                    sizeof(status_code_buffer));
 
         /* We are done with the bucket brigade */
-        apr_thread_mutex_lock(state->mutex);
         state->obb = NULL;
         apr_brigade_destroy(obb);
     }
-- 
2.1.1