You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by wi...@apache.org on 2020/01/13 19:31:33 UTC

[couchdb-mochiweb] 04/08: partial work on active websocket api

This is an automated email from the ASF dual-hosted git repository.

willholley pushed a commit to tag active-passive
in repository https://gitbox.apache.org/repos/asf/couchdb-mochiweb.git

commit 4146cdfb39318ee5b1e261a6c375fb87c7ab8c3e
Author: Richard Jones <rj...@metabrew.com>
AuthorDate: Mon Oct 11 18:05:03 2010 +0100

    partial work on active websocket api
---
 examples/websockets/websockets.erl  |  24 +++++-
 src/mochiweb_http.erl               |  12 ++-
 src/mochiweb_websocket_delegate.erl | 159 ++++++++++++++++++++++++++++++++++++
 3 files changed, 191 insertions(+), 4 deletions(-)

diff --git a/examples/websockets/websockets.erl b/examples/websockets/websockets.erl
index f50c5b4..ebd8fc6 100644
--- a/examples/websockets/websockets.erl
+++ b/examples/websockets/websockets.erl
@@ -1,7 +1,7 @@
 -module(websockets).
 -author('author <rj...@metabrew.com>').
 
--export([start/0, start/1, stop/0, loop/2, wsloop/1]).
+-export([start/0, start/1, stop/0, loop/2, wsloop_active/1]).
 
 start() -> start([{port, 8080}, {docroot, "."}]).
 
@@ -10,11 +10,31 @@ start(Options) ->
     Loop = fun (Req) -> ?MODULE:loop(Req, DocRoot) end,
     mochiweb_http:start([{name, ?MODULE}, 
                          {loop, Loop}, 
-                         {wsloop, {?MODULE, wsloop}} | Options1]).
+                         {wsloop, {?MODULE, wsloop_active}} | Options1]).
 
 stop() ->
     mochiweb_http:stop(?MODULE).
 
+wsloop_active(Pid) ->
+    mochiweb_websocket_delegate:send(Pid, "WELCOME MSG!"),
+    wsloop_active0(Pid).
+
+wsloop_active0(Pid) ->
+    receive
+        closed ->
+            io:format("client api got closed~n",[]),
+            ok;
+        {error, _Reason} ->
+            ok;
+        % {legacy_frame, M} or {utf8_frame, M}
+        {_, X} ->
+            Msg = io_lib:format("SRVER_GOT: ~p", [X]),
+            mochiweb_websocket_delegate:send(Pid, Msg)
+    after 10000 ->
+            mochiweb_websocket_delegate:send(Pid, "IDLE!")
+    end,
+    wsloop_active0(Pid).
+
 wsloop(Ws) ->
     io:format("Websocket request, path: ~p~n", [Ws:get(path)]),
     case Ws:get_data() of
diff --git a/src/mochiweb_http.erl b/src/mochiweb_http.erl
index d38c6d5..69255d4 100644
--- a/src/mochiweb_http.erl
+++ b/src/mochiweb_http.erl
@@ -136,8 +136,16 @@ headers(Socket, Request, Headers, {WwwLoop, WsLoop} = Body, HeaderCount) ->
                     io:format("notmal -> ws~n",[]),
                     {_, {abs_path,Path}, _} = Request,
                     ok = websocket_init(Socket, Path, H),
-                    WsReq = mochiweb_wsrequest:new(Socket, Path, H),
-                    call_body(WsLoop, WsReq);
+                    Active = true,
+                    case Active of
+                        true ->
+                            {ok, WSPid} = mochiweb_websocket_delegate:start_link(Path, H, self()),
+                            mochiweb_websocket_delegate:go(WSPid, Socket),
+                            call_body(WsLoop, WSPid);
+                        false ->
+                            WsReq = mochiweb_wsrequest:new(Socket, Path, H),
+                            call_body(WsLoop, WsReq)
+                    end;
                 X -> %% not websocket:
                     io:format("notmal~p~n",[X]),
                     Req = mochiweb:new_request({Socket, Request,
diff --git a/src/mochiweb_websocket_delegate.erl b/src/mochiweb_websocket_delegate.erl
new file mode 100644
index 0000000..6c7bd85
--- /dev/null
+++ b/src/mochiweb_websocket_delegate.erl
@@ -0,0 +1,159 @@
+%% @author Richard Jones <rj...@metabrew.com>
+%%
+%% Process for handling send/recv on an established websocket
+%% providing an 'active' api, ala gen_tcp in active mode.
+%%
+%% @see http://www.whatwg.org/specs/web-socket-protocol/
+%% As of August 2010
+%%
+%% However, at time of writing (Oct 8, 2010) Chrome 6 and Firefox 4 implement
+%% an older version of the websocket spec, where messages are framed 0x00...0xFF
+%% so the newer protocol with length headers has not been tested with a browser.
+
+-module(mochiweb_websocket_delegate).
+-behaviour(gen_server).
+
+-record(state, {path, 
+                headers, 
+                legacy,
+                socket, 
+                dest, 
+                buffer, 
+                partial,
+                ft, 
+                flen}).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
+-export([start_link/3, go/2, send/2, close/1, headers/1, path/1]).
+
+%%
+
+start_link(Path, Headers, Destination) ->
+    gen_server:start_link(?MODULE, [Path, Headers, Destination], []).
+
+go(Pid, Socket) ->
+    ok = gen_tcp:controlling_process(Socket, Pid),
+    gen_server:cast(Pid, {go, Socket}).
+
+send(Pid, Msg) ->
+    io:format("send:~s~n",[Msg]),
+    gen_server:call(Pid, {send, Msg}).
+
+close(Pid) ->
+    gen_server:call(Pid, close).
+
+headers(Pid) ->
+    gen_server:call(Pid, headers).
+
+path(Pid) ->
+    gen_server:call(Pid, path).
+
+%%
+
+init([Path, Headers, Dest]) ->
+    process_flag(trap_exit, true),
+    {ok, #state{path=Path, 
+                legacy=true,
+                headers=Headers, 
+                dest=Dest, 
+                ft = undefined,
+                buffer = <<>>,
+                partial= <<>>
+               }}.
+
+handle_call(close, _From, State) ->
+    mochiweb_socket:close(State#state.socket),
+    {reply, ok, State};    
+handle_call(headers, _From, State) ->
+    {reply, State#state.headers, State};
+handle_call(path, _From, State) ->
+    {reply, State#state.path, State};
+handle_call({send, Msg}, _From, State = #state{legacy=false, socket=Socket}) ->
+    % header is 0xFF then 64bit big-endian int of the msg length
+    Len = iolist_size(Msg),
+    R = mochiweb_socket:send(Socket, [255, <<Len:64/unsigned-integer>>, Msg]), 
+    {reply, R, State};
+handle_call({send, Msg}, _From, State = #state{legacy=true, socket=Socket}) ->
+    % legacy spec, msgs are framed with 0x00..0xFF
+    R = mochiweb_socket:send(Socket, [0, Msg, 255]),
+    {reply, R, State}.
+
+handle_cast({go, Socket}, State) ->
+    mochiweb_socket:setopts(Socket, [{active, true}]),    
+    {noreply, State#state{socket=Socket}}.
+
+handle_info({'EXIT', _, _}, State) ->
+    io:format("TRAP EXIT~n",[]),
+    State#state.dest ! closed,
+    {stop, normal, State};    
+handle_info({tcp_closed, Sock}, State = #state{socket=Sock}) ->
+    State#state.dest ! closed,
+    {stop, normal, State};
+handle_info({tcp_error, Sock, Reason}, State = #state{socket=Sock}) ->
+    State#state.dest ! {error, Reason},
+    {stop, normal, State};
+handle_info({tcp, Sock, Data}, State = #state{socket=Sock, buffer=Buffer}) ->
+    %mochiweb_socket:setopts(Sock, [{active, once}]),
+    NewState = process_data(State#state{buffer= <<Buffer/binary,Data/binary>>}),
+    {noreply, NewState}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%% Internal functions
+
+process_data(State = #state{buffer= <<>>}) -> 
+    %io:format("A 0~n", []),
+    State;
+
+process_data(State = #state{buffer= <<FrameType:8,Buffer/binary>>, ft=undefined}) ->
+    %io:format("A 1~n", []),
+    process_data(State#state{buffer=Buffer, ft=FrameType, partial= <<>>});
+
+% "Legacy" frames, 0x00...0xFF
+% or modern closing handshake 0x00{8}
+process_data(State = #state{buffer= <<0,0,0,0,0,0,0,0, Buffer/binary>>, ft=0}) ->
+    %io:format("A 2~n", []),
+    State#state.dest ! closing_handshake,
+    process_data(State#state{buffer=Buffer, ft=undefined});
+
+process_data(State = #state{buffer= <<255, Rest/binary>>, ft=0}) ->
+    %io:format("A 3~n", []),
+    State#state.dest ! {legacy_frame, State#state.partial},
+    process_data(State#state{partial= <<>>, ft=undefined, buffer=Rest});
+
+process_data(State = #state{buffer= <<Byte:8, Rest/binary>>, ft=0, partial=Partial}) ->
+    %io:format("A 4, byte=~p state:~p~n", [Byte,State]),
+    NewPartial = case Partial of <<>> -> <<Byte>> ; _ -> <<Partial/binary, <<Byte>>/binary>> end,
+    NewState = State#state{buffer=Rest, partial=NewPartial},
+   process_data(NewState);
+
+% "Modern" frames, starting with 0xFF, followed by 64 bit length
+process_data(State = #state{buffer= <<Len:64/unsigned-integer,Buffer/binary>>, ft=255, flen=undefined}) ->
+    %io:format("A 5~n", []),
+    BitsLen = Len*8,
+    case Buffer of
+        <<Frame:BitsLen/binary, Rest/binary>> ->            
+            State#state.dest ! {utf8_frame, Frame},
+            process_data(State#state{ft=undefined, flen=undefined, buffer=Rest});
+
+        _ ->
+            State#state{flen=Len, buffer=Buffer}
+    end;
+
+process_data(State = #state{buffer=Buffer, ft=255, flen=Len}) when is_integer(Len) ->
+    %io:format("A 6~n", []),
+    BitsLen = Len*8,
+    case Buffer of
+        <<Frame:BitsLen/binary, Rest/binary>> ->            
+            State#state.dest ! {utf8_frame, Frame},
+            process_data(State#state{ft=undefined, flen=undefined, buffer=Rest});
+
+        _ ->
+            State#state{flen=Len, buffer=Buffer}
+    end.