You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2022/10/08 07:01:19 UTC

[couchdb] 01/01: wip optimized bulk-get

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch optimized-fabric-open-revs
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 6dcc9115718af7d6e0c519379b8f26f8a68f8ef3
Author: Nick Vatamaniuc <va...@gmail.com>
AuthorDate: Sat Oct 8 02:44:27 2022 -0400

    wip optimized bulk-get
---
 src/fabric/src/fabric.erl           |  17 ++-
 src/fabric/src/fabric_open_revs.erl | 224 ++++++++++++++++++++++++++++++++++++
 2 files changed, 240 insertions(+), 1 deletion(-)

diff --git a/src/fabric/src/fabric.erl b/src/fabric/src/fabric.erl
index 6d779d584..e61521da0 100644
--- a/src/fabric/src/fabric.erl
+++ b/src/fabric/src/fabric.erl
@@ -38,7 +38,7 @@
 % Documents
 -export([
     open_doc/3,
-    open_revs/4,
+    open_revs/3, open_revs/4,
     get_doc_info/3,
     get_full_doc_info/3,
     get_missing_revs/2, get_missing_revs/3,
@@ -272,6 +272,21 @@ open_doc(DbName, Id, Options) ->
             {error, {invalid_option, {doc_info, Else}}}
     end.
 
+%% @doc Retrieve a collection of revisions for a batch of docs.
+%% Each batch entry is `{{DocId, Revs}, DocOpts}', where `Revs' is either a
+%% list of revisions or `all', and `DocOpts' is the list of per-document
+%% options. Document ids, the database name and the request options are
+%% normalized before the request is handed to `fabric_open_revs:go/3'.
+-spec open_revs(dbname(), [{{docid(), [revision()] | all}, [option()]}], [option()]) ->
+    {ok, [[{ok, #doc{}} | {{not_found, missing}, revision()}]]}
+    | {timeout, any()}
+    | {error, any()}
+    | {error, any(), any()}.
+open_revs(DbName, IdRevsOpts0, Options) ->
+    % Only the doc id is normalized; Revs and DocOpts are passed through as is
+    IdRevsOpts = lists:map(
+        fun({{Id, Revs}, DocOpts}) ->
+            {{docid(Id), Revs}, DocOpts}
+        end,
+        IdRevsOpts0
+    ),
+    fabric_open_revs:go(dbname(DbName), IdRevsOpts, opts(Options)).
+
 %% @doc retrieve a collection of revisions, possible all
 -spec open_revs(dbname(), docid(), [revision()] | all, [option()]) ->
     {ok, [{ok, #doc{}} | {{not_found, missing}, revision()}]}
diff --git a/src/fabric/src/fabric_open_revs.erl b/src/fabric/src/fabric_open_revs.erl
new file mode 100644
index 000000000..bb7414e92
--- /dev/null
+++ b/src/fabric/src/fabric_open_revs.erl
@@ -0,0 +1,224 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_open_revs).
+
+-export([
+    go/3
+]).
+
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-record(req, { % bookkeeping for one {{DocId, Revs}, DocOpts} argument of the batch
+    idrevs, % the original {{DocId, Revs}, DocOpts} argument tuple
+    wcnt = 0, % worker count: bumped per spawned worker, dropped on reply or failure
+    rcnt = 0, % number of worker replies received so far
+    responses = [] % sorted, de-duplicated merge of the replies received so far
+}).
+
+-record(st, { % coordinator state threaded through handle_message/3
+    r, % read quorum: replies wanted per request before stopping early
+    args, % request references (ArgRefs) in original argument order
+    reqs, % #{ArgRef => #req{}}
+    workers % #{#shard{} (with worker ref set) => [ArgRef]} of pending workers
+}).
+
+% Coordinator entry point: open revisions for a batch of
+% {{DocId, Revs}, DocOpts} requests, spawning one worker per shard copy.
+%
+% Returns the result of fabric_util:recv/4 once handle_message/3 stops
+% (presumably {ok, Results} with one result list per request, in argument
+% order - confirm against rexi_utils:recv), {error, Reason} when progress is
+% not possible, or {error, timeout} when the workers did not answer in time.
+%
+go(_DbName, [], _Options) ->
+    % An empty batch spawns no workers, so nothing would ever stop the
+    % receive loop below and the call would only return after timing out.
+    {ok, []};
+go(DbName, IdsRevsOpts, Options) ->
+    % The r option is read as a string, hence the list <-> integer conversions
+    DefaultR = integer_to_list(mem3:quorum(DbName)),
+    R = list_to_integer(couch_util:get_value(r, Options, DefaultR)),
+    {ArgRefs, Reqs0} = build_req_map(IdsRevsOpts),
+    ShardMap = build_shard_map(DbName, Reqs0),
+    {Workers, Reqs} = spawn_workers(Reqs0, ShardMap, Options),
+    WShards = maps:keys(Workers),
+    RexiMon = fabric_util:create_monitors(WShards),
+    St = #st{
+        r = R,
+        args = ArgRefs,
+        reqs = Reqs,
+        workers = Workers
+    },
+    try fabric_util:recv(WShards, #shard.ref, fun handle_message/3, St) of
+        {timeout, #st{workers = #{} = Workers1}} ->
+            WShards1 = maps:keys(Workers1),
+            % Transform to a fabric_dict to reuse log_timeout/2
+            DefunctWorkers = fabric_dict:init(WShards1, nil),
+            fabric_util:log_timeout(DefunctWorkers, "open_revs"),
+            {error, timeout};
+        Else ->
+            Else
+    after
+        % Always release the node monitors, on success and on failure alike
+        rexi_monitor:stop(RexiMon)
+    end.
+
+% Message callback passed to fabric_util:recv/4 by go/3.
+%
+%  - A non-empty list is a worker reply. It is zipped with the ArgRefs that
+%    were sent to that worker, so replies are assumed to come back in the same
+%    order as the arguments (TODO confirm against fabric_rpc:open_revs).
+%  - rexi_DOWN drops every pending worker located on the dead node.
+%  - rexi_EXIT and {error, _} are unwrapped and treated as a failure of the
+%    single worker that sent them.
+%
+% Returns {ok, St} to keep receiving, {stop, Results} when done, or
+% {error, Reason} when some request can no longer be answered.
+%
+handle_message([_ | _] = Resps, Worker, #st{} = St) ->
+    #st{workers = Workers, reqs = Reqs, r = R} = St,
+    {ArgsRefs, Workers1} = maps:take(Worker, Workers),
+    ArgsResps = lists:zip(ArgsRefs, Resps),
+    Reqs1 = lists:foldl(fun responses_fold/2, Reqs, ArgsResps),
+    % Keep going only while quorum is not met and workers are still pending
+    case not r_met(Reqs1, R) andalso map_size(Workers1) > 0 of
+        true ->
+            {ok, St#st{workers = Workers1, reqs = Reqs1}};
+        false ->
+            fabric_ring:stop_workers(maps:keys(Workers1)),
+            {stop, finalize_all(St#st.args, Reqs1)}
+    end;
+handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _Worker, #st{} = St) ->
+    #st{workers = Workers, reqs = Reqs} = St,
+    FilterFun = fun(#shard{node = N}) -> N =:= NodeRef end,
+    DeadKeys = lists:filter(FilterFun, maps:keys(Workers)),
+    Workers1 = maps:without(DeadKeys, Workers),
+    DeadWorkers = maps:with(DeadKeys, Workers),
+    FoldFun = fun(_, ArgRefs, Acc) -> update_wcnt(Acc, ArgRefs, -1) end,
+    Reqs1 = maps:fold(FoldFun, Reqs, DeadWorkers),
+    % handle_error/2 wraps its first argument in {error, _} itself, so pass
+    % the bare reason here to avoid returning {error, {error, _}}.
+    Error = {nodedown, <<"progress not possible">>},
+    handle_error(Error, St#st{workers = Workers1, reqs = Reqs1});
+handle_message({rexi_EXIT, Reason}, Worker, #st{} = St) ->
+    handle_message(Reason, Worker, St);
+handle_message({error, Reason}, Worker, #st{} = St) ->
+    handle_message(Reason, Worker, St);
+handle_message(Reason, Worker, #st{} = St) ->
+    #st{workers = Workers, reqs = Reqs} = St,
+    {DeadArgRefs, Workers1} = maps:take(Worker, Workers),
+    Reqs1 = update_wcnt(Reqs, DeadArgRefs, -1),
+    handle_error(Reason, St#st{workers = Workers1, reqs = Reqs1}).
+
+% Fold callback: record one worker reply for the request tagged by ArgRef.
+% The reply is sorted/de-duplicated and merged into the replies collected so
+% far; the received count goes up by one and the worker count down by one.
+%
+responses_fold({ArgRef, Resp}, #{} = Reqs) ->
+    #{ArgRef := #req{rcnt = Received, wcnt = Pending, responses = Merged0} = Req0} = Reqs,
+    Merged = lists:umerge(Merged0, lists:usort(Resp)),
+    Req = Req0#req{rcnt = Received + 1, wcnt = Pending - 1, responses = Merged},
+    Reqs#{ArgRef => Req}.
+
+% Decide how to continue after a worker failure. St must already have the
+% failed workers removed and the worker counts adjusted.
+%
+%  - Some request has neither pending workers nor replies: stop the remaining
+%    workers and return {error, Error}.
+%  - Every request can still be answered but no workers are left: every
+%    request already has at least one reply, so finalize with what was
+%    collected. Returning {ok, St} here would leave the receive loop waiting
+%    for messages that can never arrive, until it times out.
+%  - Otherwise keep waiting for the remaining workers.
+%
+handle_error(Error, #st{workers = Workers, reqs = Reqs} = St) ->
+    case progress_possible(Reqs) of
+        true when map_size(Workers) =:= 0 ->
+            {stop, finalize_all(St#st.args, Reqs)};
+        true ->
+            {ok, St};
+        false ->
+            fabric_ring:stop_workers(maps:keys(Workers)),
+            {error, Error}
+    end.
+
+% Tag every {{Id, Revs}, DocOpts} argument with a fresh reference. Returns
+% {ArgRefs, ReqMap}, where ArgRefs lists the references in the original
+% argument order and ReqMap is #{ArgRef => #req{idrevs = Arg}}. DocOpts can
+% hold huge {atts_since, ...} lists, so the rest of the module passes around
+% the compact references instead of the arguments themselves.
+%
+build_req_map(IdsRevsOpts) ->
+    lists:mapfoldr(fun tag_req/2, #{}, IdsRevsOpts).
+
+% mapfoldr callback: emit a new reference and register its request record
+%
+tag_req(IdRevsOpts, #{} = ReqMap) ->
+    ArgRef = make_ref(),
+    {ArgRef, ReqMap#{ArgRef => #req{idrevs = IdRevsOpts}}}.
+
+% Build a #{#shard{} => [ArgRef1, ArgRef2, ...]} map. This structure will be
+% used for launching workers and then matching up response with the original
+% args. Every request is added to each shard copy returned by mem3:shards/2
+% for its doc id.
+%
+build_shard_map(DbName, #{} = Reqs) ->
+    FoldReqsFun = fun(ArgRef, #req{idrevs = IdRevs}, #{} = WAcc) ->
+        {{DocId, _}, _} = IdRevs,
+        Shards = mem3:shards(DbName, DocId),
+        FoldShardsFun = fun(Shard, #{} = WAccInner) ->
+            UpdateFun = fun(ArgRefs) -> [ArgRef | ArgRefs] end,
+            % maps:update_with/4 stores the init value as is when the key is
+            % absent (the fun is not applied), so the init value must already
+            % contain this ArgRef or the first request of every shard is lost.
+            maps:update_with(Shard, UpdateFun, [ArgRef], WAccInner)
+        end,
+        lists:foldl(FoldShardsFun, WAcc, Shards)
+    end,
+    maps:fold(FoldReqsFun, #{}, Reqs).
+
+% Launch one worker per shard in ShardMap. Returns {Workers, Reqs}, where
+% Workers maps each #shard{} (carrying its worker ref) to the ArgRefs it was
+% sent, and Reqs has the worker count bumped for each of those ArgRefs.
+%
+spawn_workers(#{} = Reqs, #{} = ShardMap, Options) ->
+    SpawnFun = fun(#shard{} = Shard, ArgRefs, {Workers, ReqsAcc}) ->
+        Worker = rexi_cast(Shard, ArgRefs, ReqsAcc, Options),
+        {Workers#{Worker => ArgRefs}, update_wcnt(ReqsAcc, ArgRefs, 1)}
+    end,
+    maps:fold(SpawnFun, {#{}, Reqs}, ShardMap).
+
+% Cast a fabric_rpc:open_revs job to the shard's node and return the #shard{}
+% with its ref field set to the worker reference. The actual arguments are
+% looked up in Reqs by ArgRef, in the order given by ArgRefs.
+%
+rexi_cast(#shard{name = Name, node = Node} = Shard, ArgRefs, #{} = Reqs, Options) ->
+    ToArg = fun(Ref) when is_reference(Ref) ->
+        #{Ref := #req{idrevs = Arg}} = Reqs,
+        Arg
+    end,
+    MFA = {fabric_rpc, open_revs, [Name, lists:map(ToArg, ArgRefs), Options]},
+    Shard#shard{ref = rexi:cast(Node, MFA)}.
+
+% Add Val to the worker count of every #req{} referenced by ArgRefs. Val is a
+% signed integer: positive when workers are spawned, negative when they are
+% lost. Every reference must already be present in Reqs.
+%
+update_wcnt(#{} = Reqs, ArgRefs, Val) when is_integer(Val) ->
+    Bump = fun(ArgRef, #{} = ReqsAcc) ->
+        #{ArgRef := #req{wcnt = Count} = Req} = ReqsAcc,
+        ReqsAcc#{ArgRef => Req#req{wcnt = Count + Val}}
+    end,
+    lists:foldl(Bump, Reqs, ArgRefs).
+
+% True when every request still has a pending worker or at least one reply,
+% i.e. no request is left with a worker count plus reply count of zero.
+%
+progress_possible(#{} = Reqs) ->
+    IsStuck = fun(_, #req{wcnt = WCnt, rcnt = RCnt}) -> WCnt + RCnt =< 0 end,
+    map_size(maps:filter(IsStuck, Reqs)) =:= 0.
+
+% True when every request has received at least R replies. An empty request
+% map trivially meets the quorum.
+%
+r_met(#{} = Reqs, R) ->
+    Counts = lists:map(fun(#req{rcnt = RCnt}) -> RCnt end, maps:values(Reqs)),
+    % Seeding with R caps the minimum at R, so it equals R only when no
+    % request is below the quorum
+    lists:min([R | Counts]) >= R.
+
+% Produce the final reply list: one finalized result per ArgRef, in the order
+% of ArgRefs (the original argument order when called with #st.args).
+%
+finalize_all(ArgRefs, #{} = Reqs) ->
+    [
+        begin
+            #{Ref := #req{responses = Resps}} = Reqs,
+            finalize_req(Resps)
+        end
+     || Ref <- ArgRefs
+    ].
+
+% Collapse the replies collected for one request: turn each reply into a
+% revision path, merge the paths into a single revision tree and convert the
+% leafs of that tree back into replies.
+%
+finalize_req(DocRevs) ->
+    Paths = [rev_to_path(DocRev) || DocRev <- DocRevs],
+    Leafs = couch_key_tree:get_all_leafs(couch_key_tree:multi_merge([], Paths)),
+    % latest / all can return [] but exact ones get not_found,missing?
+    [path_to_reply(Leaf) || Leaf <- Leafs].
+
+path_to_reply({?REV_MISSING, {Pos, [Rev]}}) -> % leaf carrying the missing-revision marker
+    {{not_found, missing}, {Pos, Rev}};
+path_to_reply({#doc{} = Doc, _}) -> % leaf carrying a document: reply with the doc itself
+    {ok, Doc}.
+
+rev_to_path({ok, #doc{} = Doc}) -> % found revision: path is built by couch_doc:to_path/1
+    couch_doc:to_path(Doc);
+rev_to_path({{not_found, missing}, {Pos, Rev}}) -> % missing revision: single childless node
+    {Pos, {Rev, ?REV_MISSING, []}}.