You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2023/06/26 20:31:07 UTC

[couchdb] branch changes-websocket created (now 63df98f83)

This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a change to branch changes-websocket
in repository https://gitbox.apache.org/repos/asf/couchdb.git


      at 63df98f83 support Websocket protocol for continuous response

This branch includes the following new commits:

     new 9aed5feb1 WIP websocket support for changes feed
     new 63df98f83 support Websocket protocol for continuous response

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[couchdb] 02/02: support Websocket protocol for continuous response

Posted by rn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch changes-websocket
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 63df98f8386db02137aed5a3e07394b6500fa971
Author: Robert Newson <rn...@apache.org>
AuthorDate: Mon Jun 26 21:29:08 2023 +0100

    support Websocket protocol for continuous response
    
    I currently send a data frame as the heartbeat, but we should
    tweak mochiweb_websocket so that we can send pong messages instead.
    That change would look like this:
    
    diff --git a/src/mochiweb_websocket.erl b/src/mochiweb_websocket.erl
    index 22ef924..396db82 100644
    --- a/src/mochiweb_websocket.erl
    +++ b/src/mochiweb_websocket.erl
    @@ -74,6 +74,9 @@ call_body({M, F}, Payload, State, ReplyChannel) ->
     call_body(Body, Payload, State, ReplyChannel) ->
         Body(Payload, State, ReplyChannel).
    
    +send(Socket, pong, hybi) ->
    +    Payload = <<1:1, 0:3, 10:4, 1:4, 0:8>>,
    +    mochiweb_socket:send(Socket, Payload);
     send(Socket, Payload, hybi) ->
         Prefix = <<1:1, 0:3, 1:4,
                   (payload_length(iolist_size(Payload)))/binary>>,
    
    and then, in chttpd_db.erl, change:
    
    ReplyChannel("\n"),
    
    to
    
    ReplyChannel(pong),
---
 src/chttpd/src/chttpd_db.erl | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index 791ee86f3..05ca6d95a 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -166,7 +166,7 @@ handle_changes_req1(#httpd{} = Req, Db) ->
 % websocket goop
 changes_callback(start, #cacc{feed = continuous, websocket = true} = Acc) ->
     {ReentryWs, ReplyChannel} = mochiweb_websocket:upgrade_connection(
-        Acc#cacc.mochi, fun client_ws_loop/3),
+        Acc#cacc.mochi#httpd.mochi_req, fun client_ws_loop/3),
     {ok, Acc#cacc{mochi = undefined, websocket = {ws, ReentryWs, ReplyChannel}}};
 changes_callback({change, Change}, #cacc{websocket = {ws, _ReentryWs, ReplyChannel}} = Acc) ->
     chttpd_stats:incr_rows(),
@@ -175,6 +175,8 @@ changes_callback({change, Change}, #cacc{websocket = {ws, _ReentryWs, ReplyChann
 changes_callback(timeout, #cacc{websocket = {ws, _ReentryWs, ReplyChannel}} = Acc) ->
     ReplyChannel("\n"),
     {ok, Acc};
+changes_callback(waiting_for_updates, #cacc{websocket = {ws, _ReentryWs, ReplyChannel}} = Acc) ->
+    {ok, Acc};
 changes_callback({stop, EndSeq, Pending}, #cacc{websocket = {ws, _ReentryWs, ReplyChannel}} = Acc) ->
     Row =
         {[
@@ -182,7 +184,7 @@ changes_callback({stop, EndSeq, Pending}, #cacc{websocket = {ws, _ReentryWs, Rep
             {<<"pending">>, Pending}
         ]},
     ReplyChannel(?JSON_ENCODE(Row)),
-    {stop, Acc};
+    {ok, Acc};
 
 % callbacks for continuous feed (newline-delimited JSON Objects)
 changes_callback(start, #cacc{feed = continuous} = Acc) ->


[couchdb] 01/02: WIP websocket support for changes feed

Posted by rn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch changes-websocket
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 9aed5feb198a3f87306eee101409cdd6ee9c2ac4
Author: Robert Newson <rn...@apache.org>
AuthorDate: Mon Jun 26 20:28:20 2023 +0100

    WIP websocket support for changes feed
---
 src/chttpd/src/chttpd_db.erl | 34 ++++++++++++++++++++++++++++++++--
 1 file changed, 32 insertions(+), 2 deletions(-)

diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index e2de301b2..791ee86f3 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -66,7 +66,8 @@
     chunks_sent = 0,
     buffer = [],
     bufsize = 0,
-    threshold
+    threshold,
+    websocket
 }).
 
 -define(IS_ALL_DOCS(T),
@@ -149,7 +150,8 @@ handle_changes_req1(#httpd{} = Req, Db) ->
             Acc0 = #cacc{
                 feed = list_to_atom(Feed),
                 mochi = Req,
-                threshold = Max
+                threshold = Max,
+                websocket = websocket_upgrade_requested(Req)
             },
             try
                 fabric:changes(Db, fun changes_callback/2, Acc0, ChangesArgs)
@@ -161,6 +163,27 @@ handle_changes_req1(#httpd{} = Req, Db) ->
             throw({bad_request, Msg})
     end.
 
+% websocket goop
+changes_callback(start, #cacc{feed = continuous, websocket = true} = Acc) ->
+    {ReentryWs, ReplyChannel} = mochiweb_websocket:upgrade_connection(
+        Acc#cacc.mochi, fun client_ws_loop/3),
+    {ok, Acc#cacc{mochi = undefined, websocket = {ws, ReentryWs, ReplyChannel}}};
+changes_callback({change, Change}, #cacc{websocket = {ws, _ReentryWs, ReplyChannel}} = Acc) ->
+    chttpd_stats:incr_rows(),
+    ReplyChannel(?JSON_ENCODE(Change)),
+    {ok, Acc};
+changes_callback(timeout, #cacc{websocket = {ws, _ReentryWs, ReplyChannel}} = Acc) ->
+    ReplyChannel("\n"),
+    {ok, Acc};
+changes_callback({stop, EndSeq, Pending}, #cacc{websocket = {ws, _ReentryWs, ReplyChannel}} = Acc) ->
+    Row =
+        {[
+            {<<"last_seq">>, EndSeq},
+            {<<"pending">>, Pending}
+        ]},
+    ReplyChannel(?JSON_ENCODE(Row)),
+    {stop, Acc};
+
 % callbacks for continuous feed (newline-delimited JSON Objects)
 changes_callback(start, #cacc{feed = continuous} = Acc) ->
     {ok, Resp} = chttpd:start_delayed_json_response(Acc#cacc.mochi, 200),
@@ -2557,6 +2580,13 @@ bulk_get_rev_error(all) ->
 bulk_get_rev_error([{Pos, RevId} = Rev]) when is_integer(Pos), is_binary(RevId) ->
     couch_doc:rev_to_str(Rev).
 
+websocket_upgrade_requested(#httpd{} = Req) ->
+    Upgrade = chttpd:header_value(Req, "Upgrade"),
+    Upgrade =/= undefined andalso string:to_lower(Upgrade) =:= "websocket".
+
+client_ws_loop(_Payload, Broadcaster, _ReplyChannel) ->
+    Broadcaster.
+
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").