You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by wi...@apache.org on 2020/01/13 19:31:33 UTC
[couchdb-mochiweb] 04/08: partial work on active websocket api
This is an automated email from the ASF dual-hosted git repository.
willholley pushed a commit to tag active-passive
in repository https://gitbox.apache.org/repos/asf/couchdb-mochiweb.git
commit 4146cdfb39318ee5b1e261a6c375fb87c7ab8c3e
Author: Richard Jones <rj...@metabrew.com>
AuthorDate: Mon Oct 11 18:05:03 2010 +0100
partial work on active websocket api
---
examples/websockets/websockets.erl | 24 +++++-
src/mochiweb_http.erl | 12 ++-
src/mochiweb_websocket_delegate.erl | 159 ++++++++++++++++++++++++++++++++++++
3 files changed, 191 insertions(+), 4 deletions(-)
diff --git a/examples/websockets/websockets.erl b/examples/websockets/websockets.erl
index f50c5b4..ebd8fc6 100644
--- a/examples/websockets/websockets.erl
+++ b/examples/websockets/websockets.erl
@@ -1,7 +1,7 @@
-module(websockets).
-author('author <rj...@metabrew.com>').
--export([start/0, start/1, stop/0, loop/2, wsloop/1]).
+-export([start/0, start/1, stop/0, loop/2, wsloop_active/1]).
start() -> start([{port, 8080}, {docroot, "."}]).
@@ -10,11 +10,31 @@ start(Options) ->
Loop = fun (Req) -> ?MODULE:loop(Req, DocRoot) end,
mochiweb_http:start([{name, ?MODULE},
{loop, Loop},
- {wsloop, {?MODULE, wsloop}} | Options1]).
+ {wsloop, {?MODULE, wsloop_active}} | Options1]).
stop() ->
mochiweb_http:stop(?MODULE).
+%% @doc Entry point of the 'active' websocket demo loop. Sends a welcome
+%% message through the delegate process, then hands over to the receive loop.
+wsloop_active(Delegate) ->
+    mochiweb_websocket_delegate:send(Delegate, "WELCOME MSG!"),
+    wsloop_active0(Delegate).
+
+%% @doc Receive loop for the 'active' websocket demo.
+%%
+%% Waits for messages sent by the delegate process:
+%%   closed            - the connection is gone; the loop returns ok.
+%%   {error, Reason}   - socket error; the loop returns ok.
+%%   {FrameTag, Data}  - a received frame ({legacy_frame, M} or
+%%                       {utf8_frame, M}); it is echoed back and the loop
+%%                       continues.
+%% If nothing arrives for 10 seconds an "IDLE!" message is sent and the
+%% loop continues.
+%%
+%% The recursive call is made only from the branches that keep the
+%% connection open. Looping after closed / {error, _} would call send/2 on
+%% a delegate that has already stopped.
+wsloop_active0(Pid) ->
+    receive
+        closed ->
+            io:format("client api got closed~n",[]),
+            ok;
+        {error, _Reason} ->
+            ok;
+        % {legacy_frame, M} or {utf8_frame, M}
+        {_, X} ->
+            Msg = io_lib:format("SRVER_GOT: ~p", [X]),
+            mochiweb_websocket_delegate:send(Pid, Msg),
+            wsloop_active0(Pid)
+    after 10000 ->
+        mochiweb_websocket_delegate:send(Pid, "IDLE!"),
+        wsloop_active0(Pid)
+    end.
+
wsloop(Ws) ->
io:format("Websocket request, path: ~p~n", [Ws:get(path)]),
case Ws:get_data() of
diff --git a/src/mochiweb_http.erl b/src/mochiweb_http.erl
index d38c6d5..69255d4 100644
--- a/src/mochiweb_http.erl
+++ b/src/mochiweb_http.erl
@@ -136,8 +136,16 @@ headers(Socket, Request, Headers, {WwwLoop, WsLoop} = Body, HeaderCount) ->
io:format("notmal -> ws~n",[]),
{_, {abs_path,Path}, _} = Request,
ok = websocket_init(Socket, Path, H),
- WsReq = mochiweb_wsrequest:new(Socket, Path, H),
- call_body(WsLoop, WsReq);
+ Active = true,
+ case Active of
+ true ->
+ {ok, WSPid} = mochiweb_websocket_delegate:start_link(Path, H, self()),
+ mochiweb_websocket_delegate:go(WSPid, Socket),
+ call_body(WsLoop, WSPid);
+ false ->
+ WsReq = mochiweb_wsrequest:new(Socket, Path, H),
+ call_body(WsLoop, WsReq)
+ end;
X -> %% not websocket:
io:format("notmal~p~n",[X]),
Req = mochiweb:new_request({Socket, Request,
diff --git a/src/mochiweb_websocket_delegate.erl b/src/mochiweb_websocket_delegate.erl
new file mode 100644
index 0000000..6c7bd85
--- /dev/null
+++ b/src/mochiweb_websocket_delegate.erl
@@ -0,0 +1,159 @@
+%% @author Richard Jones <rj...@metabrew.com>
+%%
+%% Process for handling send/recv on an established websocket
+%% providing an 'active' api, ala gen_tcp in active mode.
+%%
+%% @see http://www.whatwg.org/specs/web-socket-protocol/
+%% As of August 2010
+%%
+%% However, at time of writing (Oct 8, 2010) Chrome 6 and Firefox 4 implement
+%% an older version of the websocket spec, where messages are framed 0x00...0xFF
+%% so the newer protocol with length headers has not been tested with a browser.
+
+-module(mochiweb_websocket_delegate).
+-behaviour(gen_server).
+
+%% gen_server state of one websocket delegate process.
+-record(state, {path, % request path given to start_link/3; returned by path/1
+ headers, % request headers given to start_link/3; returned by headers/1
+ legacy, % true: outgoing msgs framed 0x00..0xFF; false: 0xFF + 64-bit length
+ socket, % socket handed over by go/2; not set until the {go, Socket} cast
+ dest, % process that receives frames and closed / {error, Reason} messages
+ buffer, % received bytes not yet consumed by process_data/1
+ partial, % payload bytes collected so far for the legacy frame in progress
+ ft, % frame-type byte of the frame being parsed; undefined between frames
+ flen}). % payload length of the modern frame being parsed; undefined until read
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-export([start_link/3, go/2, send/2, close/1, headers/1, path/1]).
+
+%%
+
+%% @doc Starts an unregistered delegate gen_server linked to the caller.
+%% Path and Headers are stored for later retrieval; Destination is the
+%% process that will be sent incoming frames and close/error notifications.
+start_link(Path, Headers, Destination) ->
+    InitArgs = [Path, Headers, Destination],
+    gen_server:start_link(?MODULE, InitArgs, []).
+
+%% @doc Hands Socket over to the delegate and tells it to start receiving.
+%% Crashes with badmatch if the ownership transfer does not return ok.
+%% NOTE(review): presumably must be called by the socket's current
+%% controlling process - confirm against the gen_tcp documentation.
+go(Delegate, Socket) ->
+    ok = gen_tcp:controlling_process(Socket, Delegate),
+    gen_server:cast(Delegate, {go, Socket}).
+
+%% @doc Synchronously sends Msg (an iolist) to the client through the
+%% delegate and returns whatever the socket send returned. The message is
+%% also printed to stdout.
+send(Delegate, Msg) ->
+    io:format("send:~s~n", [Msg]),
+    Request = {send, Msg},
+    gen_server:call(Delegate, Request).
+
+%% @doc Asks the delegate to close its socket; returns ok once done.
+close(Delegate) ->
+    gen_server:call(Delegate, close).
+
+%% @doc Returns the request headers the delegate was started with.
+headers(Delegate) ->
+    gen_server:call(Delegate, headers).
+
+%% @doc Returns the request path the delegate was started with.
+path(Delegate) ->
+    gen_server:call(Delegate, path).
+
+%%
+
+%% gen_server callback. Traps exits so that the termination of a linked
+%% process arrives as an {'EXIT', _, _} message in handle_info/2, and builds
+%% the initial state: legacy framing, empty buffers, no frame in progress.
+%% The socket is not known yet; it arrives later with the {go, Socket} cast.
+init([Path, Headers, Dest]) ->
+    process_flag(trap_exit, true),
+    InitialState = #state{path = Path,
+                          headers = Headers,
+                          dest = Dest,
+                          legacy = true,
+                          buffer = <<>>,
+                          partial = <<>>,
+                          ft = undefined},
+    {ok, InitialState}.
+
+%% gen_server callback for the synchronous API.
+%%   {send, Msg} - frames Msg according to the legacy flag, writes it to the
+%%                 socket and replies with the result of the send.
+%%   close       - closes the socket and replies ok.
+%%   headers     - replies with the stored request headers.
+%%   path        - replies with the stored request path.
+handle_call({send, Msg}, _From, #state{legacy=true} = State) ->
+    % legacy spec: payload wrapped between a 0x00 and a 0xFF byte
+    Framed = [0, Msg, 255],
+    {reply, mochiweb_socket:send(State#state.socket, Framed), State};
+handle_call({send, Msg}, _From, #state{legacy=false} = State) ->
+    % newer spec: 0xFF, then the payload size as a 64 bit big-endian integer
+    Header = <<255, (iolist_size(Msg)):64/unsigned-integer>>,
+    {reply, mochiweb_socket:send(State#state.socket, [Header, Msg]), State};
+handle_call(close, _From, #state{socket=Socket} = State) ->
+    mochiweb_socket:close(Socket),
+    {reply, ok, State};
+handle_call(headers, _From, #state{headers=Headers} = State) ->
+    {reply, Headers, State};
+handle_call(path, _From, #state{path=Path} = State) ->
+    {reply, Path, State}.
+
+%% gen_server callback. {go, Socket}: switches the socket to active mode so
+%% incoming data is delivered as messages to this process, and remembers the
+%% socket in the state.
+handle_cast({go, Socket}, State) ->
+    ActiveMode = [{active, true}],
+    mochiweb_socket:setopts(Socket, ActiveMode),
+    {noreply, State#state{socket=Socket}}.
+
+%% gen_server callback for raw messages.
+%%   {'EXIT', _, _}           - a linked process terminated (exits are
+%%                              trapped): notify dest with closed and stop.
+%%   {tcp_closed, Sock}       - peer closed the socket: notify dest with
+%%                              closed and stop.
+%%   {tcp_error, Sock, Why}   - socket error: notify dest with {error, Why}
+%%                              and stop.
+%%   {tcp, Sock, Data}        - incoming bytes: append to the buffer and
+%%                              parse as many complete frames as possible.
+%%   anything else            - logged and dropped, so that a stray message
+%%                              cannot crash the server with function_clause.
+handle_info({'EXIT', _, _}, State) ->
+    io:format("TRAP EXIT~n",[]),
+    State#state.dest ! closed,
+    {stop, normal, State};
+handle_info({tcp_closed, Sock}, State = #state{socket=Sock}) ->
+    State#state.dest ! closed,
+    {stop, normal, State};
+handle_info({tcp_error, Sock, Reason}, State = #state{socket=Sock}) ->
+    State#state.dest ! {error, Reason},
+    {stop, normal, State};
+handle_info({tcp, Sock, Data}, State = #state{socket=Sock, buffer=Buffer}) ->
+    % The socket stays in {active, true} mode, so nothing needs re-arming here.
+    NewState = process_data(State#state{buffer= <<Buffer/binary,Data/binary>>}),
+    {noreply, NewState};
+handle_info(Unexpected, State) ->
+    error_logger:warning_msg("~p: ignoring unexpected message ~p~n",
+                             [?MODULE, Unexpected]),
+    {noreply, State}.
+
+%% gen_server callback. Performs no cleanup of its own and returns ok.
+terminate(_Why, _FinalState) ->
+    ok.
+
+%% gen_server callback. The state needs no conversion on code upgrade, so it
+%% is returned unchanged.
+code_change(_FromVsn, CurrentState, _Extra) ->
+    {ok, CurrentState}.
+
+%%% Internal functions
+
+%% Parses as many complete frames as possible out of State#state.buffer,
+%% sending each one to State#state.dest, and returns the updated state.
+%% Bytes of an incomplete frame stay in the state (buffer / partial / ft /
+%% flen) until more data arrives.
+%%
+%% Messages sent to dest:
+%%   {legacy_frame, Payload} - frame type 0x00, payload terminated by 0xFF
+%%   closing_handshake       - frame type 0x00 followed by eight 0x00 bytes
+%%   {utf8_frame, Payload}   - frame type 0xFF, 64 bit big-endian payload
+%%                             length, then the payload
+%%
+%% A frame-type byte other than 0x00 or 0xFF matches no clause and crashes
+%% the process with function_clause.
+
+% Nothing left to parse.
+process_data(State = #state{buffer= <<>>}) ->
+    State;
+
+% Start of a new frame: the first byte is the frame type.
+process_data(State = #state{buffer= <<FrameType:8,Buffer/binary>>, ft=undefined}) ->
+    process_data(State#state{buffer=Buffer, ft=FrameType, partial= <<>>});
+
+% "Legacy" frames, 0x00...0xFF
+% or modern closing handshake 0x00{8}
+process_data(State = #state{buffer= <<0,0,0,0,0,0,0,0, Buffer/binary>>, ft=0}) ->
+    State#state.dest ! closing_handshake,
+    process_data(State#state{buffer=Buffer, ft=undefined});
+
+% 0xFF terminates a legacy frame: deliver what has been collected.
+process_data(State = #state{buffer= <<255, Rest/binary>>, ft=0}) ->
+    State#state.dest ! {legacy_frame, State#state.partial},
+    process_data(State#state{partial= <<>>, ft=undefined, buffer=Rest});
+
+% Any other byte inside a legacy frame is payload.
+process_data(State = #state{buffer= <<Byte:8, Rest/binary>>, ft=0, partial=Partial}) ->
+    process_data(State#state{buffer=Rest, partial= <<Partial/binary, Byte>>});
+
+% "Modern" frames, starting with 0xFF, followed by 64 bit length
+process_data(State = #state{buffer= <<Len:64/unsigned-integer,Buffer/binary>>, ft=255, flen=undefined}) ->
+    take_modern_payload(State#state{flen=Len, buffer=Buffer});
+
+% Modern frame whose 8 byte length header has not fully arrived yet:
+% keep the bytes buffered and wait for more data.
+process_data(State = #state{ft=255, flen=undefined}) ->
+    State;
+
+% Modern frame whose length is already known from an earlier packet.
+process_data(State = #state{ft=255, flen=Len}) when is_integer(Len) ->
+    take_modern_payload(State).
+
+% Delivers the payload of the modern frame in progress if the buffer holds
+% all flen bytes of it, then continues parsing; otherwise returns the state
+% unchanged. The size of a /binary segment is counted in bytes, so the
+% length from the header is used as-is.
+take_modern_payload(State = #state{buffer=Buffer, flen=Len}) ->
+    case Buffer of
+        <<Frame:Len/binary, Rest/binary>> ->
+            State#state.dest ! {utf8_frame, Frame},
+            process_data(State#state{ft=undefined, flen=undefined, buffer=Rest});
+        _ ->
+            State
+    end.