You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2023/01/13 20:55:14 UTC

[couchdb] branch main updated: Ensure design docs are uploaded individually when replicating with _bulk_get

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/main by this push:
     new a14922fad Ensure design docs are uploaded individually when replicating with _bulk_get
a14922fad is described below

commit a14922fad42575574bb26a24a123e0a1ef4e1621
Author: Nick Vatamaniuc <va...@gmail.com>
AuthorDate: Wed Jan 11 23:51:05 2023 -0500

    Ensure design docs are uploaded individually when replicating with _bulk_get
    
    Previously, when replication jobs used _bulk_get they didn't upload design
    docs individually like they do when not using _bulk_get.
    
    Here were are preserving an already existing behavior which we had in the
    replicator without _bulk_get usage for more than 2 years. It was introduced
    here in #2426. Related to these issues #2415 and #2413.
    
    Add tests to cover both attachments and ddoc cases. meck:num_calls/3 is helpful
    as it allows to nicely assert which API function was called and how many times.
---
 .../src/couch_replicator_worker.erl                | 22 ++++--
 .../test/eunit/couch_replicator_bulk_get_tests.erl | 79 +++++++++++++++++++++-
 2 files changed, 93 insertions(+), 8 deletions(-)

diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl
index d8f872388..46e4a6e94 100644
--- a/src/couch_replicator/src/couch_replicator_worker.erl
+++ b/src/couch_replicator/src/couch_replicator_worker.erl
@@ -297,17 +297,25 @@ queue_fetch_loop(#fetch_st{} = St) ->
         {changes, ChangesManager, Changes, ReportSeq} ->
             % Find missing revisions (POST to _revs_diff)
             {IdRevs, RdSt1} = find_missing(Changes, Target, Parent, RdSt),
-            {Docs, BgSt1} = bulk_get(UseBulkGet, Source, IdRevs, Parent, BgSt),
-            % Documents without attachments can be uploaded right away
-            BatchFun = fun({_, #doc{} = Doc}) ->
-                ok = gen_server:call(Parent, {batch_doc, Doc}, infinity)
+            % Filter out and handle design docs individually
+            DDocFilter = fun
+                ({<<?DESIGN_DOC_PREFIX, _/binary>>, _Rev}, _PAs) -> true;
+                ({_Id, _Rev}, _PAs) -> false
             end,
-            lists:foreach(BatchFun, lists:sort(maps:to_list(Docs))),
-            % Fetch individually if _bulk_get failed or there are attachments
+            DDocIdRevs = maps:filter(DDocFilter, IdRevs),
             FetchFun = fun({Id, Rev}, PAs) ->
                 ok = gen_server:call(Parent, {fetch_doc, {Id, [Rev], PAs}}, infinity)
             end,
-            maps:map(FetchFun, maps:without(maps:keys(Docs), IdRevs)),
+            maps:map(FetchFun, DDocIdRevs),
+            % IdRevs1 is all the docs without design docs. Bulk get those.
+            IdRevs1 = maps:without(maps:keys(DDocIdRevs), IdRevs),
+            {Docs, BgSt1} = bulk_get(UseBulkGet, Source, IdRevs1, Parent, BgSt),
+            BatchFun = fun({_, #doc{} = Doc}) ->
+                ok = gen_server:call(Parent, {batch_doc, Doc}, infinity)
+            end,
+            lists:foreach(BatchFun, lists:sort(maps:to_list(Docs))),
+            % Invidually upload docs with attachments.
+            maps:map(FetchFun, maps:without(maps:keys(Docs), IdRevs1)),
             {ok, Stats} = gen_server:call(Parent, flush, infinity),
             ok = report_seq_done(Cp, ReportSeq, Stats),
             couch_log:debug("Worker reported completion of seq ~p", [ReportSeq]),
diff --git a/src/couch_replicator/test/eunit/couch_replicator_bulk_get_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_bulk_get_tests.erl
index 2ecd0f4ee..f0d9569db 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_bulk_get_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_bulk_get_tests.erl
@@ -26,7 +26,11 @@ bulk_get_test_() ->
             fun couch_replicator_test_helper:test_teardown/1,
             [
                 ?TDEF_FE(use_bulk_get),
+                ?TDEF_FE(use_bulk_get_with_ddocs),
+                ?TDEF_FE(use_bulk_get_with_attachments),
                 ?TDEF_FE(dont_use_bulk_get),
+                ?TDEF_FE(dont_use_bulk_get_ddocs),
+                ?TDEF_FE(dont_use_bulk_get_attachments),
                 ?TDEF_FE(job_enable_overrides_global_disable),
                 ?TDEF_FE(global_disable_works)
             ]
@@ -39,7 +43,33 @@ use_bulk_get({_Ctx, {Source, Target}}) ->
     replicate(Source, Target, true),
     BulkGets = meck:num_calls(couch_replicator_api_wrap, bulk_get, 3),
     JustGets = meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 6),
+    DocUpdates = meck:num_calls(couch_replicator_api_wrap, update_doc, 4),
     ?assertEqual(0, JustGets),
+    ?assertEqual(0, DocUpdates),
+    ?assert(BulkGets >= 1),
+    compare_dbs(Source, Target).
+
+use_bulk_get_with_ddocs({_Ctx, {Source, Target}}) ->
+    populate_db_ddocs(Source, ?DOC_COUNT),
+    meck:new(couch_replicator_api_wrap, [passthrough]),
+    replicate(Source, Target, true),
+    BulkGets = meck:num_calls(couch_replicator_api_wrap, bulk_get, 3),
+    JustGets = meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 6),
+    DocUpdates = meck:num_calls(couch_replicator_api_wrap, update_doc, 4),
+    ?assertEqual(?DOC_COUNT, JustGets),
+    ?assertEqual(?DOC_COUNT, DocUpdates),
+    ?assert(BulkGets >= 1),
+    compare_dbs(Source, Target).
+
+use_bulk_get_with_attachments({_Ctx, {Source, Target}}) ->
+    populate_db_atts(Source, ?DOC_COUNT),
+    meck:new(couch_replicator_api_wrap, [passthrough]),
+    replicate(Source, Target, true),
+    BulkGets = meck:num_calls(couch_replicator_api_wrap, bulk_get, 3),
+    JustGets = meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 6),
+    DocUpdates = meck:num_calls(couch_replicator_api_wrap, update_doc, 4),
+    ?assertEqual(?DOC_COUNT, JustGets),
+    ?assertEqual(?DOC_COUNT, DocUpdates),
     ?assert(BulkGets >= 1),
     compare_dbs(Source, Target).
 
@@ -49,10 +79,36 @@ dont_use_bulk_get({_Ctx, {Source, Target}}) ->
     replicate(Source, Target, false),
     BulkGets = meck:num_calls(couch_replicator_api_wrap, bulk_get, 3),
     JustGets = meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 6),
+    DocUpdates = meck:num_calls(couch_replicator_api_wrap, update_doc, 4),
     ?assertEqual(0, BulkGets),
+    ?assertEqual(0, DocUpdates),
     ?assertEqual(?DOC_COUNT, JustGets),
     compare_dbs(Source, Target).
 
+dont_use_bulk_get_ddocs({_Ctx, {Source, Target}}) ->
+    populate_db_ddocs(Source, ?DOC_COUNT),
+    meck:new(couch_replicator_api_wrap, [passthrough]),
+    replicate(Source, Target, false),
+    BulkGets = meck:num_calls(couch_replicator_api_wrap, bulk_get, 3),
+    JustGets = meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 6),
+    DocUpdates = meck:num_calls(couch_replicator_api_wrap, update_doc, 4),
+    ?assertEqual(0, BulkGets),
+    ?assertEqual(?DOC_COUNT, JustGets),
+    ?assertEqual(?DOC_COUNT, DocUpdates),
+    compare_dbs(Source, Target).
+
+dont_use_bulk_get_attachments({_Ctx, {Source, Target}}) ->
+    populate_db_atts(Source, ?DOC_COUNT),
+    meck:new(couch_replicator_api_wrap, [passthrough]),
+    replicate(Source, Target, false),
+    BulkGets = meck:num_calls(couch_replicator_api_wrap, bulk_get, 3),
+    JustGets = meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 6),
+    DocUpdates = meck:num_calls(couch_replicator_api_wrap, update_doc, 4),
+    ?assertEqual(0, BulkGets),
+    ?assertEqual(?DOC_COUNT, JustGets),
+    ?assertEqual(?DOC_COUNT, DocUpdates),
+    compare_dbs(Source, Target).
+
 job_enable_overrides_global_disable({_Ctx, {Source, Target}}) ->
     populate_db(Source, ?DOC_COUNT),
     Persist = false,
@@ -78,10 +134,31 @@ global_disable_works({_Ctx, {Source, Target}}) ->
     compare_dbs(Source, Target).
 
 populate_db(DbName, DocCount) ->
-    Fun = fun(Id, Acc) -> [#doc{id = integer_to_binary(Id)} | Acc] end,
+    IdFun = fun(Id) -> integer_to_binary(Id) end,
+    Fun = fun(Id, Acc) -> [#doc{id = IdFun(Id)} | Acc] end,
+    Docs = lists:foldl(Fun, [], lists:seq(1, DocCount)),
+    {ok, _} = fabric:update_docs(DbName, Docs, [?ADMIN_CTX]).
+
+populate_db_ddocs(DbName, DocCount) ->
+    IdFun = fun(Id) -> <<"_design/", (integer_to_binary(Id))/binary>> end,
+    Fun = fun(Id, Acc) -> [#doc{id = IdFun(Id)} | Acc] end,
     Docs = lists:foldl(Fun, [], lists:seq(1, DocCount)),
     {ok, _} = fabric:update_docs(DbName, Docs, [?ADMIN_CTX]).
 
+populate_db_atts(DbName, DocCount) ->
+    IdFun = fun(Id) -> integer_to_binary(Id) end,
+    Fun = fun(Id, Acc) -> [#doc{id = IdFun(Id), atts = [att(<<"a">>)]} | Acc] end,
+    Docs = lists:foldl(Fun, [], lists:seq(1, DocCount)),
+    {ok, _} = fabric:update_docs(DbName, Docs, [?ADMIN_CTX]).
+
+att(Name) when is_binary(Name) ->
+    couch_att:new([
+        {name, Name},
+        {att_len, 1},
+        {type, <<"app/binary">>},
+        {data, <<"x">>}
+    ]).
+
 compare_dbs(Source, Target) ->
     couch_replicator_test_helper:cluster_compare_dbs(Source, Target).