You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ji...@apache.org on 2023/12/04 18:47:39 UTC
(couchdb) 01/01: refactor `get_changes_row`
This is an automated email from the ASF dual-hosted git repository.
jiahuili430 pushed a commit to branch fix-changes-stats
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit e9539806c184cb4f3f7bf2848c8f306b65a0fef9
Author: Jiahui Li <li...@gmail.com>
AuthorDate: Sat Dec 2 20:23:37 2023 -0600
refactor `get_changes_row`
---
src/couch/src/couch_changes.erl | 3 +-
src/couch/test/eunit/couch_changes_tests.erl | 172 ++++++++++++++++++++-------
src/fabric/src/fabric_rpc.erl | 96 +++++++--------
3 files changed, 178 insertions(+), 93 deletions(-)
diff --git a/src/couch/src/couch_changes.erl b/src/couch/src/couch_changes.erl
index e20e86b70..cd6d55eb7 100644
--- a/src/couch/src/couch_changes.erl
+++ b/src/couch/src/couch_changes.erl
@@ -19,7 +19,6 @@
wait_updated/3,
get_rest_updated/1,
configure_filter/4,
- filter/3,
filter/4,
handle_db_event/3,
handle_view_event/3,
@@ -283,7 +282,7 @@ filter(Db, DocInfo, {custom, Style, Req0, DDoc, FName}, IncludeDocs) when Includ
{ok, Passes} = couch_query_servers:filter_docs(Req, Db, DDoc, FName, Docs),
{Docs, filter_revs(Passes, Docs)};
filter(Db, DocInfo, Filter, _IncludeDocs) ->
- filter(Db, DocInfo, Filter).
+ {[], filter(Db, DocInfo, Filter)}.
get_view_qs({json_req, {Props}}) ->
{Query} = couch_util:get_value(<<"query">>, Props, {[]}),
diff --git a/src/couch/test/eunit/couch_changes_tests.erl b/src/couch/test/eunit/couch_changes_tests.erl
index 293f49e2e..654858507 100644
--- a/src/couch/test/eunit/couch_changes_tests.erl
+++ b/src/couch/test/eunit/couch_changes_tests.erl
@@ -24,45 +24,6 @@
doc = nil
}).
-setup() ->
- DbName = ?tempdb(),
- {ok, Db} = create_db(DbName),
- Revs = [
- R
- || {ok, R} <- [
- save_doc(Db, {[{<<"_id">>, <<"doc1">>}]}),
- save_doc(Db, {[{<<"_id">>, <<"doc2">>}]}),
- save_doc(Db, {[{<<"_id">>, <<"doc3">>}]}),
- save_doc(Db, {[{<<"_id">>, <<"doc4">>}]}),
- save_doc(Db, {[{<<"_id">>, <<"doc5">>}]})
- ]
- ],
- Rev = lists:nth(3, Revs),
-
- {ok, Db1} = couch_db:reopen(Db),
- {ok, Rev1} = save_doc(Db1, {[{<<"_id">>, <<"doc3">>}, {<<"_rev">>, Rev}]}),
- Revs1 = Revs ++ [Rev1],
- Revs2 =
- Revs1 ++
- [
- R
- || {ok, R} <- [
- save_doc(Db1, {[{<<"_id">>, <<"doc6">>}]}),
- save_doc(Db1, {[{<<"_id">>, <<"_design/foo">>}]}),
- save_doc(Db1, {[{<<"_id">>, <<"doc7">>}]}),
- save_doc(Db1, {[{<<"_id">>, <<"doc8">>}]})
- ]
- ],
- config:set(
- "native_query_servers", "erlang", "{couch_native_process, start_link, []}", _Persist = false
- ),
- {DbName, list_to_tuple(Revs2)}.
-
-teardown({DbName, _}) ->
- config:delete("native_query_servers", "erlang", _Persist = false),
- delete_db(DbName),
- ok.
-
changes_feed_test_() ->
{
"Changes feed", {
@@ -90,6 +51,7 @@ changes_filter_test_() ->
fun test_util:start_couch/0,
fun test_util:stop_couch/1,
[
+ filter_by_custom_function(),
filter_by_design(),
filter_by_doc_id(),
filter_by_filter_function(),
@@ -120,6 +82,17 @@ changes_style_test_() ->
}
}.
+filter_by_custom_function() ->
+ {
+ "Filter function",
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [?TDEF_FE(t_receive_heartbeats, ?TIMEOUT div 1000)]
+ }
+ }.
+
filter_by_design() ->
{
"Filter _design",
@@ -259,6 +232,75 @@ t_end_changes_when_db_deleted({DbName, _Revs}) ->
{_Rows, _LastSeq} = wait_finished(Consumer),
ok = stop_consumer(Consumer).
+t_receive_heartbeats(_) ->
+ DbName = ?tempdb(),
+ Timeout = 100,
+ {ok, Db} = create_db(DbName),
+
+ {ok, _} = save_doc(
+ Db,
+ {[
+ {<<"_id">>, <<"_design/filtered">>},
+ {<<"language">>, <<"javascript">>},
+ {<<"filters">>,
+ {[
+ {<<"foo">>, <<
+ "function(doc) {\n"
+ " return ['doc10', 'doc11', 'doc12'].indexOf(doc._id) != -1;}"
+ >>}
+ ]}}
+ ]}
+ ),
+
+ ChangesArgs = #changes_args{
+ filter = "filtered/foo",
+ feed = "continuous",
+ timeout = 10000,
+ heartbeat = 1000
+ },
+ Consumer = spawn_consumer(DbName, ChangesArgs, {json_req, null}),
+
+ {ok, _Rev1} = save_doc(Db, {[{<<"_id">>, <<"doc1">>}]}),
+ timer:sleep(Timeout),
+ {ok, _Rev2} = save_doc(Db, {[{<<"_id">>, <<"doc2">>}]}),
+ timer:sleep(Timeout),
+ {ok, _Rev3} = save_doc(Db, {[{<<"_id">>, <<"doc3">>}]}),
+ timer:sleep(Timeout),
+ {ok, _Rev4} = save_doc(Db, {[{<<"_id">>, <<"doc4">>}]}),
+ timer:sleep(Timeout),
+ {ok, _Rev5} = save_doc(Db, {[{<<"_id">>, <<"doc5">>}]}),
+ timer:sleep(Timeout),
+ {ok, _Rev6} = save_doc(Db, {[{<<"_id">>, <<"doc6">>}]}),
+ timer:sleep(Timeout),
+ {ok, _Rev7} = save_doc(Db, {[{<<"_id">>, <<"doc7">>}]}),
+ timer:sleep(Timeout),
+ {ok, _Rev8} = save_doc(Db, {[{<<"_id">>, <<"doc8">>}]}),
+ timer:sleep(Timeout),
+ {ok, _Rev9} = save_doc(Db, {[{<<"_id">>, <<"doc9">>}]}),
+
+ Heartbeats = get_heartbeats(Consumer),
+ ?assert(Heartbeats > 0),
+
+ {ok, _Rev10} = save_doc(Db, {[{<<"_id">>, <<"doc10">>}]}),
+ timer:sleep(Timeout),
+ {ok, _Rev11} = save_doc(Db, {[{<<"_id">>, <<"doc11">>}]}),
+ timer:sleep(Timeout),
+ {ok, _Rev12} = save_doc(Db, {[{<<"_id">>, <<"doc12">>}]}),
+
+ Heartbeats2 = get_heartbeats(Consumer),
+ ?assert(Heartbeats2 > Heartbeats),
+
+ Rows = get_rows(Consumer),
+ ?assertEqual(3, length(Rows)),
+
+ {ok, _Rev13} = save_doc(Db, {[{<<"_id">>, <<"doc13">>}]}),
+ timer:sleep(Timeout),
+ {ok, _Rev14} = save_doc(Db, {[{<<"_id">>, <<"doc14">>}]}),
+ timer:sleep(Timeout),
+
+ Heartbeats3 = get_heartbeats(Consumer),
+ ?assert(Heartbeats3 > Heartbeats2).
+
t_emit_only_design_documents({DbName, Revs}) ->
ChArgs = #changes_args{filter = "_design"},
Req = {json_req, null},
@@ -460,7 +502,6 @@ t_select_with_continuous({DbName, Revs}) ->
?assertEqual(ok, wait_row_notifications(1)),
ok = pause(Consumer),
NewRows = get_rows(Consumer),
- ?debugVal(NewRows),
?assertMatch([#row{seq = _, id = <<"doc8">>, deleted = false}], NewRows),
?assertEqual([#row{seq = 12, id = <<"doc8">>, deleted = false}], NewRows).
@@ -627,6 +668,44 @@ t_style_all_docs_with_include_docs({DbName, Revs}) ->
).
%%%%%%%%%%%%%%%%%%%% Utility Functions %%%%%%%%%%%%%%%%%%%%
+setup() ->
+ DbName = ?tempdb(),
+ {ok, Db} = create_db(DbName),
+ Revs = [
+ R
+ || {ok, R} <- [
+ save_doc(Db, {[{<<"_id">>, <<"doc1">>}]}),
+ save_doc(Db, {[{<<"_id">>, <<"doc2">>}]}),
+ save_doc(Db, {[{<<"_id">>, <<"doc3">>}]}),
+ save_doc(Db, {[{<<"_id">>, <<"doc4">>}]}),
+ save_doc(Db, {[{<<"_id">>, <<"doc5">>}]})
+ ]
+ ],
+ Rev = lists:nth(3, Revs),
+ {ok, Db1} = couch_db:reopen(Db),
+ {ok, Rev1} = save_doc(Db1, {[{<<"_id">>, <<"doc3">>}, {<<"_rev">>, Rev}]}),
+ Revs1 = Revs ++ [Rev1],
+ Revs2 =
+ Revs1 ++
+ [
+ R
+ || {ok, R} <- [
+ save_doc(Db1, {[{<<"_id">>, <<"doc6">>}]}),
+ save_doc(Db1, {[{<<"_id">>, <<"_design/foo">>}]}),
+ save_doc(Db1, {[{<<"_id">>, <<"doc7">>}]}),
+ save_doc(Db1, {[{<<"_id">>, <<"doc8">>}]})
+ ]
+ ],
+ config:set(
+ "native_query_servers", "erlang", "{couch_native_process, start_link, []}", _Persist = false
+ ),
+ {DbName, list_to_tuple(Revs2)}.
+
+teardown({DbName, _}) ->
+ config:delete("native_query_servers", "erlang", _Persist = false),
+ delete_db(DbName),
+ ok.
+
update_ddoc(DbName, DDoc) ->
{ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]),
{ok, _} = couch_db:update_doc(Db, DDoc, []),
@@ -646,6 +725,19 @@ save_doc(Db, Json) ->
{ok, Rev} = couch_db:update_doc(Db, Doc, []),
{ok, couch_doc:rev_to_str(Rev)}.
+get_heartbeats({Consumer, _}) ->
+ Ref = make_ref(),
+ Consumer ! {get_heartbeats, Ref},
+ Resp =
+ receive
+ {hearthbeats, Ref, HeartBeats} ->
+ HeartBeats
+ after ?TIMEOUT ->
+ timeout
+ end,
+ ?assertNotEqual(timeout, Resp),
+ Resp.
+
get_rows({Consumer, _}) ->
Ref = make_ref(),
Consumer ! {get_rows, Ref},
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index 42544ac40..413758818 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -539,76 +539,70 @@ changes_enumerator(DocInfo, Acc) ->
db = Db,
args = #changes_args{
include_docs = IncludeDocs,
- filter_fun = Filter
+ conflicts = Conflicts,
+ filter_fun = Filter,
+ doc_options = DocOptions
},
- pending = Pending
+ pending = Pending,
+ epochs = Epochs
} = Acc,
- #doc_info{high_seq = Seq} = DocInfo,
- RevsOrDocRevs = couch_changes:filter(Db, DocInfo, Filter, IncludeDocs),
- % include_docs = false, call `filter/3`, use `couch_changes:open_revs/3` to read docs.
- % include_docs = true, call `filter/4`, it will return [revs] or {docs, revs}.
- % - {}: use `couch_changes:open_revs/3` to open docs
- % - []: use `fabric_rpc:doc_member/3` to open docs
+ Opts =
+ case Conflicts of
+ true -> [conflicts | DocOptions];
+ false -> DocOptions
+ end,
+ Seq = DocInfo#doc_info.high_seq,
+ {Docs, Revs} = couch_changes:filter(Db, DocInfo, Filter, IncludeDocs),
+ Changes = [X || X <- Revs, X /= null],
ChangesRow =
- case is_tuple(RevsOrDocRevs) of
- true ->
- {Docs, Revs} = RevsOrDocRevs,
- get_changes_row(Revs, Acc, DocInfo, fun get_json_docs/3, Docs);
- false ->
- get_changes_row(RevsOrDocRevs, Acc, DocInfo, fun doc_member/3, {Db, DocInfo})
+ case {Changes, Docs, IncludeDocs} of
+ {[], _, _} ->
+ {no_pass, [
+ {pending, Pending - 1},
+ {seq, {Seq, uuid(Db), couch_db:owner_of(Epochs, Seq)}}
+ ]};
+ {_, _, false} ->
+ get_changes_row(Changes, [], DocInfo, Acc);
+ {_, [], true} ->
+ % The filter returned no docs, so open the doc via `fabric_rpc:doc_member/4`
+ Docs1 = [doc_member(Db, DocInfo, Opts, Filter)],
+ get_changes_row(Changes, Docs1, DocInfo, Acc);
+ {_, [Doc], true} ->
+ % The doc was already opened by `couch_changes:open_revs/3` and
+ % returned in [Doc], so just convert it with `get_json_doc/3`
+ Docs1 = [get_json_doc(Doc, Opts, Filter)],
+ get_changes_row(Changes, Docs1, DocInfo, Acc)
end,
ok = rexi:stream2(ChangesRow),
{ok, Acc#fabric_changes_acc{seq = Seq, pending = Pending - 1}}.
-get_changes_row(Revs, Acc, DocInfo, Fun, Args) ->
+get_changes_row(Changes, Docs, DocInfo, Acc) ->
#fabric_changes_acc{
db = Db,
- args = #changes_args{
- include_docs = IncludeDocs,
- conflicts = Conflicts,
- filter_fun = Filter,
- doc_options = DocOptions
- },
pending = Pending,
epochs = Epochs
} = Acc,
#doc_info{id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]} = DocInfo,
- case [X || X <- Revs, X /= null] of
- [] ->
- {no_pass, [
- {pending, Pending - 1},
- {seq, {Seq, uuid(Db), couch_db:owner_of(Epochs, Seq)}}
- ]};
- Results ->
- Opts =
- if
- Conflicts -> [conflicts | DocOptions];
- true -> DocOptions
- end,
- {change, [
- {pending, Pending - 1},
- {seq, {Seq, uuid(Db), couch_db:owner_of(Epochs, Seq)}},
- {id, Id},
- {changes, Results},
- {deleted, Del}
- | if
- IncludeDocs -> [Fun(Args, Opts, Filter)];
- true -> []
- end
- ]}
- end.
-
-get_json_docs([Doc], Opts, Filter) ->
- {doc, maybe_filtered_json_doc(Doc, Opts, Filter)}.
-
-doc_member({Shard, DocInfo}, Opts, Filter) ->
+ {change, [
+ {pending, Pending - 1},
+ {seq, {Seq, uuid(Db), couch_db:owner_of(Epochs, Seq)}},
+ {id, Id},
+ {changes, Changes},
+ {deleted, Del}
+ | Docs
+ ]}.
+
+doc_member(Shard, DocInfo, Opts, Filter) ->
case couch_db:open_doc(Shard, DocInfo, [deleted | Opts]) of
{ok, Doc} ->
- {doc, maybe_filtered_json_doc(Doc, Opts, Filter)};
+ get_json_doc(Doc, Opts, Filter);
Error ->
Error
end.
+get_json_doc(Doc, Opts, Filter) ->
+ {doc, maybe_filtered_json_doc(Doc, Opts, Filter)}.
+
maybe_filtered_json_doc(Doc, Opts, {selector, _Style, {_Selector, Fields}}) when
Fields =/= nil
->