You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2022/10/08 07:01:19 UTC
[couchdb] 01/01: wip optimized bulk-get
This is an automated email from the ASF dual-hosted git repository.
vatamane pushed a commit to branch optimized-fabric-open-revs
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 6dcc9115718af7d6e0c519379b8f26f8a68f8ef3
Author: Nick Vatamaniuc <va...@gmail.com>
AuthorDate: Sat Oct 8 02:44:27 2022 -0400
wip optimized bulk-get
---
src/fabric/src/fabric.erl | 17 ++-
src/fabric/src/fabric_open_revs.erl | 224 ++++++++++++++++++++++++++++++++++++
2 files changed, 240 insertions(+), 1 deletion(-)
diff --git a/src/fabric/src/fabric.erl b/src/fabric/src/fabric.erl
index 6d779d584..e61521da0 100644
--- a/src/fabric/src/fabric.erl
+++ b/src/fabric/src/fabric.erl
@@ -38,7 +38,7 @@
% Documents
-export([
open_doc/3,
- open_revs/4,
+ open_revs/3, open_revs/4,
get_doc_info/3,
get_full_doc_info/3,
get_missing_revs/2, get_missing_revs/3,
@@ -272,6 +272,21 @@ open_doc(DbName, Id, Options) ->
{error, {invalid_option, {doc_info, Else}}}
end.
+%% @doc Retrieve a collection of revisions for a batch of docs.
+%%
+%% Each element of `IdRevsOpts0' is `{{Id, Revs}, DocOpts}', where `Revs' is
+%% either a list of revisions or the atom `all', and `DocOpts' is the list of
+%% options that applies to that one document. Every doc id is normalized with
+%% docid/1 before the batch is handed to fabric_open_revs:go/3.
+-spec open_revs(dbname(), [{{docid(), [revision()] | all}, [option()]}], [option()]) ->
+    {ok, [[{ok, #doc{}} | {{not_found, missing}, revision()}]]}
+    | {timeout, any()}
+    | {error, any()}
+    | {error, any(), any()}.
+open_revs(DbName, IdRevsOpts0, Options) ->
+    % lists:map/2 with a strict fun head (rather than a comprehension) so that
+    % a malformed element crashes instead of being silently skipped.
+    NormalizeFun = fun({{Id, Revs}, DocOpts}) ->
+        {{docid(Id), Revs}, DocOpts}
+    end,
+    IdRevsOpts = lists:map(NormalizeFun, IdRevsOpts0),
+    fabric_open_revs:go(dbname(DbName), IdRevsOpts, opts(Options)).
+
%% @doc retrieve a collection of revisions, possible all
-spec open_revs(dbname(), docid(), [revision()] | all, [option()]) ->
{ok, [{ok, #doc{}} | {{not_found, missing}, revision()}]}
diff --git a/src/fabric/src/fabric_open_revs.erl b/src/fabric/src/fabric_open_revs.erl
new file mode 100644
index 000000000..bb7414e92
--- /dev/null
+++ b/src/fabric/src/fabric_open_revs.erl
@@ -0,0 +1,224 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_open_revs).
+
+-export([
+ go/3
+]).
+
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-record(req, {
+ idrevs, % the {{Id, Revs}, DocOpts} argument tuple this request was built from
+ wcnt = 0, % workers spawned for this request that have neither replied nor failed yet
+ rcnt = 0, % number of worker responses folded into this request so far
+ responses = [] % sorted, de-duplicated union of the responses received so far
+}).
+
+-record(st, {
+ r, % number of responses wanted per request (the `r` option, default mem3:quorum/1)
+ args, % request refs in the same order as the caller's argument list
+ reqs, % map of request ref => #req{}
+ workers % map of #shard{} (carrying its worker ref) => list of request refs sent to it
+}).
+
+% Fetch revisions for a batch of documents.
+%
+% IdsRevsOpts is a list of {{Id, Revs}, DocOpts} tuples. One worker is spawned
+% per shard copy, each carrying all the requests that map to that shard, and
+% responses are collected through handle_message/3 until it signals completion.
+%
+% Returns whatever fabric_util:recv/4 hands back once handle_message/3 stops
+% or errors, or {error, timeout} if the workers did not answer in time.
+%
+go(_DbName, [], _Options) ->
+    % Nothing to fetch: no workers would be spawned, so waiting in recv could
+    % only ever end in a timeout. Answer immediately with an empty batch.
+    {ok, []};
+go(DbName, IdsRevsOpts, Options) ->
+    % The `r` option is carried as a string, hence the list/integer round trip
+    DefaultR = integer_to_list(mem3:quorum(DbName)),
+    R = list_to_integer(couch_util:get_value(r, Options, DefaultR)),
+    {ArgRefs, Reqs0} = build_req_map(IdsRevsOpts),
+    ShardMap = build_shard_map(DbName, Reqs0),
+    {Workers, Reqs} = spawn_workers(Reqs0, ShardMap, Options),
+    WShards = maps:keys(Workers),
+    RexiMon = fabric_util:create_monitors(WShards),
+    St = #st{
+        r = R,
+        args = ArgRefs,
+        reqs = Reqs,
+        workers = Workers
+    },
+    try fabric_util:recv(WShards, #shard.ref, fun handle_message/3, St) of
+        {timeout, #st{workers = #{} = Workers1}} ->
+            WShards1 = maps:keys(Workers1),
+            % Transform to a fabric_dict to reuse log_timeout/2
+            DefunctWorkers = fabric_dict:init(WShards1, nil),
+            fabric_util:log_timeout(DefunctWorkers, "open_revs"),
+            {error, timeout};
+        Else ->
+            Else
+    after
+        % Always release the monitors, on success, error and exception alike
+        rexi_monitor:stop(RexiMon)
+    end.
+
+% Message callback handed to fabric_util:recv/4 by go/3.
+%
+% A non-empty list is one worker's batch of results; it is zipped against the
+% request refs that were sent to that worker. A rexi_DOWN message retires every
+% worker running on the dead node. Any other message is treated as the failure
+% of that single worker.
+%
+% Returns {ok, St} to keep receiving, {stop, Results} when finished, or
+% {error, Reason} once some request can no longer be answered.
+%
+handle_message([_ | _] = Resps, Worker, #st{} = St) ->
+    #st{workers = Workers, reqs = Reqs, r = R} = St,
+    case maps:take(Worker, Workers) of
+        {ArgsRefs, Workers1} ->
+            ArgsResps = lists:zip(ArgsRefs, Resps),
+            Reqs1 = lists:foldl(fun responses_fold/2, Reqs, ArgsResps),
+            case not r_met(Reqs1, R) andalso map_size(Workers1) > 0 of
+                true ->
+                    {ok, St#st{workers = Workers1, reqs = Reqs1}};
+                false ->
+                    % Either every request reached R or nobody is left to ask
+                    fabric_ring:stop_workers(maps:keys(Workers1)),
+                    {stop, finalize_all(St#st.args, Reqs1)}
+            end;
+        error ->
+            % Worker was already retired (for example by an earlier rexi_DOWN
+            % for its node); its counters are settled, so ignore the message.
+            {ok, St}
+    end;
+handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _Worker, #st{} = St) ->
+    #st{workers = Workers, reqs = Reqs} = St,
+    FilterFun = fun(#shard{node = N}) -> N =:= NodeRef end,
+    DeadKeys = lists:filter(FilterFun, maps:keys(Workers)),
+    Workers1 = maps:without(DeadKeys, Workers),
+    DeadWorkers = maps:with(DeadKeys, Workers),
+    FoldFun = fun(_, ArgRefs, Acc) -> update_wcnt(Acc, ArgRefs, -1) end,
+    Reqs1 = maps:fold(FoldFun, Reqs, DeadWorkers),
+    Error = {error, {nodedown, <<"progress not possible">>}},
+    handle_error(Error, St#st{workers = Workers1, reqs = Reqs1});
+handle_message(Msg, Worker, #st{} = St) ->
+    handle_worker_error(Msg, Worker, St).
+
+% Strip nested {rexi_EXIT, _} / {error, _} wrappers, then retire the failed
+% worker. Reasons never re-enter handle_message/3, so a reason that happens to
+% be a non-empty list (a string, say) is not mistaken for a batch of results.
+%
+handle_worker_error({rexi_EXIT, Reason}, Worker, #st{} = St) ->
+    handle_worker_error(Reason, Worker, St);
+handle_worker_error({error, Reason}, Worker, #st{} = St) ->
+    handle_worker_error(Reason, Worker, St);
+handle_worker_error(Reason, Worker, #st{} = St) ->
+    #st{workers = Workers, reqs = Reqs} = St,
+    case maps:take(Worker, Workers) of
+        {DeadArgRefs, Workers1} ->
+            Reqs1 = update_wcnt(Reqs, DeadArgRefs, -1),
+            handle_error(Reason, St#st{workers = Workers1, reqs = Reqs1});
+        error ->
+            % Already retired; nothing left to account for
+            {ok, St}
+    end.
+
+% Fold one worker's response for one request into the request map: one more
+% response received, one fewer worker outstanding, and the response merged
+% into the sorted, de-duplicated set accumulated so far.
+%
+responses_fold({ArgRef, Resp}, #{} = Reqs) ->
+    #{ArgRef := #req{rcnt = RCnt, wcnt = WCnt, responses = Seen} = Req0} = Reqs,
+    Merged = lists:umerge(Seen, lists:usort(Resp)),
+    Req1 = Req0#req{rcnt = RCnt + 1, wcnt = WCnt - 1, responses = Merged},
+    Reqs#{ArgRef => Req1}.
+
+% Decide what to do after one or more workers have been retired with an error.
+%
+%  - some request has neither a response nor a live worker: give up, stop the
+%    remaining workers and return {error, Error}
+%  - every request can still be answered and workers remain: keep receiving
+%  - every request already has a response but no workers remain: nothing more
+%    will ever arrive, so return what was collected instead of waiting for the
+%    receive timeout (handle_message/3 does the same once its workers run out)
+%
+handle_error(Error, #st{workers = Workers, reqs = Reqs} = St) ->
+    case {progress_possible(Reqs), map_size(Workers) > 0} of
+        {true, true} ->
+            {ok, St};
+        {true, false} ->
+            {stop, finalize_all(St#st.args, Reqs)};
+        {false, _} ->
+            fabric_ring:stop_workers(maps:keys(Workers)),
+            {error, Error}
+    end.
+
+% Tag every {{Id, Revs}, DocOpts} argument with a fresh reference and return
+% {Refs, #{Ref => #req{}}}, with Refs in the same order as the arguments.
+% DocOpts can hold huge {atts_since, ...} lists, so the rest of the module
+% passes around the compact references instead of the argument tuples.
+%
+build_req_map(IdsRevsOpts) ->
+    lists:foldr(
+        fun(Arg, {Refs, #{} = Map}) ->
+            Ref = make_ref(),
+            {[Ref | Refs], Map#{Ref => #req{idrevs = Arg}}}
+        end,
+        {[], #{}},
+        IdsRevsOpts
+    ).
+
+% Build a #{#shard{} => [ArgRef1, ArgRef2, ...]} map. This structure will be
+% used for launching workers and then matching up response with the original
+% args. Every request ref is listed under each shard copy returned by
+% mem3:shards/2 for its doc id.
+%
+build_shard_map(DbName, #{} = Reqs) ->
+    FoldReqsFun = fun(ArgRef, #req{idrevs = IdRevs}, #{} = WAcc) ->
+        {{DocId, _}, _} = IdRevs,
+        Shards = mem3:shards(DbName, DocId),
+        FoldShardsFun = fun(Shard, #{} = WAccInner) ->
+            UpdateFun = fun(ArgRefs) -> [ArgRef | ArgRefs] end,
+            % maps:update_with/4 stores Init verbatim when the key is absent
+            % (the fun is NOT applied to it), so Init must already contain
+            % this ref or the first request seen for each shard is lost.
+            maps:update_with(Shard, UpdateFun, [ArgRef], WAccInner)
+        end,
+        lists:foldl(FoldShardsFun, WAcc, Shards)
+    end,
+    maps:fold(FoldReqsFun, #{}, Reqs).
+
+% Launch one worker per shard in ShardMap. Returns {Workers, Reqs1} where
+% Workers maps each worker #shard{} (now carrying its rexi ref) to the request
+% refs it was given, and Reqs1 has wcnt bumped for every one of those requests.
+%
+spawn_workers(#{} = Reqs, #{} = ShardMap, Options) ->
+    SpawnFun = fun(#shard{} = Shard, ArgRefs, {Workers0, Reqs0}) ->
+        Worker = rexi_cast(Shard, ArgRefs, Reqs0, Options),
+        {Workers0#{Worker => ArgRefs}, update_wcnt(Reqs0, ArgRefs, 1)}
+    end,
+    maps:fold(SpawnFun, {#{}, Reqs}, ShardMap).
+
+% Cast a fabric_rpc:open_revs call to the shard's node and return the #shard{}
+% with its ref field set to the worker reference. The argument tuples sent to
+% the worker are looked up in Reqs by ArgRef, in ArgRefs order.
+%
+rexi_cast(#shard{name = Name, node = Node} = Shard, ArgRefs, #{} = Reqs, Options) ->
+    LookupFun = fun(Ref) when is_reference(Ref) ->
+        #{Ref := #req{idrevs = Arg}} = Reqs,
+        Arg
+    end,
+    MFA = {fabric_rpc, open_revs, [Name, lists:map(LookupFun, ArgRefs), Options]},
+    Shard#shard{ref = rexi:cast(Node, MFA)}.
+
+% Add Val to the worker count of every request named in ArgRefs. Val may be
+% negative, which is how worker counts are decremented. Every ref must already
+% be present in Reqs.
+%
+update_wcnt(#{} = Reqs, ArgRefs, Val) when is_integer(Val) ->
+    BumpFun = fun(Ref, #{} = Acc) ->
+        #{Ref := Req} = Acc,
+        #req{wcnt = Current} = Req,
+        Acc#{Ref => Req#req{wcnt = Current + Val}}
+    end,
+    lists:foldl(BumpFun, Reqs, ArgRefs).
+
+% True when every request either has at least one response already or still
+% has at least one worker outstanding.
+%
+progress_possible(#{} = Reqs) ->
+    HasHope = fun(#req{wcnt = WCnt, rcnt = RCnt}) -> WCnt + RCnt > 0 end,
+    lists:all(HasHope, maps:values(Reqs)).
+
+% True when every request has received at least R responses (vacuously true
+% for an empty request map).
+%
+r_met(#{} = Reqs, R) ->
+    Satisfied = fun(#req{rcnt = RCnt}) -> RCnt >= R end,
+    lists:all(Satisfied, maps:values(Reqs)).
+
+% Produce the final reply for each request, in ArgRefs order.
+%
+finalize_all(ArgRefs, #{} = Reqs) ->
+    [
+        begin
+            #{ArgRef := #req{responses = Resps}} = Reqs,
+            finalize_req(Resps)
+        end
+     || ArgRef <- ArgRefs
+    ].
+
+% Collapse the responses gathered for one request: turn each into a revision
+% path, merge the paths into a single revision tree, and reply with one entry
+% per leaf of that tree.
+%
+finalize_req(DocRevs) ->
+    Paths = [rev_to_path(DocRev) || DocRev <- DocRevs],
+    Merged = couch_key_tree:multi_merge([], Paths),
+    % Open question: latest / all can return [] but exact ones get
+    % not_found,missing?
+    [path_to_reply(Leaf) || Leaf <- couch_key_tree:get_all_leafs(Merged)].
+
+% Convert a {Value, Path} leaf of the merged tree back into a reply entry:
+% a #doc{} value becomes {ok, Doc}, a ?REV_MISSING marker becomes
+% {{not_found, missing}, {Pos, Rev}}.
+%
+path_to_reply({#doc{} = Found, _Path}) ->
+    {ok, Found};
+path_to_reply({?REV_MISSING, {Start, [RevId]}}) ->
+    {{not_found, missing}, {Start, RevId}}.
+
+% Convert one reply entry into a revision path suitable for merging: a found
+% document goes through couch_doc:to_path/1, a missing revision becomes a
+% single-node path whose value is the ?REV_MISSING marker.
+%
+rev_to_path({{not_found, missing}, {Start, RevId}}) ->
+    {Start, {RevId, ?REV_MISSING, []}};
+rev_to_path({ok, #doc{} = Found}) ->
+    couch_doc:to_path(Found).