You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2020/09/30 15:08:59 UTC

[couchdb] 05/08: Use ebtree for reduce functions

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 69f0ba1cc0af0c6008f63fb7342a60efa794634b
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Wed Jul 29 10:34:48 2020 -0500

    Use ebtree for reduce functions
---
 src/couch_views/src/couch_views.erl        |   6 -
 src/couch_views/src/couch_views_fdb.erl    |   1 -
 src/couch_views/src/couch_views_reader.erl | 159 ++++++++++++++++++++++-
 src/couch_views/src/couch_views_trees.erl  | 199 +++++++++++++++++++++++++----
 4 files changed, 327 insertions(+), 38 deletions(-)

diff --git a/src/couch_views/src/couch_views.erl b/src/couch_views/src/couch_views.erl
index da8a142..2d91631 100644
--- a/src/couch_views/src/couch_views.erl
+++ b/src/couch_views/src/couch_views.erl
@@ -161,12 +161,6 @@ maybe_update_view(TxDb, Mrst, false, _Args) ->
     end.
 
 
-is_reduce_view(#mrargs{view_type = ViewType}) ->
-    ViewType =:= red;
-is_reduce_view({Reduce, _, _}) ->
-    Reduce =:= red.
-
-
 to_mrargs(#mrargs{} = Args) ->
     Args;
 
diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl
index 28a60b8..b0fb82e 100644
--- a/src/couch_views/src/couch_views_fdb.erl
+++ b/src/couch_views/src/couch_views_fdb.erl
@@ -299,7 +299,6 @@ reset_interactive_index(Db, Sig, _St) ->
     {VS, ?INDEX_BUILDING}.
 
 
-
 version_key(Db, Sig) ->
     #{
         db_prefix := DbPrefix
diff --git a/src/couch_views/src/couch_views_reader.erl b/src/couch_views/src/couch_views_reader.erl
index a785c7b..3c58627 100644
--- a/src/couch_views/src/couch_views_reader.erl
+++ b/src/couch_views/src/couch_views_reader.erl
@@ -23,7 +23,15 @@
 -include_lib("fabric/include/fabric2.hrl").
 
 
-read(Db, Mrst0, ViewName, UserCallback, UserAcc0, Args) ->
+% Entry point for view reads: hands the request to the map reader or the
+% reduce reader according to the view_type field of the #mrargs{} record.
+read(Db, Mrst, ViewName, UserCallback, UserAcc, Args) ->
+    case Args of
+        #mrargs{view_type = map} ->
+            read_map_view(Db, Mrst, ViewName, UserCallback, UserAcc, Args);
+        #mrargs{view_type = red} ->
+            read_red_view(Db, Mrst, ViewName, UserCallback, UserAcc, Args)
+    end.
+
+
+read_map_view(Db, Mrst0, ViewName, UserCallback, UserAcc0, Args) ->
     try
         fabric2_fdb:transactional(Db, fun(TxDb) ->
             #mrst{
@@ -68,6 +76,79 @@ read(Db, Mrst0, ViewName, UserCallback, UserAcc0, Args) ->
     end.
 
 
+% Runs a reduce-view query inside fabric2_fdb:transactional/2 and streams
+% the result to UserCallback: first a meta row, then the rows produced by
+% folding the reduce index once per expanded key range, then `complete`.
+% Returns {ok, UserAcc}. A {complete, Acc} throw (raised by handle_red_row/3
+% when the limit is used up) invokes the `complete` callback from the catch
+% clause; a {done, Acc} throw returns Acc unchanged.
+read_red_view(Db, Mrst0, ViewName, UserCallback, UserAcc0, Args) ->
+    #mrst{language = Lang, views = Views} = Mrst0,
+    {Idx, Lang, View0} = get_red_view(Lang, Args, ViewName, Views),
+    Mrst1 = Mrst0#mrst{views = [View0]},
+    OpenOpts = [{read_only, Idx}],
+    try
+        fabric2_fdb:transactional(Db, fun(TxDb) ->
+            Mrst = couch_views_trees:open(TxDb, Mrst1, OpenOpts),
+            #mrst{language = Lang, views = [View1]} = Mrst,
+            #mrargs{extra = Extra} = Args,
+
+            Meta = get_red_meta(TxDb, Mrst, View1, Args),
+            UserAcc1 = maybe_stop(UserCallback(Meta, UserAcc0)),
+
+            % An explicit finalizer in the extra options takes precedence
+            % over the source of the selected reduce function.
+            Finalizer = case couch_util:get_value(finalizer, Extra) of
+                undefined ->
+                    {_Name, RedSrc} = lists:nth(Idx, View1#mrview.reduce_funs),
+                    RedSrc;
+                Custom ->
+                    Custom
+            end,
+
+            InitAcc = #{
+                db => TxDb,
+                skip => Args#mrargs.skip,
+                limit => Args#mrargs.limit,
+                mrargs => undefined,
+                finalizer => Finalizer,
+                red_idx => Idx,
+                language => Lang,
+                callback => UserCallback,
+                acc => UserAcc1
+            },
+
+            % One fold over the reduce index per expanded set of arguments;
+            % the accumulator carries the arguments currently being served.
+            FoldRange = fun(RangeArgs, RangeAcc0) ->
+                FdbOpts = mrargs_to_fdb_options(RangeArgs),
+                RangeAcc1 = RangeAcc0#{mrargs := RangeArgs},
+                couch_views_trees:fold_red_idx(
+                        TxDb,
+                        View1,
+                        Idx,
+                        FdbOpts,
+                        fun handle_red_row/3,
+                        RangeAcc1
+                    )
+            end,
+            FinalAcc = lists:foldl(FoldRange, InitAcc, expand_keys_args(Args)),
+
+            #{acc := UserAcc2} = FinalAcc,
+            {ok, maybe_stop(UserCallback(complete, UserAcc2))}
+        end)
+    catch
+        throw:{complete, Out} ->
+            {_, Final} = UserCallback(complete, Out),
+            {ok, Final};
+        throw:{done, Out} ->
+            {ok, Out}
+    end.
+
+
 get_map_meta(TxDb, Mrst, View, #mrargs{update_seq = true}) ->
     TotalRows = couch_views_trees:get_row_count(TxDb, View),
     ViewSeq = couch_views_fdb:get_update_seq(TxDb, Mrst),
@@ -78,6 +159,14 @@ get_map_meta(TxDb, _Mrst, View, #mrargs{}) ->
     {meta, [{total, TotalRows}, {offset, null}]}.
 
 
+% Builds the meta row sent before any reduce rows. The view update sequence
+% is looked up and included only when update_seq is true in the #mrargs{}.
+get_red_meta(TxDb, Mrst, _View, #mrargs{update_seq = WantSeq}) ->
+    case WantSeq of
+        true ->
+            Seq = couch_views_fdb:get_update_seq(TxDb, Mrst),
+            {meta, [{update_seq, Seq}]};
+        _ ->
+            {meta, []}
+    end.
+
+
 handle_map_row(_DocId, _Key, _Value, #{skip := Skip} = Acc) when Skip > 0 ->
     Acc#{skip := Skip - 1};
 
@@ -115,6 +204,38 @@ handle_map_row(DocId, Key, Value, Acc) ->
     Acc#{limit := Limit - 1, acc := UserAcc1}.
 
 
+% Per-row callback used while folding a reduce index.
+%   - while skip > 0 the row is dropped and skip is decremented;
+%   - when limit is 0, {complete, UserAcc} is thrown to end the fold;
+%   - otherwise the finalized row goes to the user callback and limit is
+%     decremented. A key of `undefined` is reported as `null`.
+handle_red_row(_Key, _Value, #{skip := Skip} = Acc) when Skip > 0 ->
+    Acc#{skip := Skip - 1};
+
+handle_red_row(_Key, _Value, #{limit := 0, acc := UserAcc}) ->
+    throw({complete, UserAcc});
+
+handle_red_row(Key, Value, Acc) ->
+    #{
+        limit := Limit,
+        finalizer := Finalizer,
+        callback := UserCallback,
+        acc := UserAcc0
+    } = Acc,
+
+    RowKey = case Key of
+        undefined -> null;
+        _ -> Key
+    end,
+    Row = [{key, RowKey}, {value, maybe_finalize(Finalizer, Value)}],
+
+    UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)),
+    Acc#{limit := Limit - 1, acc := UserAcc1}.
+
+
+% Applies Finalizer to a reduction through couch_query_servers:finalize/2.
+% A `null` finalizer leaves the reduction untouched.
+maybe_finalize(Finalizer, Red) ->
+    case Finalizer of
+        null ->
+            Red;
+        _ ->
+            {ok, Finalized} = couch_query_servers:finalize(Finalizer, Red),
+            Finalized
+    end.
+
+
 get_map_view(Lang, Args, ViewName, Views) ->
     case couch_mrview_util:extract_view(Lang, Args, ViewName, Views) of
         {map, View, _Args} -> View;
@@ -122,6 +243,13 @@ get_map_view(Lang, Args, ViewName, Views) ->
     end.
 
 
+% Looks up the reduce view called ViewName and returns {Idx, Lang, View}.
+% Throws {not_found, missing_named_view} when extract_view/4 yields
+% anything other than a reduce view for Lang.
+get_red_view(Lang, Args, ViewName, Views) ->
+    Extracted = couch_mrview_util:extract_view(Lang, Args, ViewName, Views),
+    case Extracted of
+        {red, {_Idx, Lang, _View} = RedView, _Args} ->
+            RedView;
+        _Other ->
+            throw({not_found, missing_named_view})
+    end.
+
+
 expand_keys_args(#mrargs{keys = undefined} = Args) ->
     [Args];
 
@@ -136,12 +264,14 @@ expand_keys_args(#mrargs{keys = Keys} = Args) ->
 
 mrargs_to_fdb_options(Args) ->
     #mrargs{
+        view_type = ViewType,
         start_key = StartKey,
         start_key_docid = StartKeyDocId,
         end_key = EndKey,
         end_key_docid = EndKeyDocId0,
         direction = Direction,
-        inclusive_end = InclusiveEnd
+        inclusive_end = InclusiveEnd,
+        group_level = GroupLevel
     } = Args,
 
     StartKeyOpts = if StartKey == undefined -> []; true ->
@@ -160,10 +290,33 @@ mrargs_to_fdb_options(Args) ->
         [{end_key, {EndKey, EndKeyDocId}}]
     end,
 
+    GroupFunOpt = make_group_key_fun(ViewType, GroupLevel),
+
     [
         {dir, Direction},
         {inclusive_end, InclusiveEnd}
-    ] ++ StartKeyOpts ++ EndKeyOpts.
+    ] ++ StartKeyOpts ++ EndKeyOpts ++ GroupFunOpt.
+
+
+% Translates the view type and group level into the optional
+% group_key_fun entry of the fold options:
+%   map views       -> no entry
+%   red, exact      -> group on the full emitted key
+%   red, 0          -> the atom group_all
+%   red, N (N > 0)  -> group on the first N elements of list keys;
+%                      non-list keys are grouped as is
+make_group_key_fun(map, _GroupLevel) ->
+    [];
+
+make_group_key_fun(red, exact) ->
+    ExactKey = fun({Key, _DocId}) -> Key end,
+    [{group_key_fun, ExactKey}];
+
+make_group_key_fun(red, 0) ->
+    [{group_key_fun, group_all}];
+
+make_group_key_fun(red, N) when is_integer(N), N > 0 ->
+    PrefixKey = fun({Key, _DocId}) ->
+        case is_list(Key) of
+            true -> lists:sublist(Key, N);
+            false -> Key
+        end
+    end,
+    [{group_key_fun, PrefixKey}].
 
 
 maybe_stop({ok, Acc}) -> Acc;
diff --git a/src/couch_views/src/couch_views_trees.erl b/src/couch_views/src/couch_views_trees.erl
index 7ce3505..b45750b 100644
--- a/src/couch_views/src/couch_views_trees.erl
+++ b/src/couch_views/src/couch_views_trees.erl
@@ -14,11 +14,13 @@
 
 -export([
     open/2,
+    open/3,
 
     get_row_count/2,
     get_kv_size/2,
 
     fold_map_idx/5,
+    fold_red_idx/6,
 
     update_views/3
 ]).
@@ -35,6 +37,10 @@
 
 
 open(TxDb, Mrst) ->
+    open(TxDb, Mrst, []).
+
+
+open(TxDb, Mrst, Options) ->
     #mrst{
         sig = Sig,
         language = Lang,
@@ -42,7 +48,7 @@ open(TxDb, Mrst) ->
     } = Mrst,
     Mrst#mrst{
         id_btree = open_id_tree(TxDb, Sig),
-        views = [open_view_tree(TxDb, Sig, Lang, V) || V <- Views]
+        views = [open_view_tree(TxDb, Sig, Lang, V, Options) || V <- Views]
     }.
 
 
@@ -50,7 +56,7 @@ get_row_count(TxDb, View) ->
     #{
         tx := Tx
     } = TxDb,
-    {Count, _} = ebtree:full_reduce(Tx, View#mrview.btree),
+    {Count, _, _} = ebtree:full_reduce(Tx, View#mrview.btree),
     Count.
 
 
@@ -58,7 +64,7 @@ get_kv_size(TxDb, View) ->
     #{
         tx := Tx
     } = TxDb,
-    {_, TotalSize} = ebtree:full_reduce(Tx, View#mrview.btree),
+    {_, TotalSize, _} = ebtree:full_reduce(Tx, View#mrview.btree),
     TotalSize.
 
 
@@ -122,6 +128,74 @@ fold_map_idx(TxDb, View, Options, Callback, Acc0) ->
     end.
 
 
+% Folds Callback(GroupKey, RedValue, Acc) over the reductions of the view's
+% btree for the key range described by Options. RedValue is the Idx-th
+% (1-based) entry of the user reductions stored in each reduction tuple.
+% With group_key_fun = group_all a single reduction of the whole range is
+% passed to Callback under the key `null`; with a fun, ebtree:group_reduce/8
+% drives the callback once per group.
+fold_red_idx(TxDb, View, Idx, Options, Callback, Acc0) ->
+    #{tx := Tx} = TxDb,
+    #mrview{btree = Btree} = View,
+
+    {Dir, StartKey, EndKey, InclusiveEnd, GroupKeyFun} = to_red_opts(Options),
+
+    % Picks the requested user reduction out of {RowCount, RowSize, UserReds}.
+    Wrapper = fun({GroupKey, Reduction}, WAcc) ->
+        {_Count, _Size, Reds} = Reduction,
+        Callback(GroupKey, lists:nth(Idx, Reds), WAcc)
+    end,
+
+    ReduceAll = fun(Lo, Hi, EBtreeOpts) ->
+        Reduction = ebtree:reduce(Tx, Btree, Lo, Hi, EBtreeOpts),
+        Wrapper({null, Reduction}, Acc0)
+    end,
+    ReduceGroups = fun(Lo, Hi, EBtreeOpts) ->
+        ebtree:group_reduce(
+                Tx,
+                Btree,
+                Lo,
+                Hi,
+                GroupKeyFun,
+                Wrapper,
+                Acc0,
+                EBtreeOpts
+            )
+    end,
+
+    FwdOpts = [{dir, fwd}, {inclusive_end, InclusiveEnd}],
+    % Start/End keys swapped on purpose for rev because ebtree. Also
+    % inclusive_start instead of inclusive_end for the same reason.
+    RevOpts = [{dir, rev}, {inclusive_start, InclusiveEnd}],
+
+    case {GroupKeyFun, Dir} of
+        {group_all, fwd} ->
+            ReduceAll(StartKey, EndKey, FwdOpts);
+        {GKF, fwd} when is_function(GKF) ->
+            ReduceGroups(StartKey, EndKey, FwdOpts);
+        {group_all, rev} ->
+            ReduceAll(EndKey, StartKey, RevOpts);
+        {GKF, rev} when is_function(GKF) ->
+            ReduceGroups(EndKey, StartKey, RevOpts)
+    end.
+
+
 update_views(TxDb, Mrst, Docs) ->
     #{
         tx := Tx
@@ -129,7 +203,7 @@ update_views(TxDb, Mrst, Docs) ->
 
     % Get initial KV size
     OldKVSize = lists:foldl(fun(View, SizeAcc) ->
-        {_, Size} = ebtree:full_reduce(Tx, View#mrview.btree),
+        {_, Size, _} = ebtree:full_reduce(Tx, View#mrview.btree),
         SizeAcc + Size
     end, 0, Mrst#mrst.views),
 
@@ -156,7 +230,7 @@ update_views(TxDb, Mrst, Docs) ->
 
     % Get new KV size after update
     NewKVSize = lists:foldl(fun(View, SizeAcc) ->
-        {_, Size} = ebtree:full_reduce(Tx, View#mrview.btree),
+        {_, Size, _} = ebtree:full_reduce(Tx, View#mrview.btree),
         SizeAcc + Size
     end, 0, Mrst#mrst.views),
 
@@ -176,7 +250,7 @@ open_id_tree(TxDb, Sig) ->
     ebtree:open(Tx, Prefix, get_order(id_btree), TreeOpts).
 
 
-open_view_tree(TxDb, Sig, Lang, View) ->
+open_view_tree(TxDb, Sig, Lang, View, Options) ->
     #{
         tx := Tx,
         db_prefix := DbPrefix
@@ -185,12 +259,21 @@ open_view_tree(TxDb, Sig, Lang, View) ->
         id_num = ViewId
     } = View,
     Prefix = view_tree_prefix(DbPrefix, Sig, ViewId),
-    TreeOpts = [
+    BaseOpts = [
         {collate_fun, couch_views_util:collate_fun(View)},
-        {reduce_fun, make_reduce_fun(Lang, View)},
-        {persist_fun, fun couch_views_fdb:persist_chunks/3},
-        {cache_fun, create_cache_fun({view, ViewId})}
+        {persist_fun, fun couch_views_fdb:persist_chunks/3}
     ],
+    ExtraOpts = case lists:keyfind(read_only, 1, Options) of
+        {read_only, Idx} ->
+            RedFun = make_read_only_reduce_fun(Lang, View, Idx),
+            [{reduce_fun, RedFun}];
+        false ->
+            [
+                {reduce_fun, make_reduce_fun(Lang, View)},
+                {cache_fun, create_cache_fun({view, ViewId})}
+            ]
+    end,
+    TreeOpts = BaseOpts ++ ExtraOpts,
     View#mrview{
         btree = ebtree:open(Tx, Prefix, get_order(view_btree), TreeOpts)
     }.
@@ -210,27 +293,60 @@ min_order(V) ->
     V + 1.
 
 
-make_reduce_fun(_Lang, #mrview{}) ->
+% Builds the ebtree reduce_fun used when a view tree is opened read-only
+% for one reduce function. Only the NthRed (1-based) reduce function of
+% View is evaluated; every other position in the user reduction list is
+% padded with [] and the row count and size are reported as 0.
+make_read_only_reduce_fun(Lang, View, NthRed) ->
+    RedFuns = [Src || {_, Src} <- View#mrview.reduce_funs],
+    % An out-of-range NthRed (e.g. a view without reduce functions) is not
+    % handled specially: the list operations below raise and the caller
+    % crashes with that error.
+    LPad = lists:duplicate(NthRed - 1, []),
+    RPad = lists:duplicate(length(RedFuns) - NthRed, []),
+    FunSrc = lists:nth(NthRed, RedFuns),
     fun
-        (KVs, _ReReduce = false) ->
+        (KVs0, _ReReduce = false) ->
+            KVs1 = detuple_kvs(expand_dupes(KVs0)),
+            {ok, Result} = couch_query_servers:reduce(Lang, [FunSrc], KVs1),
+            {0, 0, LPad ++ Result ++ RPad};
+        (Reductions, _ReReduce = true) ->
+            % Keep only the NthRed entry of each stored reduction, wrapped
+            % in a one-element list.
+            ExtractFun = fun(Reds) ->
+                {_Count, _Size, UReds} = Reds,
+                [lists:nth(NthRed, UReds)]
+            end,
+            UReds = lists:map(ExtractFun, Reductions),
+            {ok, Result} = case UReds of
+                [RedVal] ->
+                    % A single reduction is used as is, without calling
+                    % the query server.
+                    {ok, RedVal};
+                _ ->
+                    couch_query_servers:rereduce(Lang, [FunSrc], UReds)
+            end,
+            {0, 0, LPad ++ Result ++ RPad}
+    end.
+
+
+make_reduce_fun(Lang, #mrview{} = View) ->
+    RedFuns = [Src || {_, Src} <- View#mrview.reduce_funs],
+    fun
+        (KVs0, _ReReduce = false) ->
+            KVs1 = expand_dupes(KVs0),
             TotalSize = lists:foldl(fun({{K, _DocId}, V}, Acc) ->
                 KSize = couch_ejson_size:encoded_size(K),
-                Acc + case V of
-                    {dups, Dups} ->
-                        lists:foldl(fun(D, DAcc) ->
-                            VSize = couch_ejson_size:encoded_size(D),
-                            DAcc + KSize + VSize
-                        end, 0, Dups);
-                    _ ->
-                        VSize = couch_ejson_size:encoded_size(V),
-                        KSize + VSize
-                end
-            end, 0, KVs),
-            {length(KVs), TotalSize};
-        (KRs, _ReReduce = true) ->
-            lists:foldl(fun({Count, Size}, {CountAcc, SizeAcc}) ->
-                {Count + CountAcc, Size + SizeAcc}
-            end, {0, 0}, KRs)
+                VSize = couch_ejson_size:encoded_size(V),
+                KSize + VSize + Acc
+            end, 0, KVs1),
+            KVs2 = detuple_kvs(KVs1),
+            {ok, UserReds} = couch_query_servers:reduce(Lang, RedFuns, KVs2),
+            {length(KVs1), TotalSize, UserReds};
+        (Reductions, _ReReduce = true) ->
+            FoldFun = fun({Count, Size, UserReds}, {CAcc, SAcc, URedAcc}) ->
+                NewCAcc = Count + CAcc,
+                NewSAcc = Size + SAcc,
+                NewURedAcc = [UserReds | URedAcc],
+                {NewCAcc, NewSAcc, NewURedAcc}
+            end,
+            InitAcc = {0, 0, []},
+            FinalAcc = lists:foldl(FoldFun, InitAcc, Reductions),
+            {FinalCount, FinalSize, UReds} = FinalAcc,
+            {ok, Result} = couch_query_servers:rereduce(Lang, RedFuns, UReds),
+            {FinalCount, FinalSize, Result}
     end.
 
 
@@ -284,6 +400,17 @@ to_map_opts(Options) ->
     {Dir, StartKey, EndKey, InclusiveEnd}.
 
 
+% Extends the map fold options with the grouping function. When Options has
+% no group_key_fun entry, every key is mapped to the atom global_group.
+to_red_opts(Options) ->
+    {Dir, StartKey, EndKey, InclusiveEnd} = to_map_opts(Options),
+
+    GroupKeyFun = case lists:keyfind(group_key_fun, 1, Options) of
+        false ->
+            fun({_Key, _DocId}) -> global_group end;
+        {group_key_fun, Configured} ->
+            Configured
+    end,
+
+    {Dir, StartKey, EndKey, InclusiveEnd, GroupKeyFun}.
+
+
 gather_update_info(Tx, Mrst, Docs) ->
     % A special token used to indicate that the row should be deleted
     DeleteRef = erlang:make_ref(),
@@ -420,6 +547,22 @@ combine_vals(V1, V2) ->
     {dups, [V1, V2]}.
 
 
+% Flattens {K, {dups, [V1, V2, ...]}} entries into one {K, V} pair per
+% duplicate value, keeping the order of the input; other pairs pass through.
+expand_dupes(KVs) ->
+    lists:flatmap(fun
+        ({K, {dups, Dups}}) -> [{K, D} || D <- Dups];
+        ({_K, _V} = KV) -> [KV]
+    end, KVs).
+
+
+% Rewrites each {{Key, Id}, Value} pair as the nested list
+% [[Key, Id], Value]. An entry of any other shape fails with badmatch.
+detuple_kvs(KVs) ->
+    lists:map(fun(KV) ->
+        {{Key, Id}, Value} = KV,
+        [[Key, Id], Value]
+    end, KVs).
+
+
 id_tree_prefix(DbPrefix, Sig) ->
     Key = {?DB_VIEWS, ?VIEW_TREES, Sig, ?VIEW_ID_TREE},
     erlfdb_tuple:pack(Key, DbPrefix).