You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ii...@apache.org on 2019/10/07 20:36:18 UTC

[couchdb] branch master updated: Return headers from _changes feed when there are no changes

This is an automated email from the ASF dual-hosted git repository.

iilyak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/master by this push:
     new abe586e  Return headers from _changes feed when there are no changes
     new 6382374  Merge pull request #2240 from cloudant/issue/985-continious-feed-blocking
abe586e is described below

commit abe586e04c6a78f7abffe6afcefbadb39ff94c2a
Author: ILYA Khlopotov <ii...@apache.org>
AuthorDate: Mon Oct 7 10:35:55 2019 +0000

    Return headers from _changes feed when there are no changes
    
    Problem
    -------
    
    The request of continious _changes feed doesn't return until either:
    - new change is made to the database
    - the heartbeat interval is reached
    
    This causes clients to block on subscription call.
    
    Solution
    --------
    
    Introduce a counter to account for number of chunks sent.
    Send '\n' exactly once on `waiting_for_updates` when `chunks_sent`
    is still 0.
    
    The implementation is suggested by @davisp [here](https://github.com/apache/couchdb/issues/985#issuecomment-537150907).
    There is only one difference from his proposal which is:
    ```
    diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
    index aba1bd22f..9cd6944d2 100644
    --- a/src/chttpd/src/chttpd_db.erl
    +++ b/src/chttpd/src/chttpd_db.erl
    @@ -215,7 +215,7 @@ changes_callback(waiting_for_updates, #cacc{buffer = []} = Acc) ->
             true ->
                 {ok, Acc};
             false ->
    -            {ok, Resp1} = chttpd:send_delayed_chunk(Resp, []),
    +            {ok, Resp1} = chttpd:send_delayed_chunk(Resp, <<"\n">>),
                 {ok, Acc#cacc{mochi = Resp1, chunks_sent = 1}}
         end;
     changes_callback(waiting_for_updates, Acc) ->
    ```
---
 src/chttpd/src/chttpd_db.erl             | 33 +++++++++++++++++++-------
 src/chttpd/test/eunit/chttpd_db_test.erl | 40 ++++++++++++++++++++++++--------
 2 files changed, 54 insertions(+), 19 deletions(-)

diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index c6404b0..aba1bd2 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -46,6 +46,7 @@
     mochi,
     prepend = "",
     responding = false,
+    chunks_sent = 0,
     buffer = [],
     bufsize = 0,
     threshold
@@ -170,10 +171,10 @@ changes_callback({change, {ChangeProp}=Change}, #cacc{feed = eventsource} = Acc)
     Len = iolist_size(Chunk),
     maybe_flush_changes_feed(Acc, Chunk, Len);
 changes_callback(timeout, #cacc{feed = eventsource} = Acc) ->
-    #cacc{mochi = Resp} = Acc,
+    #cacc{mochi = Resp, chunks_sent = ChunksSet} = Acc,
     Chunk = "event: heartbeat\ndata: \n\n",
     {ok, Resp1} = chttpd:send_delayed_chunk(Resp, Chunk),
-    {ok, Acc#cacc{mochi = Resp1}};
+    {ok, Acc#cacc{mochi = Resp1, chunks_sent = ChunksSet + 1}};
 changes_callback({stop, _EndSeq}, #cacc{feed = eventsource} = Acc) ->
     #cacc{mochi = Resp, buffer = Buf} = Acc,
     {ok, Resp1} = chttpd:send_delayed_chunk(Resp, Buf),
@@ -209,14 +210,27 @@ changes_callback({stop, EndSeq, Pending}, Acc) ->
     chttpd:end_delayed_json_response(Resp1);
 
 changes_callback(waiting_for_updates, #cacc{buffer = []} = Acc) ->
-    {ok, Acc};
+    #cacc{mochi = Resp, chunks_sent = ChunksSent} = Acc,
+    case ChunksSent > 0 of
+        true ->
+            {ok, Acc};
+        false ->
+            {ok, Resp1} = chttpd:send_delayed_chunk(Resp, <<"\n">>),
+            {ok, Acc#cacc{mochi = Resp1, chunks_sent = 1}}
+    end;
 changes_callback(waiting_for_updates, Acc) ->
-    #cacc{buffer = Buf, mochi = Resp} = Acc,
+    #cacc{buffer = Buf, mochi = Resp, chunks_sent = ChunksSent} = Acc,
     {ok, Resp1} = chttpd:send_delayed_chunk(Resp, Buf),
-    {ok, Acc#cacc{buffer = [], bufsize = 0, mochi = Resp1}};
+    {ok, Acc#cacc{
+        buffer = [],
+        bufsize = 0,
+        mochi = Resp1,
+        chunks_sent = ChunksSent + 1
+    }};
 changes_callback(timeout, Acc) ->
-    {ok, Resp1} = chttpd:send_delayed_chunk(Acc#cacc.mochi, "\n"),
-    {ok, Acc#cacc{mochi = Resp1}};
+    #cacc{mochi = Resp, chunks_sent = ChunksSent} = Acc,
+    {ok, Resp1} = chttpd:send_delayed_chunk(Resp, "\n"),
+    {ok, Acc#cacc{mochi = Resp1, chunks_sent = ChunksSent + 1}};
 changes_callback({error, Reason}, #cacc{mochi = #httpd{}} = Acc) ->
     #cacc{mochi = Req} = Acc,
     chttpd:send_error(Req, Reason);
@@ -232,11 +246,12 @@ maybe_flush_changes_feed(#cacc{bufsize=Size, threshold=Max} = Acc, Data, Len)
     {ok, R1} = chttpd:send_delayed_chunk(Resp, Buffer),
     {ok, Acc#cacc{prepend = ",\r\n", buffer = Data, bufsize=Len, mochi = R1}};
 maybe_flush_changes_feed(Acc0, Data, Len) ->
-    #cacc{buffer = Buf, bufsize = Size} = Acc0,
+    #cacc{buffer = Buf, bufsize = Size, chunks_sent = ChunksSent} = Acc0,
     Acc = Acc0#cacc{
         prepend = ",\r\n",
         buffer = [Buf | Data],
-        bufsize = Size + Len
+        bufsize = Size + Len,
+        chunks_sent = ChunksSent + 1
     },
     {ok, Acc}.
 
diff --git a/src/chttpd/test/eunit/chttpd_db_test.erl b/src/chttpd/test/eunit/chttpd_db_test.erl
index c819bdf..204332d 100644
--- a/src/chttpd/test/eunit/chttpd_db_test.erl
+++ b/src/chttpd/test/eunit/chttpd_db_test.erl
@@ -65,6 +65,7 @@ all_test_() ->
                     fun should_return_ok_true_on_ensure_full_commit/1,
                     fun should_return_404_for_ensure_full_commit_on_no_db/1,
                     fun should_accept_live_as_an_alias_for_continuous/1,
+                    fun should_return_headers_after_starting_continious/1,
                     fun should_return_404_for_delete_att_on_notadoc/1,
                     fun should_return_409_for_del_att_without_rev/1,
                     fun should_return_200_for_del_att_with_rev/1,
@@ -125,10 +126,8 @@ should_return_404_for_ensure_full_commit_on_no_db(Url0) ->
 
 
 should_accept_live_as_an_alias_for_continuous(Url) ->
-    GetLastSeq = fun(Bin) ->
-        Parts = binary:split(Bin, <<"\n">>, [global]),
-        Filtered = [P || P <- Parts, size(P) > 0],
-        LastSeqBin = lists:last(Filtered),
+    GetLastSeq = fun(Chunks) ->
+        LastSeqBin = lists:last(Chunks),
         {Result} = try ?JSON_DECODE(LastSeqBin) of
             Data -> Data
         catch
@@ -138,14 +137,11 @@ should_accept_live_as_an_alias_for_continuous(Url) ->
         couch_util:get_value(<<"last_seq">>, Result, undefined)
     end,
     {timeout, ?TIMEOUT, ?_test(begin
-        {ok, _, _, ResultBody1} =
-            test_request:get(Url ++ "/_changes?feed=live&timeout=1", [?AUTH]),
-        LastSeq1 = GetLastSeq(ResultBody1),
+        LastSeq1 = GetLastSeq(wait_non_empty_chunk(Url)),
 
         {ok, _, _, _} = create_doc(Url, "testdoc2"),
-        {ok, _, _, ResultBody2} =
-            test_request:get(Url ++ "/_changes?feed=live&timeout=1", [?AUTH]),
-        LastSeq2 = GetLastSeq(ResultBody2),
+
+        LastSeq2 = GetLastSeq(wait_non_empty_chunk(Url)),
 
         ?assertNotEqual(LastSeq1, LastSeq2)
     end)}.
@@ -460,3 +456,27 @@ should_succeed_on_local_docs_with_multiple_queries(Url) ->
         {InnerJson2} = lists:nth(2, ResultJsonBody),
         ?assertEqual(5, length(couch_util:get_value(<<"rows">>, InnerJson2)))
     end)}.
+
+
+should_return_headers_after_starting_continious(Url) ->
+    {timeout, ?TIMEOUT, ?_test(begin
+       {ok, _, _, Bin} =
+            test_request:get(Url ++ "/_changes?feed=live&timeout=1", [?AUTH]),
+
+        Parts = binary:split(Bin, <<"\n">>, [global]),
+        %% we should receive at least one part even when timeout=1
+        ?assertNotEqual([], Parts)
+    end)}.
+
+wait_non_empty_chunk(Url) ->
+    test_util:wait(fun() ->
+        {ok, _, _, Bin} =
+            test_request:get(Url ++ "/_changes?feed=live&timeout=1", [?AUTH]),
+
+        Parts = binary:split(Bin, <<"\n">>, [global]),
+
+        case [P || P <- Parts, size(P) > 0] of
+            [] -> wait;
+            Chunks -> Chunks
+        end
+    end).