You are viewing a plain-text version of this content; the link to its canonical version is not included in this plain-text rendering.
Posted to commits@couchdb.apache.org by ji...@apache.org on 2023/12/04 18:47:39 UTC

(couchdb) 01/01: refactor `get_changes_row`

This is an automated email from the ASF dual-hosted git repository.

jiahuili430 pushed a commit to branch fix-changes-stats
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit e9539806c184cb4f3f7bf2848c8f306b65a0fef9
Author: Jiahui Li <li...@gmail.com>
AuthorDate: Sat Dec 2 20:23:37 2023 -0600

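    refactor `get_changes_row`

    Make the fallback clause of `couch_changes:filter/4` return `{[], Revs}`,
    matching the `{Docs, Revs}` shape of the custom-filter clause, and stop
    exporting `couch_changes:filter/3`. In `fabric_rpc`, compute the document
    options and handle the no-pass case once in `changes_enumerator/2`, so
    `get_changes_row/4` only builds the change row. Add a continuous-feed
    heartbeat test that uses a custom filter.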
---
 src/couch/src/couch_changes.erl              |   3 +-
 src/couch/test/eunit/couch_changes_tests.erl | 172 ++++++++++++++++++++-------
 src/fabric/src/fabric_rpc.erl                |  96 +++++++--------
 3 files changed, 178 insertions(+), 93 deletions(-)

diff --git a/src/couch/src/couch_changes.erl b/src/couch/src/couch_changes.erl
index e20e86b70..cd6d55eb7 100644
--- a/src/couch/src/couch_changes.erl
+++ b/src/couch/src/couch_changes.erl
@@ -19,7 +19,6 @@
     wait_updated/3,
     get_rest_updated/1,
     configure_filter/4,
-    filter/3,
     filter/4,
     handle_db_event/3,
     handle_view_event/3,
@@ -283,7 +282,7 @@ filter(Db, DocInfo, {custom, Style, Req0, DDoc, FName}, IncludeDocs) when Includ
     {ok, Passes} = couch_query_servers:filter_docs(Req, Db, DDoc, FName, Docs),
     {Docs, filter_revs(Passes, Docs)};
 filter(Db, DocInfo, Filter, _IncludeDocs) ->
-    filter(Db, DocInfo, Filter).
+    {[], filter(Db, DocInfo, Filter)}.
 
 get_view_qs({json_req, {Props}}) ->
     {Query} = couch_util:get_value(<<"query">>, Props, {[]}),
diff --git a/src/couch/test/eunit/couch_changes_tests.erl b/src/couch/test/eunit/couch_changes_tests.erl
index 293f49e2e..654858507 100644
--- a/src/couch/test/eunit/couch_changes_tests.erl
+++ b/src/couch/test/eunit/couch_changes_tests.erl
@@ -24,45 +24,6 @@
     doc = nil
 }).
 
-setup() ->
-    DbName = ?tempdb(),
-    {ok, Db} = create_db(DbName),
-    Revs = [
-        R
-     || {ok, R} <- [
-            save_doc(Db, {[{<<"_id">>, <<"doc1">>}]}),
-            save_doc(Db, {[{<<"_id">>, <<"doc2">>}]}),
-            save_doc(Db, {[{<<"_id">>, <<"doc3">>}]}),
-            save_doc(Db, {[{<<"_id">>, <<"doc4">>}]}),
-            save_doc(Db, {[{<<"_id">>, <<"doc5">>}]})
-        ]
-    ],
-    Rev = lists:nth(3, Revs),
-
-    {ok, Db1} = couch_db:reopen(Db),
-    {ok, Rev1} = save_doc(Db1, {[{<<"_id">>, <<"doc3">>}, {<<"_rev">>, Rev}]}),
-    Revs1 = Revs ++ [Rev1],
-    Revs2 =
-        Revs1 ++
-            [
-                R
-             || {ok, R} <- [
-                    save_doc(Db1, {[{<<"_id">>, <<"doc6">>}]}),
-                    save_doc(Db1, {[{<<"_id">>, <<"_design/foo">>}]}),
-                    save_doc(Db1, {[{<<"_id">>, <<"doc7">>}]}),
-                    save_doc(Db1, {[{<<"_id">>, <<"doc8">>}]})
-                ]
-            ],
-    config:set(
-        "native_query_servers", "erlang", "{couch_native_process, start_link, []}", _Persist = false
-    ),
-    {DbName, list_to_tuple(Revs2)}.
-
-teardown({DbName, _}) ->
-    config:delete("native_query_servers", "erlang", _Persist = false),
-    delete_db(DbName),
-    ok.
-
 changes_feed_test_() ->
     {
         "Changes feed", {
@@ -90,6 +51,7 @@ changes_filter_test_() ->
             fun test_util:start_couch/0,
             fun test_util:stop_couch/1,
             [
+                filter_by_custom_function(),
                 filter_by_design(),
                 filter_by_doc_id(),
                 filter_by_filter_function(),
@@ -120,6 +82,17 @@ changes_style_test_() ->
         }
     }.
 
+filter_by_custom_function() ->
+    {
+        "Filter function",
+        {
+            foreach,
+            fun setup/0,
+            fun teardown/1,
+            [?TDEF_FE(t_receive_heartbeats, ?TIMEOUT div 1000)]
+        }
+    }.
+
 filter_by_design() ->
     {
         "Filter _design",
@@ -259,6 +232,88 @@ t_end_changes_when_db_deleted({DbName, _Revs}) ->
     {_Rows, _LastSeq} = wait_finished(Consumer),
     ok = stop_consumer(Consumer).
 
+% Heartbeats must keep flowing on a continuous feed with a custom filter,
+% both while every new document is rejected by the filter and after some
+% documents pass it. The fixture argument is ignored: the test uses its
+% own database, so it is also responsible for cleaning that database up.
+t_receive_heartbeats(_) ->
+    DbName = ?tempdb(),
+    Timeout = 100,
+    {ok, Db} = create_db(DbName),
+
+    {ok, _} = save_doc(
+        Db,
+        {[
+            {<<"_id">>, <<"_design/filtered">>},
+            {<<"language">>, <<"javascript">>},
+            {<<"filters">>,
+                {[
+                    {<<"foo">>, <<
+                        "function(doc) {\n"
+                        "                              return ['doc10', 'doc11', 'doc12'].indexOf(doc._id) != -1;}"
+                    >>}
+                ]}}
+        ]}
+    ),
+
+    ChangesArgs = #changes_args{
+        filter = "filtered/foo",
+        feed = "continuous",
+        timeout = 10000,
+        heartbeat = 1000
+    },
+    Consumer = spawn_consumer(DbName, ChangesArgs, {json_req, null}),
+    try
+        % doc1..doc9 are all rejected by the filter; heartbeats must still arrive.
+        {ok, _Rev1} = save_doc(Db, {[{<<"_id">>, <<"doc1">>}]}),
+        timer:sleep(Timeout),
+        {ok, _Rev2} = save_doc(Db, {[{<<"_id">>, <<"doc2">>}]}),
+        timer:sleep(Timeout),
+        {ok, _Rev3} = save_doc(Db, {[{<<"_id">>, <<"doc3">>}]}),
+        timer:sleep(Timeout),
+        {ok, _Rev4} = save_doc(Db, {[{<<"_id">>, <<"doc4">>}]}),
+        timer:sleep(Timeout),
+        {ok, _Rev5} = save_doc(Db, {[{<<"_id">>, <<"doc5">>}]}),
+        timer:sleep(Timeout),
+        {ok, _Rev6} = save_doc(Db, {[{<<"_id">>, <<"doc6">>}]}),
+        timer:sleep(Timeout),
+        {ok, _Rev7} = save_doc(Db, {[{<<"_id">>, <<"doc7">>}]}),
+        timer:sleep(Timeout),
+        {ok, _Rev8} = save_doc(Db, {[{<<"_id">>, <<"doc8">>}]}),
+        timer:sleep(Timeout),
+        {ok, _Rev9} = save_doc(Db, {[{<<"_id">>, <<"doc9">>}]}),
+
+        Heartbeats = get_heartbeats(Consumer),
+        ?assert(Heartbeats > 0),
+
+        % doc10..doc12 pass the filter and must be emitted as change rows.
+        {ok, _Rev10} = save_doc(Db, {[{<<"_id">>, <<"doc10">>}]}),
+        timer:sleep(Timeout),
+        {ok, _Rev11} = save_doc(Db, {[{<<"_id">>, <<"doc11">>}]}),
+        timer:sleep(Timeout),
+        {ok, _Rev12} = save_doc(Db, {[{<<"_id">>, <<"doc12">>}]}),
+
+        Heartbeats2 = get_heartbeats(Consumer),
+        ?assert(Heartbeats2 > Heartbeats),
+
+        Rows = get_rows(Consumer),
+        ?assertEqual(3, length(Rows)),
+
+        % Heartbeats keep coming after rows have been emitted.
+        {ok, _Rev13} = save_doc(Db, {[{<<"_id">>, <<"doc13">>}]}),
+        timer:sleep(Timeout),
+        {ok, _Rev14} = save_doc(Db, {[{<<"_id">>, <<"doc14">>}]}),
+        timer:sleep(Timeout),
+
+        Heartbeats3 = get_heartbeats(Consumer),
+        ?assert(Heartbeats3 > Heartbeats2)
+    after
+        % Release the consumer and the private database even when an
+        % assertion fails, so neither leaks into later tests.
+        stop_consumer(Consumer),
+        delete_db(DbName)
+    end.
+
 t_emit_only_design_documents({DbName, Revs}) ->
     ChArgs = #changes_args{filter = "_design"},
     Req = {json_req, null},
@@ -460,7 +502,6 @@ t_select_with_continuous({DbName, Revs}) ->
     ?assertEqual(ok, wait_row_notifications(1)),
     ok = pause(Consumer),
     NewRows = get_rows(Consumer),
-    ?debugVal(NewRows),
     ?assertMatch([#row{seq = _, id = <<"doc8">>, deleted = false}], NewRows),
     ?assertEqual([#row{seq = 12, id = <<"doc8">>, deleted = false}], NewRows).
 
@@ -627,6 +668,44 @@ t_style_all_docs_with_include_docs({DbName, Revs}) ->
     ).
 
 %%%%%%%%%%%%%%%%%%%% Utility Functions %%%%%%%%%%%%%%%%%%%%
+setup() ->
+    DbName = ?tempdb(),
+    {ok, Db} = create_db(DbName),
+    Revs = [
+        R
+     || {ok, R} <- [
+            save_doc(Db, {[{<<"_id">>, <<"doc1">>}]}),
+            save_doc(Db, {[{<<"_id">>, <<"doc2">>}]}),
+            save_doc(Db, {[{<<"_id">>, <<"doc3">>}]}),
+            save_doc(Db, {[{<<"_id">>, <<"doc4">>}]}),
+            save_doc(Db, {[{<<"_id">>, <<"doc5">>}]})
+        ]
+    ],
+    Rev = lists:nth(3, Revs),
+    {ok, Db1} = couch_db:reopen(Db),
+    {ok, Rev1} = save_doc(Db1, {[{<<"_id">>, <<"doc3">>}, {<<"_rev">>, Rev}]}),
+    Revs1 = Revs ++ [Rev1],
+    Revs2 =
+        Revs1 ++
+            [
+                R
+             || {ok, R} <- [
+                    save_doc(Db1, {[{<<"_id">>, <<"doc6">>}]}),
+                    save_doc(Db1, {[{<<"_id">>, <<"_design/foo">>}]}),
+                    save_doc(Db1, {[{<<"_id">>, <<"doc7">>}]}),
+                    save_doc(Db1, {[{<<"_id">>, <<"doc8">>}]})
+                ]
+            ],
+    config:set(
+        "native_query_servers", "erlang", "{couch_native_process, start_link, []}", _Persist = false
+    ),
+    {DbName, list_to_tuple(Revs2)}.
+
+teardown({DbName, _}) ->
+    config:delete("native_query_servers", "erlang", _Persist = false),
+    delete_db(DbName),
+    ok.
+
 update_ddoc(DbName, DDoc) ->
     {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]),
     {ok, _} = couch_db:update_doc(Db, DDoc, []),
@@ -646,6 +725,19 @@ save_doc(Db, Json) ->
     {ok, Rev} = couch_db:update_doc(Db, Doc, []),
     {ok, couch_doc:rev_to_str(Rev)}.
 
+get_heartbeats({Consumer, _}) ->
+    Ref = make_ref(),
+    Consumer ! {get_heartbeats, Ref},
+    Resp =
+        receive
+            {hearthbeats, Ref, HeartBeats} ->
+                HeartBeats
+        after ?TIMEOUT ->
+            timeout
+        end,
+    ?assertNotEqual(timeout, Resp),
+    Resp.
+
 get_rows({Consumer, _}) ->
     Ref = make_ref(),
     Consumer ! {get_rows, Ref},
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index 42544ac40..413758818 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -539,76 +539,70 @@ changes_enumerator(DocInfo, Acc) ->
         db = Db,
         args = #changes_args{
             include_docs = IncludeDocs,
-            filter_fun = Filter
+            conflicts = Conflicts,
+            filter_fun = Filter,
+            doc_options = DocOptions
         },
-        pending = Pending
+        pending = Pending,
+        epochs = Epochs
     } = Acc,
-    #doc_info{high_seq = Seq} = DocInfo,
-    RevsOrDocRevs = couch_changes:filter(Db, DocInfo, Filter, IncludeDocs),
-    % include_docs = false, call `filter/3`, use `couch_changes:open_revs/3` to read docs.
-    % include_docs = true, call `filter/4`, it will return [revs] or {docs, revs}.
-    %  - {}: use `couch_changes:open_revs/3` to open docs
-    %  - []: use `fabric_rpc:doc_member/3` to open docs
+    Opts =
+        case Conflicts of
+            true -> [conflicts | DocOptions];
+            false -> DocOptions
+        end,
+    Seq = DocInfo#doc_info.high_seq,
+    {Docs, Revs} = couch_changes:filter(Db, DocInfo, Filter, IncludeDocs),
+    Changes = [X || X <- Revs, X /= null],
     ChangesRow =
-        case is_tuple(RevsOrDocRevs) of
-            true ->
-                {Docs, Revs} = RevsOrDocRevs,
-                get_changes_row(Revs, Acc, DocInfo, fun get_json_docs/3, Docs);
-            false ->
-                get_changes_row(RevsOrDocRevs, Acc, DocInfo, fun doc_member/3, {Db, DocInfo})
+        case {Changes, Docs, IncludeDocs} of
+            {[], _, _} ->
+                {no_pass, [
+                    {pending, Pending - 1},
+                    {seq, {Seq, uuid(Db), couch_db:owner_of(Epochs, Seq)}}
+                ]};
+            {_, _, false} ->
+                get_changes_row(Changes, [], DocInfo, Acc);
+            {_, [], true} ->
+                % Open docs in `fabric_rpc:doc_member/4`
+                Docs1 = [doc_member(Db, DocInfo, Opts, Filter)],
+                get_changes_row(Changes, Docs1, DocInfo, Acc);
+            {_, [Doc], true} ->
+                % Open docs in `couch_changes:open_revs/3`, and
+                % stored in [Doc], so call `get_json_doc/3` directly
+                Docs1 = [get_json_doc(Doc, Opts, Filter)],
+                get_changes_row(Changes, Docs1, DocInfo, Acc)
         end,
     ok = rexi:stream2(ChangesRow),
     {ok, Acc#fabric_changes_acc{seq = Seq, pending = Pending - 1}}.
 
-get_changes_row(Revs, Acc, DocInfo, Fun, Args) ->
+get_changes_row(Changes, Docs, DocInfo, Acc) ->
     #fabric_changes_acc{
         db = Db,
-        args = #changes_args{
-            include_docs = IncludeDocs,
-            conflicts = Conflicts,
-            filter_fun = Filter,
-            doc_options = DocOptions
-        },
         pending = Pending,
         epochs = Epochs
     } = Acc,
     #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]} = DocInfo,
-    case [X || X <- Revs, X /= null] of
-        [] ->
-            {no_pass, [
-                {pending, Pending - 1},
-                {seq, {Seq, uuid(Db), couch_db:owner_of(Epochs, Seq)}}
-            ]};
-        Results ->
-            Opts =
-                if
-                    Conflicts -> [conflicts | DocOptions];
-                    true -> DocOptions
-                end,
-            {change, [
-                {pending, Pending - 1},
-                {seq, {Seq, uuid(Db), couch_db:owner_of(Epochs, Seq)}},
-                {id, Id},
-                {changes, Results},
-                {deleted, Del}
-                | if
-                    IncludeDocs -> [Fun(Args, Opts, Filter)];
-                    true -> []
-                end
-            ]}
-    end.
-
-get_json_docs([Doc], Opts, Filter) ->
-    {doc, maybe_filtered_json_doc(Doc, Opts, Filter)}.
-
-doc_member({Shard, DocInfo}, Opts, Filter) ->
+    {change, [
+        {pending, Pending - 1},
+        {seq, {Seq, uuid(Db), couch_db:owner_of(Epochs, Seq)}},
+        {id, Id},
+        {changes, Changes},
+        {deleted, Del}
+        | Docs
+    ]}.
+
+doc_member(Shard, DocInfo, Opts, Filter) ->
     case couch_db:open_doc(Shard, DocInfo, [deleted | Opts]) of
         {ok, Doc} ->
-            {doc, maybe_filtered_json_doc(Doc, Opts, Filter)};
+            get_json_doc(Doc, Opts, Filter);
         Error ->
             Error
     end.
 
+get_json_doc(Doc, Opts, Filter) ->
+    {doc, maybe_filtered_json_doc(Doc, Opts, Filter)}.
+
 maybe_filtered_json_doc(Doc, Opts, {selector, _Style, {_Selector, Fields}}) when
     Fields =/= nil
 ->