You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ii...@apache.org on 2019/10/07 20:36:18 UTC
[couchdb] branch master updated: Return headers from _changes feed
when there are no changes
This is an automated email from the ASF dual-hosted git repository.
iilyak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/master by this push:
new abe586e Return headers from _changes feed when there are no changes
new 6382374 Merge pull request #2240 from cloudant/issue/985-continious-feed-blocking
abe586e is described below
commit abe586e04c6a78f7abffe6afcefbadb39ff94c2a
Author: ILYA Khlopotov <ii...@apache.org>
AuthorDate: Mon Oct 7 10:35:55 2019 +0000
Return headers from _changes feed when there are no changes
Problem
-------
The request of continious _changes feed doesn't return until either:
- new change is made to the database
- the heartbeat interval is reached
This causes clients to block on subscription call.
Solution
--------
Introduce a counter to account for number of chunks sent.
Send '\n' exactly once on `waiting_for_updates` when `chunks_sent`
is still 0.
The implementation is suggested by @davisp [here](https://github.com/apache/couchdb/issues/985#issuecomment-537150907).
There is only one difference from his proposal which is:
```
diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index aba1bd22f..9cd6944d2 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -215,7 +215,7 @@ changes_callback(waiting_for_updates, #cacc{buffer = []} = Acc) ->
true ->
{ok, Acc};
false ->
- {ok, Resp1} = chttpd:send_delayed_chunk(Resp, []),
+ {ok, Resp1} = chttpd:send_delayed_chunk(Resp, <<"\n">>),
{ok, Acc#cacc{mochi = Resp1, chunks_sent = 1}}
end;
changes_callback(waiting_for_updates, Acc) ->
```
---
src/chttpd/src/chttpd_db.erl | 33 +++++++++++++++++++-------
src/chttpd/test/eunit/chttpd_db_test.erl | 40 ++++++++++++++++++++++++--------
2 files changed, 54 insertions(+), 19 deletions(-)
diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index c6404b0..aba1bd2 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -46,6 +46,7 @@
mochi,
prepend = "",
responding = false,
+ chunks_sent = 0,
buffer = [],
bufsize = 0,
threshold
@@ -170,10 +171,10 @@ changes_callback({change, {ChangeProp}=Change}, #cacc{feed = eventsource} = Acc)
Len = iolist_size(Chunk),
maybe_flush_changes_feed(Acc, Chunk, Len);
changes_callback(timeout, #cacc{feed = eventsource} = Acc) ->
- #cacc{mochi = Resp} = Acc,
+ #cacc{mochi = Resp, chunks_sent = ChunksSet} = Acc,
Chunk = "event: heartbeat\ndata: \n\n",
{ok, Resp1} = chttpd:send_delayed_chunk(Resp, Chunk),
- {ok, Acc#cacc{mochi = Resp1}};
+ {ok, Acc#cacc{mochi = Resp1, chunks_sent = ChunksSet + 1}};
changes_callback({stop, _EndSeq}, #cacc{feed = eventsource} = Acc) ->
#cacc{mochi = Resp, buffer = Buf} = Acc,
{ok, Resp1} = chttpd:send_delayed_chunk(Resp, Buf),
@@ -209,14 +210,27 @@ changes_callback({stop, EndSeq, Pending}, Acc) ->
chttpd:end_delayed_json_response(Resp1);
changes_callback(waiting_for_updates, #cacc{buffer = []} = Acc) ->
- {ok, Acc};
+ #cacc{mochi = Resp, chunks_sent = ChunksSent} = Acc,
+ case ChunksSent > 0 of
+ true ->
+ {ok, Acc};
+ false ->
+ {ok, Resp1} = chttpd:send_delayed_chunk(Resp, <<"\n">>),
+ {ok, Acc#cacc{mochi = Resp1, chunks_sent = 1}}
+ end;
changes_callback(waiting_for_updates, Acc) ->
- #cacc{buffer = Buf, mochi = Resp} = Acc,
+ #cacc{buffer = Buf, mochi = Resp, chunks_sent = ChunksSent} = Acc,
{ok, Resp1} = chttpd:send_delayed_chunk(Resp, Buf),
- {ok, Acc#cacc{buffer = [], bufsize = 0, mochi = Resp1}};
+ {ok, Acc#cacc{
+ buffer = [],
+ bufsize = 0,
+ mochi = Resp1,
+ chunks_sent = ChunksSent + 1
+ }};
changes_callback(timeout, Acc) ->
- {ok, Resp1} = chttpd:send_delayed_chunk(Acc#cacc.mochi, "\n"),
- {ok, Acc#cacc{mochi = Resp1}};
+ #cacc{mochi = Resp, chunks_sent = ChunksSent} = Acc,
+ {ok, Resp1} = chttpd:send_delayed_chunk(Resp, "\n"),
+ {ok, Acc#cacc{mochi = Resp1, chunks_sent = ChunksSent + 1}};
changes_callback({error, Reason}, #cacc{mochi = #httpd{}} = Acc) ->
#cacc{mochi = Req} = Acc,
chttpd:send_error(Req, Reason);
@@ -232,11 +246,12 @@ maybe_flush_changes_feed(#cacc{bufsize=Size, threshold=Max} = Acc, Data, Len)
{ok, R1} = chttpd:send_delayed_chunk(Resp, Buffer),
{ok, Acc#cacc{prepend = ",\r\n", buffer = Data, bufsize=Len, mochi = R1}};
maybe_flush_changes_feed(Acc0, Data, Len) ->
- #cacc{buffer = Buf, bufsize = Size} = Acc0,
+ #cacc{buffer = Buf, bufsize = Size, chunks_sent = ChunksSent} = Acc0,
Acc = Acc0#cacc{
prepend = ",\r\n",
buffer = [Buf | Data],
- bufsize = Size + Len
+ bufsize = Size + Len,
+ chunks_sent = ChunksSent + 1
},
{ok, Acc}.
diff --git a/src/chttpd/test/eunit/chttpd_db_test.erl b/src/chttpd/test/eunit/chttpd_db_test.erl
index c819bdf..204332d 100644
--- a/src/chttpd/test/eunit/chttpd_db_test.erl
+++ b/src/chttpd/test/eunit/chttpd_db_test.erl
@@ -65,6 +65,7 @@ all_test_() ->
fun should_return_ok_true_on_ensure_full_commit/1,
fun should_return_404_for_ensure_full_commit_on_no_db/1,
fun should_accept_live_as_an_alias_for_continuous/1,
+ fun should_return_headers_after_starting_continious/1,
fun should_return_404_for_delete_att_on_notadoc/1,
fun should_return_409_for_del_att_without_rev/1,
fun should_return_200_for_del_att_with_rev/1,
@@ -125,10 +126,8 @@ should_return_404_for_ensure_full_commit_on_no_db(Url0) ->
should_accept_live_as_an_alias_for_continuous(Url) ->
- GetLastSeq = fun(Bin) ->
- Parts = binary:split(Bin, <<"\n">>, [global]),
- Filtered = [P || P <- Parts, size(P) > 0],
- LastSeqBin = lists:last(Filtered),
+ GetLastSeq = fun(Chunks) ->
+ LastSeqBin = lists:last(Chunks),
{Result} = try ?JSON_DECODE(LastSeqBin) of
Data -> Data
catch
@@ -138,14 +137,11 @@ should_accept_live_as_an_alias_for_continuous(Url) ->
couch_util:get_value(<<"last_seq">>, Result, undefined)
end,
{timeout, ?TIMEOUT, ?_test(begin
- {ok, _, _, ResultBody1} =
- test_request:get(Url ++ "/_changes?feed=live&timeout=1", [?AUTH]),
- LastSeq1 = GetLastSeq(ResultBody1),
+ LastSeq1 = GetLastSeq(wait_non_empty_chunk(Url)),
{ok, _, _, _} = create_doc(Url, "testdoc2"),
- {ok, _, _, ResultBody2} =
- test_request:get(Url ++ "/_changes?feed=live&timeout=1", [?AUTH]),
- LastSeq2 = GetLastSeq(ResultBody2),
+
+ LastSeq2 = GetLastSeq(wait_non_empty_chunk(Url)),
?assertNotEqual(LastSeq1, LastSeq2)
end)}.
@@ -460,3 +456,27 @@ should_succeed_on_local_docs_with_multiple_queries(Url) ->
{InnerJson2} = lists:nth(2, ResultJsonBody),
?assertEqual(5, length(couch_util:get_value(<<"rows">>, InnerJson2)))
end)}.
+
+
+should_return_headers_after_starting_continious(Url) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ {ok, _, _, Bin} =
+ test_request:get(Url ++ "/_changes?feed=live&timeout=1", [?AUTH]),
+
+ Parts = binary:split(Bin, <<"\n">>, [global]),
+ %% we should receive at least one part even when timeout=1
+ ?assertNotEqual([], Parts)
+ end)}.
+
+wait_non_empty_chunk(Url) ->
+ test_util:wait(fun() ->
+ {ok, _, _, Bin} =
+ test_request:get(Url ++ "/_changes?feed=live&timeout=1", [?AUTH]),
+
+ Parts = binary:split(Bin, <<"\n">>, [global]),
+
+ case [P || P <- Parts, size(P) > 0] of
+ [] -> wait;
+ Chunks -> Chunks
+ end
+ end).