You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2019/07/23 20:13:22 UTC
[couchdb] 13/25: Move fdb logic to couch_views_fdb
This is an automated email from the ASF dual-hosted git repository.
davisp pushed a commit to branch prototype/views
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 5d6068e004f1976191c9327a97d9b33c452e64f3
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Thu Jul 18 13:58:57 2019 -0500
Move fdb logic to couch_views_fdb
---
src/couch_views/src/couch_views.erl | 45 +-----
src/couch_views/src/couch_views_fdb.erl | 147 +++++++++++++++--
src/couch_views/src/couch_views_reader.erl | 250 ++++++++++++-----------------
3 files changed, 245 insertions(+), 197 deletions(-)
diff --git a/src/couch_views/src/couch_views.erl b/src/couch_views/src/couch_views.erl
index 65af1bf..7deb54d 100644
--- a/src/couch_views/src/couch_views.erl
+++ b/src/couch_views/src/couch_views.erl
@@ -43,23 +43,25 @@ query(Db, DDoc, ViewName, Callback, Acc0, QueryArgs0) ->
false -> ok
end,
- Args = mrargs_to_map(QueryArgs2),
- ok = maybe_update_view(Db, Mrst, Args),
+ ok = maybe_update_view(Db, Mrst, QueryArgs2),
try
- couch_views_reader:read(Db, Mrst, ViewName, Callback, Acc0, Args)
+ couch_views_reader:read(Db, Mrst, ViewName, Callback, Acc0, QueryArgs2)
after
- UpdateAfter = maps:get(update, Args) == lazy,
+ UpdateAfter = QueryArgs2#mrargs.update == lazy,
if UpdateAfter == false -> ok; true ->
couch_views_jobs:build_view_async(Db, Mrst)
end
end.
-maybe_update_view(_Db, _Mrst, #{update := false}) ->
+% update=false and update=lazy return without updating the index here;
+% for lazy, query/6 schedules an async build in its `after` clause.
+% Any other update value falls through to the final clause.
+maybe_update_view(_Db, _Mrst, #mrargs{update = false}) ->
ok;
-maybe_update_view(_Db, _Mrst, #{update := lazy}) ->
+maybe_update_view(_Db, _Mrst, #mrargs{update = lazy}) ->
ok;
maybe_update_view(Db, Mrst, _Args) ->
@@ -83,39 +85,3 @@ is_reduce_view({Reduce, _, _}) ->
Reduce =:= red.
-mrargs_to_map(#mrargs{} = Args) ->
- process_args(#{
- start_key => Args#mrargs.start_key,
- start_key_docid => Args#mrargs.start_key_docid,
- end_key => Args#mrargs.end_key,
- end_key_docid => Args#mrargs.end_key_docid,
- keys => Args#mrargs.keys,
- direction => Args#mrargs.direction,
- limit => Args#mrargs.limit,
- skip => Args#mrargs.skip,
- update => Args#mrargs.update,
- multi_get => Args#mrargs.multi_get,
- inclusive_end => Args#mrargs.inclusive_end,
- include_docs => Args#mrargs.include_docs,
- doc_options => Args#mrargs.doc_options,
- update_seq => Args#mrargs.update_seq,
- conflicts => Args#mrargs.conflicts,
- sorted => Args#mrargs.sorted
- }).
-
-
-process_args(#{} = Args) ->
- Args1 = remove_ununsed_values(Args),
- Defaults = #{
- direction => fwd,
- inclusive_end => true,
- update => true,
- skip => 0,
- limit => ?MAX_VIEW_LIMIT
- },
-
- maps:merge(Defaults, Args1).
-
-
-remove_ununsed_values(Args) ->
- maps:filter(fun (_, V) -> V /= undefined end, Args).
diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl
index f47f1b1..57ed5f1 100644
--- a/src/couch_views/src/couch_views_fdb.erl
+++ b/src/couch_views/src/couch_views_fdb.erl
@@ -16,7 +16,9 @@
get_update_seq/2,
set_update_seq/3,
- write_rows/4
+ fold_map_idx/6,
+
+ write_doc/4
]).
@@ -54,18 +56,57 @@ set_view_seq(TxDb, Sig, Seq) ->
ok = erlfdb:set(Tx, SeqKey, Seq).
+% Fold over the map index rows of view ViewId, calling
+% Callback(DocId, Key, Value, Acc) once per row; its return value becomes
+% the next Acc. Options are passed through to the range fold, and `dir`
+% (fwd | rev, default fwd) selects how the two KVs of a row are paired.
+% Returns the final Acc.
+fold_map_idx(TxDb, Sig, ViewId, Options, Callback, Acc0) ->
+ #{
+ db_prefix := DbPrefix
+ } = TxDb,
+
+ MapIdxPrefix = map_idx_prefix(DbPrefix, Sig, ViewId),
+
+ % Each map row is written as two KVs under the same
+ % {{SortKey, DocId}, DupeId} prefix, one tagged ?VIEW_ROW_KEY (the
+ % original key) and one tagged ?VIEW_ROW_VAL (the value). The fold
+ % functions pair them up, so the state tracks which half comes next.
+ FoldAcc = #{
+ prefix => MapIdxPrefix,
+ sort_key => undefined,
+ docid => undefined,
+ dupe_id => undefined,
+ callback => Callback,
+ acc => Acc0
+ },
+
+ {Fun, Acc} = case fabric2_util:get_value(dir, Options, fwd) of
+ fwd ->
+ {fun fold_fwd/2, FoldAcc#{next => key, key => undefined}};
+ rev ->
+ {fun fold_rev/2, FoldAcc#{next => value, value => undefined}}
+ end,
+
+ % Hand back only the caller's accumulator rather than the internal
+ % fold state (assumes fold_range returns the final fold accumulator).
+ #{
+ acc := Acc1
+ } = fabric2_db:fold_range(TxDb, MapIdxPrefix, Fun, Acc, Options),
+ Acc1.
+
+
write_doc(TxDb, Sig, #{deleted := true} = Doc, ViewIds) ->
#{
id := DocId
} = Doc,
- ViewKeys = get_view_keys(TxDb, Sig, DocId),
+ ExistingViewKeys = get_view_keys(TxDb, Sig, DocId),
clear_id_idx(TxDb, Sig, DocId),
lists:foreach(fun({ViewId, ViewKeys}) ->
clear_map_idx(TxDb, Sig, ViewId, ViewKeys)
- end, ViewKeys).
-
+ end, ExistingViewKeys);
write_doc(TxDb, Sig, Doc, ViewIds) ->
#{
@@ -79,7 +120,7 @@ write_doc(TxDb, Sig, Doc, ViewIds) ->
ExistingViewKeys = get_view_keys(TxDb, Sig, DocId),
- ok = clear_id_idx(TxDb, Sig, DocId),
+ clear_id_idx(TxDb, Sig, DocId),
lists:foreach(fun({ViewId, NewRows}) ->
ExistingKeys = fabric2_util:get_value(ViewId, ExistingViewKeys, []),
@@ -88,6 +129,105 @@ write_doc(TxDb, Sig, Doc, ViewIds) ->
end, lists:zip(ViewIds, Results)).
+% Forward scan: the ?VIEW_ROW_KEY half of a row arrives first. Stash the
+% decoded original key and the row identity until the matching
+% ?VIEW_ROW_VAL half arrives.
+fold_fwd({RowKey, EncodedOriginalKey}, #{next := key} = Acc) ->
+ #{
+ prefix := Prefix
+ } = Acc,
+
+ {{SortKey, DocId}, DupeId, ?VIEW_ROW_KEY} =
+ erlfdb_tuple:unpack(RowKey, Prefix),
+ Acc#{
+ next := val,
+ key := couch_views_encoding:decode(EncodedOriginalKey),
+ sort_key := SortKey,
+ docid := DocId,
+ dupe_id := DupeId
+ };
+
+fold_fwd({RowKey, EncodedValue}, #{next := val} = Acc) ->
+ #{
+ prefix := Prefix,
+ key := Key,
+ sort_key := SortKey,
+ docid := DocId,
+ dupe_id := DupeId,
+ callback := UserCallback,
+ acc := UserAcc0
+ } = Acc,
+
+ % We're asserting here that this row is paired
+ % correctly with the previous row by relying on
+ % a badmatch if any of these values don't match.
+ {{SortKey, DocId}, DupeId, ?VIEW_ROW_VAL} =
+ erlfdb_tuple:unpack(RowKey, Prefix),
+
+ Value = couch_views_encoding:decode(EncodedValue),
+ UserAcc1 = UserCallback(DocId, Key, Value, UserAcc0),
+
+ % Update the existing state (rather than building a new map) so
+ % prefix and callback are still present for the next row.
+ Acc#{
+ next := key,
+ key := undefined,
+ sort_key := undefined,
+ docid := undefined,
+ dupe_id := undefined,
+ acc := UserAcc1
+ }.
+
+
+% Reverse scan: the ?VIEW_ROW_VAL half of a row arrives first. Stash the
+% decoded value and the row identity until the matching ?VIEW_ROW_KEY
+% half arrives.
+fold_rev({RowKey, EncodedValue}, #{next := value} = Acc) ->
+ #{
+ prefix := Prefix
+ } = Acc,
+
+ {{SortKey, DocId}, DupeId, ?VIEW_ROW_VAL} =
+ erlfdb_tuple:unpack(RowKey, Prefix),
+ Acc#{
+ next := key,
+ value := couch_views_encoding:decode(EncodedValue),
+ sort_key := SortKey,
+ docid := DocId,
+ dupe_id := DupeId
+ };
+
+fold_rev({RowKey, EncodedOriginalKey}, #{next := key} = Acc) ->
+ #{
+ prefix := Prefix,
+ value := Value,
+ sort_key := SortKey,
+ docid := DocId,
+ dupe_id := DupeId,
+ callback := UserCallback,
+ acc := UserAcc0
+ } = Acc,
+
+ % We're asserting here that this row is paired
+ % correctly with the previous row by relying on
+ % a badmatch if any of these values don't match.
+ {{SortKey, DocId}, DupeId, ?VIEW_ROW_KEY} =
+ erlfdb_tuple:unpack(RowKey, Prefix),
+
+ Key = couch_views_encoding:decode(EncodedOriginalKey),
+ UserAcc1 = UserCallback(DocId, Key, Value, UserAcc0),
+
+ % Update the existing state (rather than building a new map) so
+ % prefix and callback are still present for the next row.
+ Acc#{
+ next := value,
+ value := undefined,
+ sort_key := undefined,
+ docid := undefined,
+ dupe_id := undefined,
+ acc := UserAcc1
+ }.
+
clear_id_idx(TxDb, Sig, DocId) ->
#{
tx := Tx,
@@ -141,10 +281,10 @@ update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows) ->
MapIdxPrefix = map_idx_prefix(DbPrefix, Sig, ViewId),
lists:foreach(fun({DupeId, Key1, Key2, Val}) ->
- KeyKey = map_idx_key(MapIdxPrefix, Key1, DocId, DupeId, ?VIEW_ROW_KEY),
- ValKey = map_idx_key(MapIdxPrefix, Key1, DocId, DupeId, ?VIEW_ROW_VAL),
- ok = erlfdn:store(Tx, KeyKey, Key2),
- ok = erlfdb:store(Tx, ValKey, Val)
+ KK = map_idx_key(MapIdxPrefix, {Key1, DocId}, DupeId, ?VIEW_ROW_KEY),
+ VK = map_idx_key(MapIdxPrefix, {Key1, DocId}, DupeId, ?VIEW_ROW_VAL),
+ ok = erlfdb:set(Tx, KK, Key2),
+ ok = erlfdb:set(Tx, VK, Val)
end, KVsToAdd).
diff --git a/src/couch_views/src/couch_views_reader.erl b/src/couch_views/src/couch_views_reader.erl
index f4e768a..56b23f2 100644
--- a/src/couch_views/src/couch_views_reader.erl
+++ b/src/couch_views/src/couch_views_reader.erl
@@ -22,174 +22,146 @@
-include_lib("fabric/src/fabric2.hrl").
-read(Db, DDoc, ViewName, Callback, Acc0, Args) ->
- #{name := DbName} = Db,
-
- {ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc),
+% Stream the rows of view ViewName through UserCallback: first
+% {meta, _}, then {row, Row} for each row, then complete. Every call
+% must return {ok, Acc} or {stop, Acc}. Returns {ok, FinalAcc}.
+read(Db, Mrst, ViewName, UserCallback, UserAcc0, Args) ->
#mrst{
sig = Sig,
views = Views
} = Mrst,
- IdxName = get_idx_name(ViewName, Views),
- State0 = #{
- acc => Acc0,
- skip => maps:get(skip, Args, 0),
- include_docs => maps:get(include_docs, Args, false),
- db => Db
- },
+ ViewId = get_view_id(ViewName, Views),
+ Opts = mrargs_to_fdb_options(Args),
+ Fun = fun handle_row/4,
+
+ try
+ % Need to add total_rows support
+ Meta = {meta, [{total_rows, null}, {offset, null}]},
+ UserAcc1 = maybe_stop(UserCallback(Meta, UserAcc0)),
+
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ Acc0 = #{
+ db => TxDb,
+ skip => Args#mrargs.skip,
+ mrargs => Args,
+ callback => UserCallback,
+ acc => UserAcc1
+ },
+
+ Acc1 = couch_views_fdb:fold_map_idx(
+ TxDb,
+ Sig,
+ ViewId,
+ Opts,
+ Fun,
+ Acc0
+ ),
+
+ #{
+ acc := UserAcc2
+ } = Acc1,
+
+ {ok, maybe_stop(UserCallback(complete, UserAcc2))}
+ end)
+ catch throw:{done, Out} ->
+ % maybe_stop/1 throws {done, Acc} when a callback returns {stop, Acc}.
+ {ok, Out}
+ end.
- DefaultOpts = [{streaming_mode, want_all}],
- {Start, End, QueryOpts} = convert_args_to_fdb(Db, Sig, IdxName, Args,
- DefaultOpts),
- Opts = QueryOpts ++ DefaultOpts,
- fabric2_fdb:transactional(Db, fun(TxDb) ->
- Future = couch_views_fdb:get_map_range(TxDb, Start, End, Opts),
+% Fold callback for couch_views_fdb:fold_map_idx/6: drop the first
+% `skip` rows, then pass each remaining row to the user callback.
+handle_row(_DocId, _Key, _Value, #{skip := Skip} = Acc) when Skip > 0 ->
+ Acc#{skip := Skip - 1};
- UnPack = get_unpack_fun(TxDb, Opts, Callback),
- State1 = lists:foldl(UnPack, State0, erlfdb:wait(Future)),
+handle_row(DocId, Key, Value, Acc) ->
+ #{
+ db := TxDb,
+ mrargs := Args,
+ callback := UserCallback,
+ acc := UserAcc0
+ } = Acc,
+
+ BaseRow = [
+ {id, DocId},
+ {key, Key},
+ {value, Value}
+ ],
+
+ Row = BaseRow ++ case Args#mrargs.include_docs of
+ false ->
+ [];
+ true ->
+ DocOpts0 = Args#mrargs.doc_options,
+ % Treat anything other than true (e.g. an unset field) as false.
+ DocOpts1 = DocOpts0 ++ case Args#mrargs.conflicts of
+ true -> [conflicts];
+ _ -> []
+ end,
+ DocObj = case fabric2_db:open_doc(TxDb, DocId, DocOpts1) of
+ {ok, Doc} -> couch_doc:to_json_obj(Doc, DocOpts1);
+ {not_found, _} -> null
+ end,
+ [{doc, DocObj}]
+ end,
- #{acc := Acc1} = State1,
- Callback(complete, Acc1)
- end).
+ UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)),
+ Acc#{acc := UserAcc1}.
-get_idx_name(ViewName, Views) ->
- {value, View} = lists:search(fun (View) ->
+get_view_id(ViewName, Views) ->
+ {value, View} = lists:search(fun(View) ->
lists:member(ViewName, View#mrview.map_names)
end, Views),
View#mrview.id_num.
-convert_args_to_fdb(Db, Sig, IdxName, Args, Opts) ->
- #{
- direction := Direction
- } = Args,
-
- {Start1, End1} = get_range_keys(Db, Sig, IdxName, Args),
-
- Opts1 = case maps:is_key(limit, Args) of
- false ->
- Opts;
- true ->
- Skip = maps:get(skip, Args, 0),
- Limit = maps:get(limit, Args),
- % Limit is multiplied by two because there are two rows per key
- % value.
- % Skip is added because that is done in the fold so we need
- % to fetch the number of documents
- % along with the docs we would skip.
- % Limit = (Doc limit + Skip) * Num of Rows per Map KV
- [{limit, (Limit + Skip) * 2} | Opts]
- end,
-
- Opts2 = case Direction of
- fwd ->
- Opts1;
- rev ->
- [{reverse, true} | Opts1]
- end,
- {Start1, End1, Opts2}.
-
-
-get_range_keys(Db, Sig, IdxName, Args) ->
- #{
- inclusive_end := InclusiveEnd,
- direction := Direction
+% Translate #mrargs{} into the option list for
+% couch_views_fdb:fold_map_idx/6 (which passes it to the range fold).
+mrargs_to_fdb_options(Args) ->
+ #mrargs{
+ start_key = StartKey,
+ start_key_docid = StartKeyDocId,
+ end_key = EndKey,
+ end_key_docid = EndKeyDocId,
+ direction = Direction,
+ limit = Limit,
+ skip = Skip,
+ inclusive_end = InclusiveEnd
} = Args,
- {MapStartKey, MapEndKey} = case Direction of
- fwd -> {start_key, end_key};
- rev -> {end_key, start_key}
+ StartKeyOpts = case {StartKey, StartKeyDocId} of
+ {undefined, _} ->
+ [];
+ {StartKey, undefined} ->
+ [{start_key, {StartKey}}];
+ {_, _} ->
+ [{start_key, {StartKey, StartKeyDocId}}]
end,
- {Start0, End0} = couch_views_fdb:get_map_range_keys(Db, Sig, IdxName),
-
- Start1 = case maps:is_key(MapStartKey, Args) of
- false ->
- Start0;
- true ->
- StartKey = maps:get(MapStartKey, Args),
- Start = couch_views_fdb:get_map_index_key(Db, Sig, IdxName,
- StartKey),
- erlfdb_key:first_greater_or_equal(Start)
- end,
-
- End1 = case maps:is_key(MapEndKey, Args) of
- false ->
- End0;
- true ->
- EndKey = maps:get(MapEndKey, Args),
- EndBin = couch_views_fdb:get_map_index_key(Db, Sig, IdxName,
- EndKey),
- EndBin1 = case InclusiveEnd of
- true -> <<EndBin/binary, 16#FF>>;
- false -> EndBin
- end,
- erlfdb_key:first_greater_than(EndBin1)
- end,
- {Start1, End1}.
-
-
-get_unpack_fun(TxDb, Opts, Callback) ->
- UnPackFwd = fun({K, V}, State) ->
- case couch_views_fdb:unpack_map_row(TxDb, K, V) of
- {key, _Id, RowKey} ->
- State#{current_key => RowKey};
- {value, Id, RowValue} ->
- #{
- current_key := RowKey
- } = State,
- process_map_row(Id, RowKey, RowValue, State, Callback)
- end
+ EndKeyOpts = case {EndKey, EndKeyDocId} of
+ {undefined, _} ->
+ [];
+ {EndKey, undefined} when InclusiveEnd ->
+ [{end_key, {EndKey}}];
+ {EndKey, undefined} ->
+ [{end_key_gt, {EndKey}}];
+ {EndKey, EndKeyDocId} when InclusiveEnd ->
+ [{end_key, {EndKey, EndKeyDocId}}];
+ {EndKey, EndKeyDocId} ->
+ [{end_key_gt, {EndKey, EndKeyDocId}}]
end,
- UnPackRev = fun({K, V}, State) ->
- case couch_views_fdb:unpack_map_row(TxDb, K, V) of
- {key, Id, RowKey} ->
- #{
- current_value := RowValue
- } = State,
- process_map_row(Id, RowKey, RowValue, State, Callback);
- {value, _Id, RowValue} ->
- State#{current_value => RowValue}
- end
- end,
-
- case lists:keyfind(reverse, 1, Opts) of
- {reverse, true} -> UnPackRev;
- _ -> UnPackFwd
- end.
+ % Each row is stored as two KVs, and skipped rows are still read
+ % (handle_row/4 drops them), so fetch 2 * (Limit + Skip) KVs.
+ [
+ {dir, Direction},
+ {limit, (Limit + Skip) * 2},
+ {streaming_mode, want_all}
+ ] ++ StartKeyOpts ++ EndKeyOpts.
-process_map_row(Id, RowKey, RowValue, State, Callback) ->
- #{
- acc := Acc,
- skip := Skip,
- db := Db
- } = State,
-
- case Skip > 0 of
- true ->
- State#{skip := Skip -1};
- false ->
- Row = [{id, Id}, {key, RowKey}, {value, RowValue}],
-
- IncludeDoc = maps:get(include_docs, State, false),
- Row1 = maybe_include_doc(Db, Id, Row, IncludeDoc),
-
- {ok, AccNext} = Callback({row, Row1}, Acc),
- State#{acc := AccNext}
- end.
-
-
-maybe_include_doc(_Db, _Id, Row, false) ->
- Row;
-
-maybe_include_doc(Db, Id, Row, true) ->
- Doc1 = case fabric2_db:open_doc(Db, Id) of
- {ok, Doc} -> couch_doc:to_json_obj(Doc, []);
- {not_found, _} -> []
- end,
- Row ++ [{doc, Doc1}].
+% Unwrap a callback result, throwing {done, Acc} to abort on {stop, Acc}.
+maybe_stop({ok, Acc}) -> Acc;
+maybe_stop({stop, Acc}) -> throw({done, Acc}).