You are viewing a plain text version of this content. The canonical link for it is here.
Posted to modules-dev@httpd.apache.org by Jacob Champion <ja...@ni.com> on 2015/03/09 19:43:53 UTC
[PATCH 2/5] Separate plugin_send from the send implementation
The public-facing API will eventually communicate cross-thread instead
of writing directly to the brigade. In preparation, pull the send logic
into an internal function, and call that from the read loop instead of
mod_websocket_plugin_send.
---
mod_websocket.c | 161 ++++++++++++++++++++++++++++++++------------------------
1 file changed, 91 insertions(+), 70 deletions(-)
diff --git a/mod_websocket.c b/mod_websocket.c
index 5a04936..62be765 100644
--- a/mod_websocket.c
+++ b/mod_websocket.c
@@ -290,85 +290,103 @@ static void CALLBACK mod_websocket_protocol_set(const WebSocketServer *server,
}
}
+/*
+ * Sends data to the WebSocket connection using the given server state. The
+ * server state must be locked upon entering this function. buffer_size is
+ * assumed to be within the limits defined by the WebSocket protocol (i.e. fits
+ * in 63 bits).
+ */
+static size_t mod_websocket_send_internal(WebSocketState *state,
+ const int type,
+ const unsigned char *buffer,
+ const size_t buffer_size)
+{
+ apr_uint64_t payload_length =
+ (apr_uint64_t) ((buffer != NULL) ? buffer_size : 0);
+ size_t written = 0;
+
+ if ((state->r != NULL) && (state->obb != NULL) && !state->closing) {
+ unsigned char header[32];
+ ap_filter_t *of = state->r->connection->output_filters;
+ apr_size_t pos = 0;
+ unsigned char opcode;
+
+ switch (type) {
+ case MESSAGE_TYPE_TEXT:
+ opcode = OPCODE_TEXT;
+ break;
+ case MESSAGE_TYPE_BINARY:
+ opcode = OPCODE_BINARY;
+ break;
+ case MESSAGE_TYPE_PING:
+ opcode = OPCODE_PING;
+ break;
+ case MESSAGE_TYPE_PONG:
+ opcode = OPCODE_PONG;
+ break;
+ case MESSAGE_TYPE_CLOSE:
+ default:
+ state->closing = 1;
+ opcode = OPCODE_CLOSE;
+ break;
+ }
+ header[pos++] = FRAME_SET_FIN(1) | FRAME_SET_OPCODE(opcode);
+ if (payload_length < 126) {
+ header[pos++] =
+ FRAME_SET_MASK(0) | FRAME_SET_LENGTH(payload_length, 0);
+ }
+ else {
+ if (payload_length < 65536) {
+ header[pos++] = FRAME_SET_MASK(0) | 126;
+ }
+ else {
+ header[pos++] = FRAME_SET_MASK(0) | 127;
+ header[pos++] = FRAME_SET_LENGTH(payload_length, 7);
+ header[pos++] = FRAME_SET_LENGTH(payload_length, 6);
+ header[pos++] = FRAME_SET_LENGTH(payload_length, 5);
+ header[pos++] = FRAME_SET_LENGTH(payload_length, 4);
+ header[pos++] = FRAME_SET_LENGTH(payload_length, 3);
+ header[pos++] = FRAME_SET_LENGTH(payload_length, 2);
+ }
+ header[pos++] = FRAME_SET_LENGTH(payload_length, 1);
+ header[pos++] = FRAME_SET_LENGTH(payload_length, 0);
+ }
+ ap_fwrite(of, state->obb, (const char *)header, pos); /* Header */
+ if (payload_length > 0) {
+ if (ap_fwrite(of, state->obb,
+ (const char *)buffer,
+ buffer_size) == APR_SUCCESS) { /* Payload Data */
+ written = buffer_size;
+ }
+ }
+ if (ap_fflush(of, state->obb) != APR_SUCCESS) {
+ written = 0;
+ }
+ }
+
+ return written;
+}
+
static size_t CALLBACK mod_websocket_plugin_send(const WebSocketServer *server,
const int type,
const unsigned char *buffer,
const size_t buffer_size)
{
- apr_uint64_t payload_length =
- (apr_uint64_t) ((buffer != NULL) ? buffer_size : 0);
size_t written = 0;
/* Deal with size more that 63 bits - FIXME */
-
if ((server != NULL) && (server->state != NULL)) {
WebSocketState *state = server->state;
apr_thread_mutex_lock(state->mutex);
-
- if ((state->r != NULL) && (state->obb != NULL) && !state->closing) {
- unsigned char header[32];
- ap_filter_t *of = state->r->connection->output_filters;
- apr_size_t pos = 0;
- unsigned char opcode;
-
- switch (type) {
- case MESSAGE_TYPE_TEXT:
- opcode = OPCODE_TEXT;
- break;
- case MESSAGE_TYPE_BINARY:
- opcode = OPCODE_BINARY;
- break;
- case MESSAGE_TYPE_PING:
- opcode = OPCODE_PING;
- break;
- case MESSAGE_TYPE_PONG:
- opcode = OPCODE_PONG;
- break;
- case MESSAGE_TYPE_CLOSE:
- default:
- state->closing = 1;
- opcode = OPCODE_CLOSE;
- break;
- }
- header[pos++] = FRAME_SET_FIN(1) | FRAME_SET_OPCODE(opcode);
- if (payload_length < 126) {
- header[pos++] =
- FRAME_SET_MASK(0) | FRAME_SET_LENGTH(payload_length, 0);
- }
- else {
- if (payload_length < 65536) {
- header[pos++] = FRAME_SET_MASK(0) | 126;
- }
- else {
- header[pos++] = FRAME_SET_MASK(0) | 127;
- header[pos++] = FRAME_SET_LENGTH(payload_length, 7);
- header[pos++] = FRAME_SET_LENGTH(payload_length, 6);
- header[pos++] = FRAME_SET_LENGTH(payload_length, 5);
- header[pos++] = FRAME_SET_LENGTH(payload_length, 4);
- header[pos++] = FRAME_SET_LENGTH(payload_length, 3);
- header[pos++] = FRAME_SET_LENGTH(payload_length, 2);
- }
- header[pos++] = FRAME_SET_LENGTH(payload_length, 1);
- header[pos++] = FRAME_SET_LENGTH(payload_length, 0);
- }
- ap_fwrite(of, state->obb, (const char *)header, pos); /* Header */
- if (payload_length > 0) {
- if (ap_fwrite(of, state->obb,
- (const char *)buffer,
- buffer_size) == APR_SUCCESS) { /* Payload Data */
- written = buffer_size;
- }
- }
- if (ap_fflush(of, state->obb) != APR_SUCCESS) {
- written = 0;
- }
- }
+ written = mod_websocket_send_internal(state, type, buffer, buffer_size);
apr_thread_mutex_unlock(state->mutex);
}
+
return written;
}
+
static void CALLBACK mod_websocket_plugin_close(const WebSocketServer *
server)
{
@@ -775,10 +793,12 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
status_code = STATUS_CODE_OK;
break;
case OPCODE_PING:
- mod_websocket_plugin_send(server,
- MESSAGE_TYPE_PONG,
- application_data,
- application_data_offset);
+ apr_thread_mutex_lock(state->mutex);
+ mod_websocket_send_internal(state,
+ MESSAGE_TYPE_PONG,
+ application_data,
+ application_data_offset);
+ apr_thread_mutex_unlock(state->mutex);
break;
case OPCODE_PONG:
break;
@@ -829,12 +849,13 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
/* Send server-side closing handshake */
status_code_buffer[0] = (status_code >> 8) & 0xFF;
status_code_buffer[1] = status_code & 0xFF;
- mod_websocket_plugin_send(server, MESSAGE_TYPE_CLOSE,
- status_code_buffer,
- sizeof(status_code_buffer));
+
+ apr_thread_mutex_lock(state->mutex);
+ mod_websocket_send_internal(state, MESSAGE_TYPE_CLOSE,
+ status_code_buffer,
+ sizeof(status_code_buffer));
/* We are done with the bucket brigade */
- apr_thread_mutex_lock(state->mutex);
state->obb = NULL;
apr_brigade_destroy(obb);
}
--
2.1.1